diff options
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/_test_multiprocessing.py (renamed from Lib/test/test_multiprocessing.py) | 481 | ||||
-rw-r--r-- | Lib/test/mp_fork_bomb.py | 5 | ||||
-rwxr-xr-x | Lib/test/regrtest.py | 2 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing_fork.py | 7 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing_forkserver.py | 7 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing_spawn.py | 7 |
6 files changed, 342 insertions, 167 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 2a6381d..f777edc 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -43,7 +43,7 @@ from multiprocessing import util try: from multiprocessing import reduction - HAS_REDUCTION = True + HAS_REDUCTION = reduction.HAVE_SEND_HANDLE except ImportError: HAS_REDUCTION = False @@ -99,6 +99,9 @@ try: except: MAXFD = 256 +# To speed up tests when using the forkserver, we can preload these: +PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] + # # Some tests require ctypes # @@ -330,7 +333,6 @@ class _TestProcess(BaseTestCase): @classmethod def _test_recursion(cls, wconn, id): - from multiprocessing import forking wconn.send(id) if len(id) < 2: for i in range(2): @@ -378,7 +380,7 @@ class _TestProcess(BaseTestCase): self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) event.set() p.join() - self.assertTrue(wait_for_handle(sentinel, timeout=DELTA)) + self.assertTrue(wait_for_handle(sentinel, timeout=1)) # # @@ -2493,7 +2495,7 @@ class _TestPicklingConnections(BaseTestCase): @classmethod def tearDownClass(cls): - from multiprocessing.reduction import resource_sharer + from multiprocessing import resource_sharer resource_sharer.stop(timeout=5) @classmethod @@ -2807,30 +2809,40 @@ class _TestFinalize(BaseTestCase): # Test that from ... import * works for each module # -class _TestImportStar(BaseTestCase): +class _TestImportStar(unittest.TestCase): - ALLOWED_TYPES = ('processes',) + def get_module_names(self): + import glob + folder = os.path.dirname(multiprocessing.__file__) + pattern = os.path.join(folder, '*.py') + files = glob.glob(pattern) + modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] + modules = ['multiprocessing.' + m for m in modules] + modules.remove('multiprocessing.__init__') + modules.append('multiprocessing') + return modules def test_import(self): - modules = [ - 'multiprocessing', 'multiprocessing.connection', - 'multiprocessing.heap', 'multiprocessing.managers', - 'multiprocessing.pool', 'multiprocessing.process', - 'multiprocessing.synchronize', 'multiprocessing.util' - ] - - if HAS_REDUCTION: - modules.append('multiprocessing.reduction') + modules = self.get_module_names() + if sys.platform == 'win32': + modules.remove('multiprocessing.popen_fork') + modules.remove('multiprocessing.popen_forkserver') + modules.remove('multiprocessing.popen_spawn_posix') + else: + modules.remove('multiprocessing.popen_spawn_win32') + if not HAS_REDUCTION: + modules.remove('multiprocessing.popen_forkserver') - if c_int is not None: + if c_int is None: # This module requires _ctypes - modules.append('multiprocessing.sharedctypes') + modules.remove('multiprocessing.sharedctypes') for name in modules: __import__(name) mod = sys.modules[name] + self.assertTrue(hasattr(mod, '__all__'), name) - for attr in getattr(mod, '__all__', ()): + for attr in mod.__all__: self.assertTrue( hasattr(mod, attr), '%r does not have attribute %r' % (mod, attr) @@ -2953,131 +2965,6 @@ class TestInvalidHandle(unittest.TestCase): self.assertRaises((ValueError, OSError), multiprocessing.connection.Connection, -1) -# -# Functions used to create test cases from the base ones in this module -# - -def create_test_cases(Mixin, type): - result = {} - glob = globals() - Type = type.capitalize() - ALL_TYPES = {'processes', 'threads', 'manager'} - - for name in list(glob.keys()): - if name.startswith('_Test'): - base = glob[name] - assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES) - if type in base.ALLOWED_TYPES: - newname = 'With' + Type + name[1:] - class Temp(base, Mixin, unittest.TestCase): - pass - result[newname] = Temp - Temp.__name__ = Temp.__qualname__ = newname - Temp.__module__ = Mixin.__module__ - return result - -# -# Create test cases -# - -class ProcessesMixin(object): - TYPE = 'processes' - Process = multiprocessing.Process - connection = multiprocessing.connection - current_process = staticmethod(multiprocessing.current_process) - active_children = staticmethod(multiprocessing.active_children) - Pool = staticmethod(multiprocessing.Pool) - Pipe = staticmethod(multiprocessing.Pipe) - Queue = staticmethod(multiprocessing.Queue) - JoinableQueue = staticmethod(multiprocessing.JoinableQueue) - Lock = staticmethod(multiprocessing.Lock) - RLock = staticmethod(multiprocessing.RLock) - Semaphore = staticmethod(multiprocessing.Semaphore) - BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) - Condition = staticmethod(multiprocessing.Condition) - Event = staticmethod(multiprocessing.Event) - Barrier = staticmethod(multiprocessing.Barrier) - Value = staticmethod(multiprocessing.Value) - Array = staticmethod(multiprocessing.Array) - RawValue = staticmethod(multiprocessing.RawValue) - RawArray = staticmethod(multiprocessing.RawArray) - -testcases_processes = create_test_cases(ProcessesMixin, type='processes') -globals().update(testcases_processes) - - -class ManagerMixin(object): - TYPE = 'manager' - Process = multiprocessing.Process - Queue = property(operator.attrgetter('manager.Queue')) - JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) - Lock = property(operator.attrgetter('manager.Lock')) - RLock = property(operator.attrgetter('manager.RLock')) - Semaphore = property(operator.attrgetter('manager.Semaphore')) - BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) - Condition = property(operator.attrgetter('manager.Condition')) - Event = property(operator.attrgetter('manager.Event')) - Barrier = property(operator.attrgetter('manager.Barrier')) - Value = property(operator.attrgetter('manager.Value')) - Array = property(operator.attrgetter('manager.Array')) - list = property(operator.attrgetter('manager.list')) - dict = property(operator.attrgetter('manager.dict')) - Namespace = property(operator.attrgetter('manager.Namespace')) - - @classmethod - def Pool(cls, *args, **kwds): - return cls.manager.Pool(*args, **kwds) - - @classmethod - def setUpClass(cls): - cls.manager = multiprocessing.Manager() - - @classmethod - def tearDownClass(cls): - # only the manager process should be returned by active_children() - # but this can take a bit on slow machines, so wait a few seconds - # if there are other children too (see #17395) - t = 0.01 - while len(multiprocessing.active_children()) > 1 and t < 5: - time.sleep(t) - t *= 2 - gc.collect() # do garbage collection - if cls.manager._number_of_objects() != 0: - # This is not really an error since some tests do not - # ensure that all processes which hold a reference to a - # managed object have been joined. - print('Shared objects which still exist at manager shutdown:') - print(cls.manager._debug_info()) - cls.manager.shutdown() - cls.manager.join() - cls.manager = None - -testcases_manager = create_test_cases(ManagerMixin, type='manager') -globals().update(testcases_manager) - - -class ThreadsMixin(object): - TYPE = 'threads' - Process = multiprocessing.dummy.Process - connection = multiprocessing.dummy.connection - current_process = staticmethod(multiprocessing.dummy.current_process) - active_children = staticmethod(multiprocessing.dummy.active_children) - Pool = staticmethod(multiprocessing.Pool) - Pipe = staticmethod(multiprocessing.dummy.Pipe) - Queue = staticmethod(multiprocessing.dummy.Queue) - JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) - Lock = staticmethod(multiprocessing.dummy.Lock) - RLock = staticmethod(multiprocessing.dummy.RLock) - Semaphore = staticmethod(multiprocessing.dummy.Semaphore) - BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) - Condition = staticmethod(multiprocessing.dummy.Condition) - Event = staticmethod(multiprocessing.dummy.Event) - Barrier = staticmethod(multiprocessing.dummy.Barrier) - Value = staticmethod(multiprocessing.dummy.Value) - Array = staticmethod(multiprocessing.dummy.Array) - -testcases_threads = create_test_cases(ThreadsMixin, type='threads') -globals().update(testcases_threads) class OtherTest(unittest.TestCase): @@ -3427,7 +3314,7 @@ class TestFlags(unittest.TestCase): def test_flags(self): import json, subprocess # start child process using unusual flags - prog = ('from test.test_multiprocessing import TestFlags; ' + + prog = ('from test._test_multiprocessing import TestFlags; ' + 'TestFlags.run_in_child()') data = subprocess.check_output( [sys.executable, '-E', '-S', '-O', '-c', prog]) @@ -3474,13 +3361,14 @@ class TestTimeouts(unittest.TestCase): class TestNoForkBomb(unittest.TestCase): def test_noforkbomb(self): + sm = multiprocessing.get_start_method() name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') - if WIN32: - rc, out, err = test.script_helper.assert_python_failure(name) + if sm != 'fork': + rc, out, err = test.script_helper.assert_python_failure(name, sm) self.assertEqual('', out.decode('ascii')) self.assertIn('RuntimeError', err.decode('ascii')) else: - rc, out, err = test.script_helper.assert_python_ok(name) + rc, out, err = test.script_helper.assert_python_ok(name, sm) self.assertEqual('123', out.decode('ascii').rstrip()) self.assertEqual('', err.decode('ascii')) @@ -3514,6 +3402,72 @@ class TestForkAwareThreadLock(unittest.TestCase): self.assertLessEqual(new_size, old_size) # +# Check that non-forked child processes do not inherit unneeded fds/handles +# + +class TestCloseFds(unittest.TestCase): + + def get_high_socket_fd(self): + if WIN32: + # The child process will not have any socket handles, so + # calling socket.fromfd() should produce WSAENOTSOCK even + # if there is a handle of the same number. + return socket.socket().detach() + else: + # We want to produce a socket with an fd high enough that a + # freshly created child process will not have any fds as high. + fd = socket.socket().detach() + to_close = [] + while fd < 50: + to_close.append(fd) + fd = os.dup(fd) + for x in to_close: + os.close(x) + return fd + + def close(self, fd): + if WIN32: + socket.socket(fileno=fd).close() + else: + os.close(fd) + + @classmethod + def _test_closefds(cls, conn, fd): + try: + s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) + except Exception as e: + conn.send(e) + else: + s.close() + conn.send(None) + + def test_closefd(self): + if not HAS_REDUCTION: + raise unittest.SkipTest('requires fd pickling') + + reader, writer = multiprocessing.Pipe() + fd = self.get_high_socket_fd() + try: + p = multiprocessing.Process(target=self._test_closefds, + args=(writer, fd)) + p.start() + writer.close() + e = reader.recv() + p.join(timeout=5) + finally: + self.close(fd) + writer.close() + reader.close() + + if multiprocessing.get_start_method() == 'fork': + self.assertIs(e, None) + else: + WSAENOTSOCK = 10038 + self.assertIsInstance(e, OSError) + self.assertTrue(e.errno == errno.EBADF or + e.winerror == WSAENOTSOCK, e) + +# # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc # @@ -3557,10 +3511,10 @@ class TestIgnoreEINTR(unittest.TestCase): def handler(signum, frame): pass signal.signal(signal.SIGUSR1, handler) - l = multiprocessing.connection.Listener() - conn.send(l.address) - a = l.accept() - a.send('welcome') + with multiprocessing.connection.Listener() as l: + conn.send(l.address) + a = l.accept() + a.send('welcome') @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') def test_ignore_listener(self): @@ -3581,26 +3535,221 @@ class TestIgnoreEINTR(unittest.TestCase): finally: conn.close() +class TestStartMethod(unittest.TestCase): + def test_set_get(self): + multiprocessing.set_forkserver_preload(PRELOAD) + count = 0 + old_method = multiprocessing.get_start_method() + try: + for method in ('fork', 'spawn', 'forkserver'): + try: + multiprocessing.set_start_method(method) + except ValueError: + continue + self.assertEqual(multiprocessing.get_start_method(), method) + count += 1 + finally: + multiprocessing.set_start_method(old_method) + self.assertGreaterEqual(count, 1) + + def test_get_all(self): + methods = multiprocessing.get_all_start_methods() + if sys.platform == 'win32': + self.assertEqual(methods, ['spawn']) + else: + self.assertTrue(methods == ['fork', 'spawn'] or + methods == ['fork', 'spawn', 'forkserver']) + +# +# Check that killing process does not leak named semaphores +# + +@unittest.skipIf(sys.platform == "win32", + "test semantics don't make sense on Windows") +class TestSemaphoreTracker(unittest.TestCase): + def test_semaphore_tracker(self): + import subprocess + cmd = '''if 1: + import multiprocessing as mp, time, os + mp.set_start_method("spawn") + lock1 = mp.Lock() + lock2 = mp.Lock() + os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n") + os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n") + time.sleep(10) + ''' + print("\nTestSemaphoreTracker will output warnings a bit like:\n" + " ... There appear to be 2 leaked semaphores" + " to clean up at shutdown\n" + " ... '/mp-03jgqz': [Errno 2] No such file or directory", + file=sys.stderr) + r, w = os.pipe() + p = subprocess.Popen([sys.executable, + #'-W', 'ignore:semaphore_tracker', + '-c', cmd % (w, w)], + pass_fds=[w]) + os.close(w) + with open(r, 'rb', closefd=True) as f: + name1 = f.readline().rstrip().decode('ascii') + name2 = f.readline().rstrip().decode('ascii') + _multiprocessing.sem_unlink(name1) + p.terminate() + p.wait() + time.sleep(1.0) + with self.assertRaises(OSError) as ctx: + _multiprocessing.sem_unlink(name2) + # docs say it should be ENOENT, but OSX seems to give EINVAL + self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) + # +# Mixins # + +class ProcessesMixin(object): + TYPE = 'processes' + Process = multiprocessing.Process + connection = multiprocessing.connection + current_process = staticmethod(multiprocessing.current_process) + active_children = staticmethod(multiprocessing.active_children) + Pool = staticmethod(multiprocessing.Pool) + Pipe = staticmethod(multiprocessing.Pipe) + Queue = staticmethod(multiprocessing.Queue) + JoinableQueue = staticmethod(multiprocessing.JoinableQueue) + Lock = staticmethod(multiprocessing.Lock) + RLock = staticmethod(multiprocessing.RLock) + Semaphore = staticmethod(multiprocessing.Semaphore) + BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) + Condition = staticmethod(multiprocessing.Condition) + Event = staticmethod(multiprocessing.Event) + Barrier = staticmethod(multiprocessing.Barrier) + Value = staticmethod(multiprocessing.Value) + Array = staticmethod(multiprocessing.Array) + RawValue = staticmethod(multiprocessing.RawValue) + RawArray = staticmethod(multiprocessing.RawArray) + + +class ManagerMixin(object): + TYPE = 'manager' + Process = multiprocessing.Process + Queue = property(operator.attrgetter('manager.Queue')) + JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) + Lock = property(operator.attrgetter('manager.Lock')) + RLock = property(operator.attrgetter('manager.RLock')) + Semaphore = property(operator.attrgetter('manager.Semaphore')) + BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) + Condition = property(operator.attrgetter('manager.Condition')) + Event = property(operator.attrgetter('manager.Event')) + Barrier = property(operator.attrgetter('manager.Barrier')) + Value = property(operator.attrgetter('manager.Value')) + Array = property(operator.attrgetter('manager.Array')) + list = property(operator.attrgetter('manager.list')) + dict = property(operator.attrgetter('manager.dict')) + Namespace = property(operator.attrgetter('manager.Namespace')) + + @classmethod + def Pool(cls, *args, **kwds): + return cls.manager.Pool(*args, **kwds) + + @classmethod + def setUpClass(cls): + cls.manager = multiprocessing.Manager() + + @classmethod + def tearDownClass(cls): + # only the manager process should be returned by active_children() + # but this can take a bit on slow machines, so wait a few seconds + # if there are other children too (see #17395) + t = 0.01 + while len(multiprocessing.active_children()) > 1 and t < 5: + time.sleep(t) + t *= 2 + gc.collect() # do garbage collection + if cls.manager._number_of_objects() != 0: + # This is not really an error since some tests do not + # ensure that all processes which hold a reference to a + # managed object have been joined. + print('Shared objects which still exist at manager shutdown:') + print(cls.manager._debug_info()) + cls.manager.shutdown() + cls.manager.join() + cls.manager = None + + +class ThreadsMixin(object): + TYPE = 'threads' + Process = multiprocessing.dummy.Process + connection = multiprocessing.dummy.connection + current_process = staticmethod(multiprocessing.dummy.current_process) + active_children = staticmethod(multiprocessing.dummy.active_children) + Pool = staticmethod(multiprocessing.Pool) + Pipe = staticmethod(multiprocessing.dummy.Pipe) + Queue = staticmethod(multiprocessing.dummy.Queue) + JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) + Lock = staticmethod(multiprocessing.dummy.Lock) + RLock = staticmethod(multiprocessing.dummy.RLock) + Semaphore = staticmethod(multiprocessing.dummy.Semaphore) + BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) + Condition = staticmethod(multiprocessing.dummy.Condition) + Event = staticmethod(multiprocessing.dummy.Event) + Barrier = staticmethod(multiprocessing.dummy.Barrier) + Value = staticmethod(multiprocessing.dummy.Value) + Array = staticmethod(multiprocessing.dummy.Array) + +# +# Functions used to create test cases from the base ones in this module # -def setUpModule(): - if sys.platform.startswith("linux"): - try: - lock = multiprocessing.RLock() - except OSError: - raise unittest.SkipTest("OSError raises on RLock creation, " - "see issue 3111!") - check_enough_semaphores() - util.get_temp_dir() # creates temp directory for use by all processes - multiprocessing.get_logger().setLevel(LOG_LEVEL) +def install_tests_in_module_dict(remote_globs, start_method): + __module__ = remote_globs['__name__'] + local_globs = globals() + ALL_TYPES = {'processes', 'threads', 'manager'} + for name, base in local_globs.items(): + if not isinstance(base, type): + continue + if issubclass(base, BaseTestCase): + if base is BaseTestCase: + continue + assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES + for type_ in base.ALLOWED_TYPES: + newname = 'With' + type_.capitalize() + name[1:] + Mixin = local_globs[type_.capitalize() + 'Mixin'] + class Temp(base, Mixin, unittest.TestCase): + pass + Temp.__name__ = Temp.__qualname__ = newname + Temp.__module__ = __module__ + remote_globs[newname] = Temp + elif issubclass(base, unittest.TestCase): + class Temp(base, object): + pass + Temp.__name__ = Temp.__qualname__ = name + Temp.__module__ = __module__ + remote_globs[name] = Temp -def tearDownModule(): - # pause a bit so we don't get warning about dangling threads/processes - time.sleep(0.5) + def setUpModule(): + multiprocessing.set_forkserver_preload(PRELOAD) + remote_globs['old_start_method'] = multiprocessing.get_start_method() + try: + multiprocessing.set_start_method(start_method) + except ValueError: + raise unittest.SkipTest(start_method + + ' start method not supported') + print('Using start method %r' % multiprocessing.get_start_method()) + if sys.platform.startswith("linux"): + try: + lock = multiprocessing.RLock() + except OSError: + raise unittest.SkipTest("OSError raises on RLock creation, " + "see issue 3111!") + check_enough_semaphores() + util.get_temp_dir() # creates temp directory + multiprocessing.get_logger().setLevel(LOG_LEVEL) + + def tearDownModule(): + multiprocessing.set_start_method(remote_globs['old_start_method']) + # pause a bit so we don't get warning about dangling threads/processes + time.sleep(0.5) -if __name__ == '__main__': - unittest.main() + remote_globs['setUpModule'] = setUpModule + remote_globs['tearDownModule'] = tearDownModule diff --git a/Lib/test/mp_fork_bomb.py b/Lib/test/mp_fork_bomb.py index 908afe3..017e010 100644 --- a/Lib/test/mp_fork_bomb.py +++ b/Lib/test/mp_fork_bomb.py @@ -7,6 +7,11 @@ def foo(): # correctly on Windows. However, we should get a RuntimeError rather # than the Windows equivalent of a fork bomb. +if len(sys.argv) > 1: + multiprocessing.set_start_method(sys.argv[1]) +else: + multiprocessing.set_start_method('spawn') + p = multiprocessing.Process(target=foo) p.start() p.join() diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py index c8bbcb2..b9945d7 100755 --- a/Lib/test/regrtest.py +++ b/Lib/test/regrtest.py @@ -149,7 +149,7 @@ try: except ImportError: threading = None try: - import multiprocessing.process + import _multiprocessing, multiprocessing.process except ImportError: multiprocessing = None diff --git a/Lib/test/test_multiprocessing_fork.py b/Lib/test/test_multiprocessing_fork.py new file mode 100644 index 0000000..2bf4e75 --- /dev/null +++ b/Lib/test/test_multiprocessing_fork.py @@ -0,0 +1,7 @@ +import unittest +import test._test_multiprocessing + +test._test_multiprocessing.install_tests_in_module_dict(globals(), 'fork') + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_multiprocessing_forkserver.py b/Lib/test/test_multiprocessing_forkserver.py new file mode 100644 index 0000000..193a04a --- /dev/null +++ b/Lib/test/test_multiprocessing_forkserver.py @@ -0,0 +1,7 @@ +import unittest +import test._test_multiprocessing + +test._test_multiprocessing.install_tests_in_module_dict(globals(), 'forkserver') + +if __name__ == '__main__': + unittest.main() diff --git a/Lib/test/test_multiprocessing_spawn.py b/Lib/test/test_multiprocessing_spawn.py new file mode 100644 index 0000000..334ae9e --- /dev/null +++ b/Lib/test/test_multiprocessing_spawn.py @@ -0,0 +1,7 @@ +import unittest +import test._test_multiprocessing + +test._test_multiprocessing.install_tests_in_module_dict(globals(), 'spawn') + +if __name__ == '__main__': + unittest.main() |