summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2019-09-30 04:51:55 (GMT)
committerGitHub <noreply@github.com>2019-09-30 04:51:55 (GMT)
commit5bcc6d89bcb622a6786fff632fabdcaf67dbb4e2 (patch)
tree99b61dfdbff605a44db4090aff63c0b0fa937c5a /Lib/test
parent25e115ec00b5f75e3589c9f21013c47c21e1753f (diff)
downloadcpython-5bcc6d89bcb622a6786fff632fabdcaf67dbb4e2.zip
cpython-5bcc6d89bcb622a6786fff632fabdcaf67dbb4e2.tar.gz
cpython-5bcc6d89bcb622a6786fff632fabdcaf67dbb4e2.tar.bz2
bpo-37096: Add large-file tests for modules using sendfile(2) (GH-13676)
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/test_largefile.py91
1 files changed, 85 insertions, 6 deletions
diff --git a/Lib/test/test_largefile.py b/Lib/test/test_largefile.py
index 8870c72..6c8813e 100644
--- a/Lib/test/test_largefile.py
+++ b/Lib/test/test_largefile.py
@@ -5,17 +5,19 @@ import os
import stat
import sys
import unittest
-from test.support import TESTFN, requires, unlink, bigmemtest
+import socket
+import shutil
+import threading
+from test.support import TESTFN, requires, unlink, bigmemtest, find_unused_port
import io # C implementation of io
import _pyio as pyio # Python implementation of io
# size of file to create (>2 GiB; 2 GiB == 2,147,483,648 bytes)
size = 2_500_000_000
+TESTFN2 = TESTFN + '2'
+
class LargeFileTest:
- """Test that each file function works as expected for large
- (i.e. > 2 GiB) files.
- """
def setUp(self):
if os.path.exists(TESTFN):
@@ -44,6 +46,13 @@ class LargeFileTest:
if not os.stat(TESTFN)[stat.ST_SIZE] == 0:
raise cls.failureException('File was not truncated by opening '
'with mode "wb"')
+ unlink(TESTFN2)
+
+
+class TestFileMethods(LargeFileTest):
+ """Test that each file function works as expected for large
+ (i.e. > 2 GiB) files.
+ """
# _pyio.FileIO.readall() uses a temporary bytearray then casted to bytes,
# so memuse=2 is needed
@@ -140,6 +149,72 @@ class LargeFileTest:
f.seek(pos)
self.assertTrue(f.seekable())
+
+class TestCopyfile(LargeFileTest, unittest.TestCase):
+ open = staticmethod(io.open)
+
+ def test_it(self):
+ # Internally shutil.copyfile() can use "fast copy" methods like
+ # os.sendfile().
+ size = os.path.getsize(TESTFN)
+ shutil.copyfile(TESTFN, TESTFN2)
+ self.assertEqual(os.path.getsize(TESTFN2), size)
+ with open(TESTFN2, 'rb') as f:
+ self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
+ f.seek(size - 5)
+ self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
+
+
+@unittest.skipIf(not hasattr(os, 'sendfile'), 'sendfile not supported')
+class TestSocketSendfile(LargeFileTest, unittest.TestCase):
+ open = staticmethod(io.open)
+ timeout = 3
+
+ def setUp(self):
+ super().setUp()
+ self.thread = None
+
+ def tearDown(self):
+ super().tearDown()
+ if self.thread is not None:
+ self.thread.join(self.timeout)
+ self.thread = None
+
+ def tcp_server(self, sock):
+ def run(sock):
+ with sock:
+ conn, _ = sock.accept()
+ with conn, open(TESTFN2, 'wb') as f:
+ event.wait(self.timeout)
+ while True:
+ chunk = conn.recv(65536)
+ if not chunk:
+ return
+ f.write(chunk)
+
+ event = threading.Event()
+ sock.settimeout(self.timeout)
+ self.thread = threading.Thread(target=run, args=(sock, ))
+ self.thread.start()
+ event.set()
+
+ def test_it(self):
+ port = find_unused_port()
+ with socket.create_server(("", port)) as sock:
+ self.tcp_server(sock)
+ with socket.create_connection(("127.0.0.1", port)) as client:
+ with open(TESTFN, 'rb') as f:
+ client.sendfile(f)
+ self.tearDown()
+
+ size = os.path.getsize(TESTFN)
+ self.assertEqual(os.path.getsize(TESTFN2), size)
+ with open(TESTFN2, 'rb') as f:
+ self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
+ f.seek(size - 5)
+ self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
+
+
def setUpModule():
try:
import signal
@@ -176,14 +251,18 @@ def setUpModule():
unlink(TESTFN)
-class CLargeFileTest(LargeFileTest, unittest.TestCase):
+class CLargeFileTest(TestFileMethods, unittest.TestCase):
open = staticmethod(io.open)
-class PyLargeFileTest(LargeFileTest, unittest.TestCase):
+
+class PyLargeFileTest(TestFileMethods, unittest.TestCase):
open = staticmethod(pyio.open)
+
def tearDownModule():
unlink(TESTFN)
+ unlink(TESTFN2)
+
if __name__ == '__main__':
unittest.main()