diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/test/test_largefile.py | 91 |
1 files changed, 85 insertions, 6 deletions
diff --git a/Lib/test/test_largefile.py b/Lib/test/test_largefile.py index 8870c72..6c8813e 100644 --- a/Lib/test/test_largefile.py +++ b/Lib/test/test_largefile.py @@ -5,17 +5,19 @@ import os import stat import sys import unittest -from test.support import TESTFN, requires, unlink, bigmemtest +import socket +import shutil +import threading +from test.support import TESTFN, requires, unlink, bigmemtest, find_unused_port import io # C implementation of io import _pyio as pyio # Python implementation of io # size of file to create (>2 GiB; 2 GiB == 2,147,483,648 bytes) size = 2_500_000_000 +TESTFN2 = TESTFN + '2' + class LargeFileTest: - """Test that each file function works as expected for large - (i.e. > 2 GiB) files. - """ def setUp(self): if os.path.exists(TESTFN): @@ -44,6 +46,13 @@ class LargeFileTest: if not os.stat(TESTFN)[stat.ST_SIZE] == 0: raise cls.failureException('File was not truncated by opening ' 'with mode "wb"') + unlink(TESTFN2) + + +class TestFileMethods(LargeFileTest): + """Test that each file function works as expected for large + (i.e. > 2 GiB) files. + """ # _pyio.FileIO.readall() uses a temporary bytearray then casted to bytes, # so memuse=2 is needed @@ -140,6 +149,72 @@ class LargeFileTest: f.seek(pos) self.assertTrue(f.seekable()) + +class TestCopyfile(LargeFileTest, unittest.TestCase): + open = staticmethod(io.open) + + def test_it(self): + # Internally shutil.copyfile() can use "fast copy" methods like + # os.sendfile(). + size = os.path.getsize(TESTFN) + shutil.copyfile(TESTFN, TESTFN2) + self.assertEqual(os.path.getsize(TESTFN2), size) + with open(TESTFN2, 'rb') as f: + self.assertEqual(f.read(5), b'z\x00\x00\x00\x00') + f.seek(size - 5) + self.assertEqual(f.read(), b'\x00\x00\x00\x00a') + + +@unittest.skipIf(not hasattr(os, 'sendfile'), 'sendfile not supported') +class TestSocketSendfile(LargeFileTest, unittest.TestCase): + open = staticmethod(io.open) + timeout = 3 + + def setUp(self): + super().setUp() + self.thread = None + + def tearDown(self): + super().tearDown() + if self.thread is not None: + self.thread.join(self.timeout) + self.thread = None + + def tcp_server(self, sock): + def run(sock): + with sock: + conn, _ = sock.accept() + with conn, open(TESTFN2, 'wb') as f: + event.wait(self.timeout) + while True: + chunk = conn.recv(65536) + if not chunk: + return + f.write(chunk) + + event = threading.Event() + sock.settimeout(self.timeout) + self.thread = threading.Thread(target=run, args=(sock, )) + self.thread.start() + event.set() + + def test_it(self): + port = find_unused_port() + with socket.create_server(("", port)) as sock: + self.tcp_server(sock) + with socket.create_connection(("127.0.0.1", port)) as client: + with open(TESTFN, 'rb') as f: + client.sendfile(f) + self.tearDown() + + size = os.path.getsize(TESTFN) + self.assertEqual(os.path.getsize(TESTFN2), size) + with open(TESTFN2, 'rb') as f: + self.assertEqual(f.read(5), b'z\x00\x00\x00\x00') + f.seek(size - 5) + self.assertEqual(f.read(), b'\x00\x00\x00\x00a') + + def setUpModule(): try: import signal @@ -176,14 +251,18 @@ def setUpModule(): unlink(TESTFN) -class CLargeFileTest(LargeFileTest, unittest.TestCase): +class CLargeFileTest(TestFileMethods, unittest.TestCase): open = staticmethod(io.open) -class PyLargeFileTest(LargeFileTest, unittest.TestCase): + +class PyLargeFileTest(TestFileMethods, unittest.TestCase): open = staticmethod(pyio.open) + def tearDownModule(): unlink(TESTFN) + unlink(TESTFN2) + if __name__ == '__main__': unittest.main() |