From f7049b5fb680c774e4950d10be62859a749f4e79 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 15 Dec 2020 16:08:16 +0100 Subject: bpo-42639: Add script_helper.run_test_script() (GH-23777) * Add run_test_script() function to test.support.script_helper. * Rename Lib/test/eintrdata/eintr_tester.py to Lib/test/_test_eintr.py. * test_eintr.py uses run_test_script(). --- Lib/test/_test_eintr.py | 530 +++++++++++++++++++++++++++++++++++++ Lib/test/eintrdata/eintr_tester.py | 530 ------------------------------------- Lib/test/support/script_helper.py | 32 +++ Lib/test/test_eintr.py | 18 +- 4 files changed, 564 insertions(+), 546 deletions(-) create mode 100644 Lib/test/_test_eintr.py delete mode 100644 Lib/test/eintrdata/eintr_tester.py diff --git a/Lib/test/_test_eintr.py b/Lib/test/_test_eintr.py new file mode 100644 index 0000000..e43b59d --- /dev/null +++ b/Lib/test/_test_eintr.py @@ -0,0 +1,530 @@ +""" +This test suite exercises some system calls subject to interruption with EINTR, +to check that it is actually handled transparently. +It is intended to be run by the main test suite within a child process, to +ensure there is no background thread running (so that signals are delivered to +the correct thread). +Signals are generated in-process using setitimer(ITIMER_REAL), which allows +sub-second periodicity (contrarily to signal()). +""" + +import contextlib +import faulthandler +import fcntl +import os +import platform +import select +import signal +import socket +import subprocess +import sys +import time +import unittest + +from test import support +from test.support import os_helper +from test.support import socket_helper + +@contextlib.contextmanager +def kill_on_error(proc): + """Context manager killing the subprocess if a Python exception is raised.""" + with proc: + try: + yield proc + except: + proc.kill() + raise + + +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class EINTRBaseTest(unittest.TestCase): + """ Base class for EINTR tests. """ + + # delay for initial signal delivery + signal_delay = 0.1 + # signal delivery periodicity + signal_period = 0.1 + # default sleep time for tests - should obviously have: + # sleep_time > signal_period + sleep_time = 0.2 + + def sighandler(self, signum, frame): + self.signals += 1 + + def setUp(self): + self.signals = 0 + self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler) + signal.setitimer(signal.ITIMER_REAL, self.signal_delay, + self.signal_period) + + # Use faulthandler as watchdog to debug when a test hangs + # (timeout of 10 minutes) + faulthandler.dump_traceback_later(10 * 60, exit=True, + file=sys.__stderr__) + + @staticmethod + def stop_alarm(): + signal.setitimer(signal.ITIMER_REAL, 0, 0) + + def tearDown(self): + self.stop_alarm() + signal.signal(signal.SIGALRM, self.orig_handler) + faulthandler.cancel_dump_traceback_later() + + def subprocess(self, *args, **kw): + cmd_args = (sys.executable, '-c') + args + return subprocess.Popen(cmd_args, **kw) + + +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class OSEINTRTest(EINTRBaseTest): + """ EINTR tests for the os module. """ + + def new_sleep_process(self): + code = 'import time; time.sleep(%r)' % self.sleep_time + return self.subprocess(code) + + def _test_wait_multiple(self, wait_func): + num = 3 + processes = [self.new_sleep_process() for _ in range(num)] + for _ in range(num): + wait_func() + # Call the Popen method to avoid a ResourceWarning + for proc in processes: + proc.wait() + + def test_wait(self): + self._test_wait_multiple(os.wait) + + @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()') + def test_wait3(self): + self._test_wait_multiple(lambda: os.wait3(0)) + + def _test_wait_single(self, wait_func): + proc = self.new_sleep_process() + wait_func(proc.pid) + # Call the Popen method to avoid a ResourceWarning + proc.wait() + + def test_waitpid(self): + self._test_wait_single(lambda pid: os.waitpid(pid, 0)) + + @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()') + def test_wait4(self): + self._test_wait_single(lambda pid: os.wait4(pid, 0)) + + def test_read(self): + rd, wr = os.pipe() + self.addCleanup(os.close, rd) + # wr closed explicitly by parent + + # the payload below are smaller than PIPE_BUF, hence the writes will be + # atomic + datas = [b"hello", b"world", b"spam"] + + code = '\n'.join(( + 'import os, sys, time', + '', + 'wr = int(sys.argv[1])', + 'datas = %r' % datas, + 'sleep_time = %r' % self.sleep_time, + '', + 'for data in datas:', + ' # let the parent block on read()', + ' time.sleep(sleep_time)', + ' os.write(wr, data)', + )) + + proc = self.subprocess(code, str(wr), pass_fds=[wr]) + with kill_on_error(proc): + os.close(wr) + for data in datas: + self.assertEqual(data, os.read(rd, len(data))) + self.assertEqual(proc.wait(), 0) + + def test_write(self): + rd, wr = os.pipe() + self.addCleanup(os.close, wr) + # rd closed explicitly by parent + + # we must write enough data for the write() to block + data = b"x" * support.PIPE_MAX_SIZE + + code = '\n'.join(( + 'import io, os, sys, time', + '', + 'rd = int(sys.argv[1])', + 'sleep_time = %r' % self.sleep_time, + 'data = b"x" * %s' % support.PIPE_MAX_SIZE, + 'data_len = len(data)', + '', + '# let the parent block on write()', + 'time.sleep(sleep_time)', + '', + 'read_data = io.BytesIO()', + 'while len(read_data.getvalue()) < data_len:', + ' chunk = os.read(rd, 2 * data_len)', + ' read_data.write(chunk)', + '', + 'value = read_data.getvalue()', + 'if value != data:', + ' raise Exception("read error: %s vs %s bytes"', + ' % (len(value), data_len))', + )) + + proc = self.subprocess(code, str(rd), pass_fds=[rd]) + with kill_on_error(proc): + os.close(rd) + written = 0 + while written < len(data): + written += os.write(wr, memoryview(data)[written:]) + self.assertEqual(proc.wait(), 0) + + +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class SocketEINTRTest(EINTRBaseTest): + """ EINTR tests for the socket module. """ + + @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()') + def _test_recv(self, recv_func): + rd, wr = socket.socketpair() + self.addCleanup(rd.close) + # wr closed explicitly by parent + + # single-byte payload guard us against partial recv + datas = [b"x", b"y", b"z"] + + code = '\n'.join(( + 'import os, socket, sys, time', + '', + 'fd = int(sys.argv[1])', + 'family = %s' % int(wr.family), + 'sock_type = %s' % int(wr.type), + 'datas = %r' % datas, + 'sleep_time = %r' % self.sleep_time, + '', + 'wr = socket.fromfd(fd, family, sock_type)', + 'os.close(fd)', + '', + 'with wr:', + ' for data in datas:', + ' # let the parent block on recv()', + ' time.sleep(sleep_time)', + ' wr.sendall(data)', + )) + + fd = wr.fileno() + proc = self.subprocess(code, str(fd), pass_fds=[fd]) + with kill_on_error(proc): + wr.close() + for data in datas: + self.assertEqual(data, recv_func(rd, len(data))) + self.assertEqual(proc.wait(), 0) + + def test_recv(self): + self._test_recv(socket.socket.recv) + + @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()') + def test_recvmsg(self): + self._test_recv(lambda sock, data: sock.recvmsg(data)[0]) + + def _test_send(self, send_func): + rd, wr = socket.socketpair() + self.addCleanup(wr.close) + # rd closed explicitly by parent + + # we must send enough data for the send() to block + data = b"xyz" * (support.SOCK_MAX_SIZE // 3) + + code = '\n'.join(( + 'import os, socket, sys, time', + '', + 'fd = int(sys.argv[1])', + 'family = %s' % int(rd.family), + 'sock_type = %s' % int(rd.type), + 'sleep_time = %r' % self.sleep_time, + 'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3), + 'data_len = len(data)', + '', + 'rd = socket.fromfd(fd, family, sock_type)', + 'os.close(fd)', + '', + 'with rd:', + ' # let the parent block on send()', + ' time.sleep(sleep_time)', + '', + ' received_data = bytearray(data_len)', + ' n = 0', + ' while n < data_len:', + ' n += rd.recv_into(memoryview(received_data)[n:])', + '', + 'if received_data != data:', + ' raise Exception("recv error: %s vs %s bytes"', + ' % (len(received_data), data_len))', + )) + + fd = rd.fileno() + proc = self.subprocess(code, str(fd), pass_fds=[fd]) + with kill_on_error(proc): + rd.close() + written = 0 + while written < len(data): + sent = send_func(wr, memoryview(data)[written:]) + # sendall() returns None + written += len(data) if sent is None else sent + self.assertEqual(proc.wait(), 0) + + def test_send(self): + self._test_send(socket.socket.send) + + def test_sendall(self): + self._test_send(socket.socket.sendall) + + @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()') + def test_sendmsg(self): + self._test_send(lambda sock, data: sock.sendmsg([data])) + + def test_accept(self): + sock = socket.create_server((socket_helper.HOST, 0)) + self.addCleanup(sock.close) + port = sock.getsockname()[1] + + code = '\n'.join(( + 'import socket, time', + '', + 'host = %r' % socket_helper.HOST, + 'port = %s' % port, + 'sleep_time = %r' % self.sleep_time, + '', + '# let parent block on accept()', + 'time.sleep(sleep_time)', + 'with socket.create_connection((host, port)):', + ' time.sleep(sleep_time)', + )) + + proc = self.subprocess(code) + with kill_on_error(proc): + client_sock, _ = sock.accept() + client_sock.close() + self.assertEqual(proc.wait(), 0) + + # Issue #25122: There is a race condition in the FreeBSD kernel on + # handling signals in the FIFO device. Skip the test until the bug is + # fixed in the kernel. + # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162 + @support.requires_freebsd_version(10, 3) + @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()') + def _test_open(self, do_open_close_reader, do_open_close_writer): + filename = os_helper.TESTFN + + # Use a fifo: until the child opens it for reading, the parent will + # block when trying to open it for writing. + os_helper.unlink(filename) + try: + os.mkfifo(filename) + except PermissionError as e: + self.skipTest('os.mkfifo(): %s' % e) + self.addCleanup(os_helper.unlink, filename) + + code = '\n'.join(( + 'import os, time', + '', + 'path = %a' % filename, + 'sleep_time = %r' % self.sleep_time, + '', + '# let the parent block', + 'time.sleep(sleep_time)', + '', + do_open_close_reader, + )) + + proc = self.subprocess(code) + with kill_on_error(proc): + do_open_close_writer(filename) + self.assertEqual(proc.wait(), 0) + + def python_open(self, path): + fp = open(path, 'w') + fp.close() + + @unittest.skipIf(sys.platform == "darwin", + "hangs under macOS; see bpo-25234, bpo-35363") + def test_open(self): + self._test_open("fp = open(path, 'r')\nfp.close()", + self.python_open) + + def os_open(self, path): + fd = os.open(path, os.O_WRONLY) + os.close(fd) + + @unittest.skipIf(sys.platform == "darwin", + "hangs under macOS; see bpo-25234, bpo-35363") + def test_os_open(self): + self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)", + self.os_open) + + +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class TimeEINTRTest(EINTRBaseTest): + """ EINTR tests for the time module. """ + + def test_sleep(self): + t0 = time.monotonic() + time.sleep(self.sleep_time) + self.stop_alarm() + dt = time.monotonic() - t0 + self.assertGreaterEqual(dt, self.sleep_time) + + +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test +# is vulnerable to a race condition between the child and the parent processes. +@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), + 'need signal.pthread_sigmask()') +class SignalEINTRTest(EINTRBaseTest): + """ EINTR tests for the signal module. """ + + def check_sigwait(self, wait_func): + signum = signal.SIGUSR1 + pid = os.getpid() + + old_handler = signal.signal(signum, lambda *args: None) + self.addCleanup(signal.signal, signum, old_handler) + + code = '\n'.join(( + 'import os, time', + 'pid = %s' % os.getpid(), + 'signum = %s' % int(signum), + 'sleep_time = %r' % self.sleep_time, + 'time.sleep(sleep_time)', + 'os.kill(pid, signum)', + )) + + old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum]) + + t0 = time.monotonic() + proc = self.subprocess(code) + with kill_on_error(proc): + wait_func(signum) + dt = time.monotonic() - t0 + + self.assertEqual(proc.wait(), 0) + + @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), + 'need signal.sigwaitinfo()') + def test_sigwaitinfo(self): + def wait_func(signum): + signal.sigwaitinfo([signum]) + + self.check_sigwait(wait_func) + + @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), + 'need signal.sigwaitinfo()') + def test_sigtimedwait(self): + def wait_func(signum): + signal.sigtimedwait([signum], 120.0) + + self.check_sigwait(wait_func) + + +@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") +class SelectEINTRTest(EINTRBaseTest): + """ EINTR tests for the select module. """ + + def test_select(self): + t0 = time.monotonic() + select.select([], [], [], self.sleep_time) + dt = time.monotonic() - t0 + self.stop_alarm() + self.assertGreaterEqual(dt, self.sleep_time) + + @unittest.skipIf(sys.platform == "darwin", + "poll may fail on macOS; see issue #28087") + @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll') + def test_poll(self): + poller = select.poll() + + t0 = time.monotonic() + poller.poll(self.sleep_time * 1e3) + dt = time.monotonic() - t0 + self.stop_alarm() + self.assertGreaterEqual(dt, self.sleep_time) + + @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll') + def test_epoll(self): + poller = select.epoll() + self.addCleanup(poller.close) + + t0 = time.monotonic() + poller.poll(self.sleep_time) + dt = time.monotonic() - t0 + self.stop_alarm() + self.assertGreaterEqual(dt, self.sleep_time) + + @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue') + def test_kqueue(self): + kqueue = select.kqueue() + self.addCleanup(kqueue.close) + + t0 = time.monotonic() + kqueue.control(None, 1, self.sleep_time) + dt = time.monotonic() - t0 + self.stop_alarm() + self.assertGreaterEqual(dt, self.sleep_time) + + @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll') + def test_devpoll(self): + poller = select.devpoll() + self.addCleanup(poller.close) + + t0 = time.monotonic() + poller.poll(self.sleep_time * 1e3) + dt = time.monotonic() - t0 + self.stop_alarm() + self.assertGreaterEqual(dt, self.sleep_time) + + +class FNTLEINTRTest(EINTRBaseTest): + def _lock(self, lock_func, lock_name): + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + code = '\n'.join(( + "import fcntl, time", + "with open('%s', 'wb') as f:" % os_helper.TESTFN, + " fcntl.%s(f, fcntl.LOCK_EX)" % lock_name, + " time.sleep(%s)" % self.sleep_time)) + start_time = time.monotonic() + proc = self.subprocess(code) + with kill_on_error(proc): + with open(os_helper.TESTFN, 'wb') as f: + while True: # synchronize the subprocess + dt = time.monotonic() - start_time + if dt > 60.0: + raise Exception("failed to sync child in %.1f sec" % dt) + try: + lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB) + lock_func(f, fcntl.LOCK_UN) + time.sleep(0.01) + except BlockingIOError: + break + # the child locked the file just a moment ago for 'sleep_time' seconds + # that means that the lock below will block for 'sleep_time' minus some + # potential context switch delay + lock_func(f, fcntl.LOCK_EX) + dt = time.monotonic() - start_time + self.assertGreaterEqual(dt, self.sleep_time) + self.stop_alarm() + proc.wait() + + # Issue 35633: See https://bugs.python.org/issue35633#msg333662 + # skip test rather than accept PermissionError from all platforms + @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError") + def test_lockf(self): + self._lock(fcntl.lockf, "lockf") + + def test_flock(self): + self._lock(fcntl.flock, "flock") + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/eintrdata/eintr_tester.py b/Lib/test/eintrdata/eintr_tester.py deleted file mode 100644 index e43b59d..0000000 --- a/Lib/test/eintrdata/eintr_tester.py +++ /dev/null @@ -1,530 +0,0 @@ -""" -This test suite exercises some system calls subject to interruption with EINTR, -to check that it is actually handled transparently. -It is intended to be run by the main test suite within a child process, to -ensure there is no background thread running (so that signals are delivered to -the correct thread). -Signals are generated in-process using setitimer(ITIMER_REAL), which allows -sub-second periodicity (contrarily to signal()). -""" - -import contextlib -import faulthandler -import fcntl -import os -import platform -import select -import signal -import socket -import subprocess -import sys -import time -import unittest - -from test import support -from test.support import os_helper -from test.support import socket_helper - -@contextlib.contextmanager -def kill_on_error(proc): - """Context manager killing the subprocess if a Python exception is raised.""" - with proc: - try: - yield proc - except: - proc.kill() - raise - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -class EINTRBaseTest(unittest.TestCase): - """ Base class for EINTR tests. """ - - # delay for initial signal delivery - signal_delay = 0.1 - # signal delivery periodicity - signal_period = 0.1 - # default sleep time for tests - should obviously have: - # sleep_time > signal_period - sleep_time = 0.2 - - def sighandler(self, signum, frame): - self.signals += 1 - - def setUp(self): - self.signals = 0 - self.orig_handler = signal.signal(signal.SIGALRM, self.sighandler) - signal.setitimer(signal.ITIMER_REAL, self.signal_delay, - self.signal_period) - - # Use faulthandler as watchdog to debug when a test hangs - # (timeout of 10 minutes) - faulthandler.dump_traceback_later(10 * 60, exit=True, - file=sys.__stderr__) - - @staticmethod - def stop_alarm(): - signal.setitimer(signal.ITIMER_REAL, 0, 0) - - def tearDown(self): - self.stop_alarm() - signal.signal(signal.SIGALRM, self.orig_handler) - faulthandler.cancel_dump_traceback_later() - - def subprocess(self, *args, **kw): - cmd_args = (sys.executable, '-c') + args - return subprocess.Popen(cmd_args, **kw) - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -class OSEINTRTest(EINTRBaseTest): - """ EINTR tests for the os module. """ - - def new_sleep_process(self): - code = 'import time; time.sleep(%r)' % self.sleep_time - return self.subprocess(code) - - def _test_wait_multiple(self, wait_func): - num = 3 - processes = [self.new_sleep_process() for _ in range(num)] - for _ in range(num): - wait_func() - # Call the Popen method to avoid a ResourceWarning - for proc in processes: - proc.wait() - - def test_wait(self): - self._test_wait_multiple(os.wait) - - @unittest.skipUnless(hasattr(os, 'wait3'), 'requires wait3()') - def test_wait3(self): - self._test_wait_multiple(lambda: os.wait3(0)) - - def _test_wait_single(self, wait_func): - proc = self.new_sleep_process() - wait_func(proc.pid) - # Call the Popen method to avoid a ResourceWarning - proc.wait() - - def test_waitpid(self): - self._test_wait_single(lambda pid: os.waitpid(pid, 0)) - - @unittest.skipUnless(hasattr(os, 'wait4'), 'requires wait4()') - def test_wait4(self): - self._test_wait_single(lambda pid: os.wait4(pid, 0)) - - def test_read(self): - rd, wr = os.pipe() - self.addCleanup(os.close, rd) - # wr closed explicitly by parent - - # the payload below are smaller than PIPE_BUF, hence the writes will be - # atomic - datas = [b"hello", b"world", b"spam"] - - code = '\n'.join(( - 'import os, sys, time', - '', - 'wr = int(sys.argv[1])', - 'datas = %r' % datas, - 'sleep_time = %r' % self.sleep_time, - '', - 'for data in datas:', - ' # let the parent block on read()', - ' time.sleep(sleep_time)', - ' os.write(wr, data)', - )) - - proc = self.subprocess(code, str(wr), pass_fds=[wr]) - with kill_on_error(proc): - os.close(wr) - for data in datas: - self.assertEqual(data, os.read(rd, len(data))) - self.assertEqual(proc.wait(), 0) - - def test_write(self): - rd, wr = os.pipe() - self.addCleanup(os.close, wr) - # rd closed explicitly by parent - - # we must write enough data for the write() to block - data = b"x" * support.PIPE_MAX_SIZE - - code = '\n'.join(( - 'import io, os, sys, time', - '', - 'rd = int(sys.argv[1])', - 'sleep_time = %r' % self.sleep_time, - 'data = b"x" * %s' % support.PIPE_MAX_SIZE, - 'data_len = len(data)', - '', - '# let the parent block on write()', - 'time.sleep(sleep_time)', - '', - 'read_data = io.BytesIO()', - 'while len(read_data.getvalue()) < data_len:', - ' chunk = os.read(rd, 2 * data_len)', - ' read_data.write(chunk)', - '', - 'value = read_data.getvalue()', - 'if value != data:', - ' raise Exception("read error: %s vs %s bytes"', - ' % (len(value), data_len))', - )) - - proc = self.subprocess(code, str(rd), pass_fds=[rd]) - with kill_on_error(proc): - os.close(rd) - written = 0 - while written < len(data): - written += os.write(wr, memoryview(data)[written:]) - self.assertEqual(proc.wait(), 0) - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -class SocketEINTRTest(EINTRBaseTest): - """ EINTR tests for the socket module. """ - - @unittest.skipUnless(hasattr(socket, 'socketpair'), 'needs socketpair()') - def _test_recv(self, recv_func): - rd, wr = socket.socketpair() - self.addCleanup(rd.close) - # wr closed explicitly by parent - - # single-byte payload guard us against partial recv - datas = [b"x", b"y", b"z"] - - code = '\n'.join(( - 'import os, socket, sys, time', - '', - 'fd = int(sys.argv[1])', - 'family = %s' % int(wr.family), - 'sock_type = %s' % int(wr.type), - 'datas = %r' % datas, - 'sleep_time = %r' % self.sleep_time, - '', - 'wr = socket.fromfd(fd, family, sock_type)', - 'os.close(fd)', - '', - 'with wr:', - ' for data in datas:', - ' # let the parent block on recv()', - ' time.sleep(sleep_time)', - ' wr.sendall(data)', - )) - - fd = wr.fileno() - proc = self.subprocess(code, str(fd), pass_fds=[fd]) - with kill_on_error(proc): - wr.close() - for data in datas: - self.assertEqual(data, recv_func(rd, len(data))) - self.assertEqual(proc.wait(), 0) - - def test_recv(self): - self._test_recv(socket.socket.recv) - - @unittest.skipUnless(hasattr(socket.socket, 'recvmsg'), 'needs recvmsg()') - def test_recvmsg(self): - self._test_recv(lambda sock, data: sock.recvmsg(data)[0]) - - def _test_send(self, send_func): - rd, wr = socket.socketpair() - self.addCleanup(wr.close) - # rd closed explicitly by parent - - # we must send enough data for the send() to block - data = b"xyz" * (support.SOCK_MAX_SIZE // 3) - - code = '\n'.join(( - 'import os, socket, sys, time', - '', - 'fd = int(sys.argv[1])', - 'family = %s' % int(rd.family), - 'sock_type = %s' % int(rd.type), - 'sleep_time = %r' % self.sleep_time, - 'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3), - 'data_len = len(data)', - '', - 'rd = socket.fromfd(fd, family, sock_type)', - 'os.close(fd)', - '', - 'with rd:', - ' # let the parent block on send()', - ' time.sleep(sleep_time)', - '', - ' received_data = bytearray(data_len)', - ' n = 0', - ' while n < data_len:', - ' n += rd.recv_into(memoryview(received_data)[n:])', - '', - 'if received_data != data:', - ' raise Exception("recv error: %s vs %s bytes"', - ' % (len(received_data), data_len))', - )) - - fd = rd.fileno() - proc = self.subprocess(code, str(fd), pass_fds=[fd]) - with kill_on_error(proc): - rd.close() - written = 0 - while written < len(data): - sent = send_func(wr, memoryview(data)[written:]) - # sendall() returns None - written += len(data) if sent is None else sent - self.assertEqual(proc.wait(), 0) - - def test_send(self): - self._test_send(socket.socket.send) - - def test_sendall(self): - self._test_send(socket.socket.sendall) - - @unittest.skipUnless(hasattr(socket.socket, 'sendmsg'), 'needs sendmsg()') - def test_sendmsg(self): - self._test_send(lambda sock, data: sock.sendmsg([data])) - - def test_accept(self): - sock = socket.create_server((socket_helper.HOST, 0)) - self.addCleanup(sock.close) - port = sock.getsockname()[1] - - code = '\n'.join(( - 'import socket, time', - '', - 'host = %r' % socket_helper.HOST, - 'port = %s' % port, - 'sleep_time = %r' % self.sleep_time, - '', - '# let parent block on accept()', - 'time.sleep(sleep_time)', - 'with socket.create_connection((host, port)):', - ' time.sleep(sleep_time)', - )) - - proc = self.subprocess(code) - with kill_on_error(proc): - client_sock, _ = sock.accept() - client_sock.close() - self.assertEqual(proc.wait(), 0) - - # Issue #25122: There is a race condition in the FreeBSD kernel on - # handling signals in the FIFO device. Skip the test until the bug is - # fixed in the kernel. - # https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=203162 - @support.requires_freebsd_version(10, 3) - @unittest.skipUnless(hasattr(os, 'mkfifo'), 'needs mkfifo()') - def _test_open(self, do_open_close_reader, do_open_close_writer): - filename = os_helper.TESTFN - - # Use a fifo: until the child opens it for reading, the parent will - # block when trying to open it for writing. - os_helper.unlink(filename) - try: - os.mkfifo(filename) - except PermissionError as e: - self.skipTest('os.mkfifo(): %s' % e) - self.addCleanup(os_helper.unlink, filename) - - code = '\n'.join(( - 'import os, time', - '', - 'path = %a' % filename, - 'sleep_time = %r' % self.sleep_time, - '', - '# let the parent block', - 'time.sleep(sleep_time)', - '', - do_open_close_reader, - )) - - proc = self.subprocess(code) - with kill_on_error(proc): - do_open_close_writer(filename) - self.assertEqual(proc.wait(), 0) - - def python_open(self, path): - fp = open(path, 'w') - fp.close() - - @unittest.skipIf(sys.platform == "darwin", - "hangs under macOS; see bpo-25234, bpo-35363") - def test_open(self): - self._test_open("fp = open(path, 'r')\nfp.close()", - self.python_open) - - def os_open(self, path): - fd = os.open(path, os.O_WRONLY) - os.close(fd) - - @unittest.skipIf(sys.platform == "darwin", - "hangs under macOS; see bpo-25234, bpo-35363") - def test_os_open(self): - self._test_open("fd = os.open(path, os.O_RDONLY)\nos.close(fd)", - self.os_open) - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -class TimeEINTRTest(EINTRBaseTest): - """ EINTR tests for the time module. """ - - def test_sleep(self): - t0 = time.monotonic() - time.sleep(self.sleep_time) - self.stop_alarm() - dt = time.monotonic() - t0 - self.assertGreaterEqual(dt, self.sleep_time) - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -# bpo-30320: Need pthread_sigmask() to block the signal, otherwise the test -# is vulnerable to a race condition between the child and the parent processes. -@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), - 'need signal.pthread_sigmask()') -class SignalEINTRTest(EINTRBaseTest): - """ EINTR tests for the signal module. """ - - def check_sigwait(self, wait_func): - signum = signal.SIGUSR1 - pid = os.getpid() - - old_handler = signal.signal(signum, lambda *args: None) - self.addCleanup(signal.signal, signum, old_handler) - - code = '\n'.join(( - 'import os, time', - 'pid = %s' % os.getpid(), - 'signum = %s' % int(signum), - 'sleep_time = %r' % self.sleep_time, - 'time.sleep(sleep_time)', - 'os.kill(pid, signum)', - )) - - old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) - self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum]) - - t0 = time.monotonic() - proc = self.subprocess(code) - with kill_on_error(proc): - wait_func(signum) - dt = time.monotonic() - t0 - - self.assertEqual(proc.wait(), 0) - - @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), - 'need signal.sigwaitinfo()') - def test_sigwaitinfo(self): - def wait_func(signum): - signal.sigwaitinfo([signum]) - - self.check_sigwait(wait_func) - - @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), - 'need signal.sigwaitinfo()') - def test_sigtimedwait(self): - def wait_func(signum): - signal.sigtimedwait([signum], 120.0) - - self.check_sigwait(wait_func) - - -@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()") -class SelectEINTRTest(EINTRBaseTest): - """ EINTR tests for the select module. """ - - def test_select(self): - t0 = time.monotonic() - select.select([], [], [], self.sleep_time) - dt = time.monotonic() - t0 - self.stop_alarm() - self.assertGreaterEqual(dt, self.sleep_time) - - @unittest.skipIf(sys.platform == "darwin", - "poll may fail on macOS; see issue #28087") - @unittest.skipUnless(hasattr(select, 'poll'), 'need select.poll') - def test_poll(self): - poller = select.poll() - - t0 = time.monotonic() - poller.poll(self.sleep_time * 1e3) - dt = time.monotonic() - t0 - self.stop_alarm() - self.assertGreaterEqual(dt, self.sleep_time) - - @unittest.skipUnless(hasattr(select, 'epoll'), 'need select.epoll') - def test_epoll(self): - poller = select.epoll() - self.addCleanup(poller.close) - - t0 = time.monotonic() - poller.poll(self.sleep_time) - dt = time.monotonic() - t0 - self.stop_alarm() - self.assertGreaterEqual(dt, self.sleep_time) - - @unittest.skipUnless(hasattr(select, 'kqueue'), 'need select.kqueue') - def test_kqueue(self): - kqueue = select.kqueue() - self.addCleanup(kqueue.close) - - t0 = time.monotonic() - kqueue.control(None, 1, self.sleep_time) - dt = time.monotonic() - t0 - self.stop_alarm() - self.assertGreaterEqual(dt, self.sleep_time) - - @unittest.skipUnless(hasattr(select, 'devpoll'), 'need select.devpoll') - def test_devpoll(self): - poller = select.devpoll() - self.addCleanup(poller.close) - - t0 = time.monotonic() - poller.poll(self.sleep_time * 1e3) - dt = time.monotonic() - t0 - self.stop_alarm() - self.assertGreaterEqual(dt, self.sleep_time) - - -class FNTLEINTRTest(EINTRBaseTest): - def _lock(self, lock_func, lock_name): - self.addCleanup(os_helper.unlink, os_helper.TESTFN) - code = '\n'.join(( - "import fcntl, time", - "with open('%s', 'wb') as f:" % os_helper.TESTFN, - " fcntl.%s(f, fcntl.LOCK_EX)" % lock_name, - " time.sleep(%s)" % self.sleep_time)) - start_time = time.monotonic() - proc = self.subprocess(code) - with kill_on_error(proc): - with open(os_helper.TESTFN, 'wb') as f: - while True: # synchronize the subprocess - dt = time.monotonic() - start_time - if dt > 60.0: - raise Exception("failed to sync child in %.1f sec" % dt) - try: - lock_func(f, fcntl.LOCK_EX | fcntl.LOCK_NB) - lock_func(f, fcntl.LOCK_UN) - time.sleep(0.01) - except BlockingIOError: - break - # the child locked the file just a moment ago for 'sleep_time' seconds - # that means that the lock below will block for 'sleep_time' minus some - # potential context switch delay - lock_func(f, fcntl.LOCK_EX) - dt = time.monotonic() - start_time - self.assertGreaterEqual(dt, self.sleep_time) - self.stop_alarm() - proc.wait() - - # Issue 35633: See https://bugs.python.org/issue35633#msg333662 - # skip test rather than accept PermissionError from all platforms - @unittest.skipIf(platform.system() == "AIX", "AIX returns PermissionError") - def test_lockf(self): - self._lock(fcntl.lockf, "lockf") - - def test_flock(self): - self._lock(fcntl.flock, "flock") - - -if __name__ == "__main__": - unittest.main() diff --git a/Lib/test/support/script_helper.py b/Lib/test/support/script_helper.py index 09bb586..6d699c8 100644 --- a/Lib/test/support/script_helper.py +++ b/Lib/test/support/script_helper.py @@ -11,12 +11,14 @@ import py_compile import zipfile from importlib.util import source_from_cache +from test import support from test.support.import_helper import make_legacy_pyc # Cached result of the expensive test performed in the function below. __cached_interp_requires_environment = None + def interpreter_requires_environment(): """ Returns True if our sys.executable interpreter requires environment @@ -136,12 +138,14 @@ def run_python_until_end(*args, **env_vars): rc = proc.returncode return _PythonRunResult(rc, out, err), cmd_line + def _assert_python(expected_success, /, *args, **env_vars): res, cmd_line = run_python_until_end(*args, **env_vars) if (res.rc and expected_success) or (not res.rc and not expected_success): res.fail(cmd_line) return res + def assert_python_ok(*args, **env_vars): """ Assert that running the interpreter with `args` and optional environment @@ -155,6 +159,7 @@ def assert_python_ok(*args, **env_vars): """ return _assert_python(True, *args, **env_vars) + def assert_python_failure(*args, **env_vars): """ Assert that running the interpreter with `args` and optional environment @@ -165,6 +170,7 @@ def assert_python_failure(*args, **env_vars): """ return _assert_python(False, *args, **env_vars) + def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): """Run a Python subprocess with the given arguments. @@ -187,6 +193,7 @@ def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): stdout=stdout, stderr=stderr, **kw) + def kill_python(p): """Run the given Popen process until completion and return stdout.""" p.stdin.close() @@ -198,6 +205,7 @@ def kill_python(p): subprocess._cleanup() return data + def make_script(script_dir, script_basename, source, omit_suffix=False): script_filename = script_basename if not omit_suffix: @@ -209,6 +217,7 @@ def make_script(script_dir, script_basename, source, omit_suffix=False): importlib.invalidate_caches() return script_name + def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None): zip_filename = zip_basename+os.extsep+'zip' zip_name = os.path.join(zip_dir, zip_filename) @@ -228,10 +237,12 @@ def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None): # zip_file.printdir() return zip_name, os.path.join(zip_name, name_in_zip) + def make_pkg(pkg_dir, init_source=''): os.mkdir(pkg_dir) make_script(pkg_dir, '__init__', init_source) + def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, source, depth=1, compiled=False): unlink = [] @@ -260,3 +271,24 @@ def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, # print 'Contents of %r:' % zip_name # zip_file.printdir() return zip_name, os.path.join(zip_name, script_name_in_zip) + + +def run_test_script(script): + # use -u to try to get the full output if the test hangs or crash + if support.verbose: + def title(text): + return f"===== {text} ======" + + name = f"script {os.path.basename(script)}" + print() + print(title(name), flush=True) + # In verbose mode, the child process inherit stdout and stdout, + # to see output in realtime and reduce the risk of losing output. + args = [sys.executable, "-E", "-X", "faulthandler", "-u", script, "-v"] + proc = subprocess.run(args) + print(title(f"{name} completed: exit code {proc.returncode}"), + flush=True) + if proc.returncode: + raise AssertionError(f"{name} failed") + else: + assert_python_ok("-u", script, "-v") diff --git a/Lib/test/test_eintr.py b/Lib/test/test_eintr.py index a5f8f64..b61cdfa 100644 --- a/Lib/test/test_eintr.py +++ b/Lib/test/test_eintr.py @@ -15,22 +15,8 @@ class EINTRTests(unittest.TestCase): def test_all(self): # Run the tester in a sub-process, to make sure there is only one # thread (for reliable signal delivery). - tester = support.findfile("eintr_tester.py", subdir="eintrdata") - # use -u to try to get the full output if the test hangs or crash - args = ["-u", tester, "-v"] - if support.verbose: - print() - print("--- run eintr_tester.py ---", flush=True) - # In verbose mode, the child process inherit stdout and stdout, - # to see output in realtime and reduce the risk of losing output. - args = [sys.executable, "-E", "-X", "faulthandler", *args] - proc = subprocess.run(args) - print(f"--- eintr_tester.py completed: " - f"exit code {proc.returncode} ---", flush=True) - if proc.returncode: - self.fail("eintr_tester.py failed") - else: - script_helper.assert_python_ok("-u", tester, "-v") + script = support.findfile("_test_eintr.py") + script_helper.run_test_script(script) if __name__ == "__main__": -- cgit v0.12