diff options
Diffstat (limited to 'Lib/test/test_multiprocessing.py')
| -rw-r--r-- | Lib/test/test_multiprocessing.py | 337 |
1 files changed, 318 insertions, 19 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index e9d0329..8edb420 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1,11 +1,10 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # # Unit tests for the multiprocessing package # import unittest -import threading import queue as pyqueue import time import io @@ -25,6 +24,10 @@ import test.support _multiprocessing = test.support.import_module('_multiprocessing') # Skip tests if sem_open implementation is broken. test.support.import_module('multiprocessing.synchronize') +# import threading after _multiprocessing to raise a more revelant error +# message: "No module named _multiprocessing". _multiprocessing is not compiled +# without thread support. +import threading import multiprocessing.dummy import multiprocessing.connection @@ -35,11 +38,22 @@ import multiprocessing.pool from multiprocessing import util try: + from multiprocessing import reduction + HAS_REDUCTION = True +except ImportError: + HAS_REDUCTION = False + +try: from multiprocessing.sharedctypes import Value, copy HAS_SHAREDCTYPES = True except ImportError: HAS_SHAREDCTYPES = False +try: + import msvcrt +except ImportError: + msvcrt = None + # # # @@ -52,7 +66,7 @@ def latin(s): # LOG_LEVEL = util.SUBWARNING -#LOG_LEVEL = logging.WARNING +#LOG_LEVEL = logging.DEBUG DELTA = 0.1 CHECK_TIMINGS = False # making true makes tests take a lot longer @@ -69,6 +83,11 @@ HAVE_GETVALUE = not getattr(_multiprocessing, WIN32 = (sys.platform == "win32") +try: + MAXFD = os.sysconf("SC_OPEN_MAX") +except: + MAXFD = 256 + # # Some tests require ctypes # @@ -79,6 +98,22 @@ except ImportError: Structure = object c_int = c_double = None + +def check_enough_semaphores(): + """Check that the system supports enough semaphores to run the test.""" + # minimum number of semaphores available according to POSIX + nsems_min = 256 + try: + nsems = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems == -1 or nsems >= nsems_min: + return + raise unittest.SkipTest("The OS doesn't support enough semaphores " + "to run the test (required: %d)." % nsems_min) + + # # Creates a wrapper for a function which records the time it takes to finish # @@ -156,7 +191,7 @@ class _TestProcess(BaseTestCase): self.assertTrue(current.is_alive()) self.assertTrue(not current.daemon) - self.assertTrue(isinstance(authkey, bytes)) + self.assertIsInstance(authkey, bytes) self.assertTrue(len(authkey) > 0) self.assertEqual(current.ident, os.getpid()) self.assertEqual(current.exitcode, None) @@ -187,7 +222,7 @@ class _TestProcess(BaseTestCase): self.assertEqual(p.authkey, current.authkey) self.assertEqual(p.is_alive(), False) self.assertEqual(p.daemon, True) - self.assertTrue(p not in self.active_children()) + self.assertNotIn(p, self.active_children()) self.assertTrue(type(self.active_children()) is list) self.assertEqual(p.exitcode, None) @@ -195,7 +230,7 @@ class _TestProcess(BaseTestCase): self.assertEqual(p.exitcode, None) self.assertEqual(p.is_alive(), True) - self.assertTrue(p in self.active_children()) + self.assertIn(p, self.active_children()) self.assertEqual(q.get(), args[1:]) self.assertEqual(q.get(), kwargs) @@ -208,7 +243,7 @@ class _TestProcess(BaseTestCase): self.assertEqual(p.exitcode, 0) self.assertEqual(p.is_alive(), False) - self.assertTrue(p not in self.active_children()) + self.assertNotIn(p, self.active_children()) @classmethod def _test_terminate(cls): @@ -223,7 +258,7 @@ class _TestProcess(BaseTestCase): p.start() self.assertEqual(p.is_alive(), True) - self.assertTrue(p in self.active_children()) + self.assertIn(p, self.active_children()) self.assertEqual(p.exitcode, None) p.terminate() @@ -233,7 +268,7 @@ class _TestProcess(BaseTestCase): self.assertTimingAlmostEqual(join.elapsed, 0.0) self.assertEqual(p.is_alive(), False) - self.assertTrue(p not in self.active_children()) + self.assertNotIn(p, self.active_children()) p.join() @@ -252,13 +287,14 @@ class _TestProcess(BaseTestCase): self.assertEqual(type(self.active_children()), list) p = self.Process(target=time.sleep, args=(DELTA,)) - self.assertTrue(p not in self.active_children()) + self.assertNotIn(p, self.active_children()) + p.daemon = True p.start() - self.assertTrue(p in self.active_children()) + self.assertIn(p, self.active_children()) p.join() - self.assertTrue(p not in self.active_children()) + self.assertNotIn(p, self.active_children()) @classmethod def _test_recursion(cls, wconn, id): @@ -324,12 +360,36 @@ class _TestSubclassingProcess(BaseTestCase): def test_subclassing(self): uppercaser = _UpperCaser() + uppercaser.daemon = True uppercaser.start() self.assertEqual(uppercaser.submit('hello'), 'HELLO') self.assertEqual(uppercaser.submit('world'), 'WORLD') uppercaser.stop() uppercaser.join() + def test_stderr_flush(self): + # sys.stderr is flushed at process shutdown (issue #13812) + if self.TYPE == "threads": + return + + testfn = test.support.TESTFN + self.addCleanup(test.support.unlink, testfn) + proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) + proc.start() + proc.join() + with open(testfn, 'r') as f: + err = f.read() + # The whole traceback was printed + self.assertIn("ZeroDivisionError", err) + self.assertIn("test_multiprocessing.py", err) + self.assertIn("1/0 # MARKER", err) + + @classmethod + def _test_stderr_flush(cls, testfn): + sys.stderr = open(testfn, 'w') + 1/0 # MARKER + + # # # @@ -502,6 +562,7 @@ class _TestQueue(BaseTestCase): # fork process p = self.Process(target=self._test_fork, args=(queue,)) + p.daemon = True p.start() # check that all expected items are in the queue @@ -542,6 +603,7 @@ class _TestQueue(BaseTestCase): for i in range(4)] for p in workers: + p.daemon = True p.start() for i in range(10): @@ -754,7 +816,13 @@ class _TestCondition(BaseTestCase): cond.release() # check they have all woken - time.sleep(DELTA) + for i in range(10): + try: + if get_value(woken) == 6: + break + except NotImplementedError: + break + time.sleep(DELTA) self.assertReturnsIfImplemented(6, get_value, woken) # check state is not mucked up @@ -766,7 +834,7 @@ class _TestCondition(BaseTestCase): cond.acquire() res = wait(TIMEOUT1) cond.release() - self.assertEqual(res, None) + self.assertEqual(res, False) self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) @@ -806,7 +874,9 @@ class _TestEvent(BaseTestCase): #self.assertEqual(event.is_set(), False) - self.Process(target=self._test_event, args=(event,)).start() + p = self.Process(target=self._test_event, args=(event,)) + p.daemon = True + p.start() self.assertEqual(wait(), True) # @@ -846,6 +916,7 @@ class _TestValue(BaseTestCase): self.assertEqual(sv.value, cv[1]) proc = self.Process(target=self._test, args=(values,)) + proc.daemon = True proc.start() proc.join() @@ -909,6 +980,7 @@ class _TestArray(BaseTestCase): self.f(seq) p = self.Process(target=self.f, args=(arr,)) + p.daemon = True p.start() p.join() @@ -1026,6 +1098,7 @@ class _TestContainers(BaseTestCase): def sqr(x, wait=0.0): time.sleep(wait) return x*x + class _TestPool(BaseTestCase): def test_apply(self): @@ -1079,6 +1152,9 @@ class _TestPool(BaseTestCase): self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) def test_make_pool(self): + self.assertRaises(ValueError, multiprocessing.Pool, -1) + self.assertRaises(ValueError, multiprocessing.Pool, 0) + p = multiprocessing.Pool(3) self.assertEqual(3, len(p._pool)) p.close() @@ -1100,7 +1176,100 @@ class _TestPool(BaseTestCase): self.pool.terminate() join = TimingWrapper(self.pool.join) join() - self.assertLess(join.elapsed, 0.2) + self.assertLess(join.elapsed, 0.5) + +def raising(): + raise KeyError("key") + +def unpickleable_result(): + return lambda: 42 + +class _TestPoolWorkerErrors(BaseTestCase): + ALLOWED_TYPES = ('processes', ) + + def test_async_error_callback(self): + p = multiprocessing.Pool(2) + + scratchpad = [None] + def errback(exc): + scratchpad[0] = exc + + res = p.apply_async(raising, error_callback=errback) + self.assertRaises(KeyError, res.get) + self.assertTrue(scratchpad[0]) + self.assertIsInstance(scratchpad[0], KeyError) + + p.close() + p.join() + + def test_unpickleable_result(self): + from multiprocessing.pool import MaybeEncodingError + p = multiprocessing.Pool(2) + + # Make sure we don't lose pool processes because of encoding errors. + for iteration in range(20): + + scratchpad = [None] + def errback(exc): + scratchpad[0] = exc + + res = p.apply_async(unpickleable_result, error_callback=errback) + self.assertRaises(MaybeEncodingError, res.get) + wrapped = scratchpad[0] + self.assertTrue(wrapped) + self.assertIsInstance(scratchpad[0], MaybeEncodingError) + self.assertIsNotNone(wrapped.exc) + self.assertIsNotNone(wrapped.value) + + p.close() + p.join() + +class _TestPoolWorkerLifetime(BaseTestCase): + ALLOWED_TYPES = ('processes', ) + + def test_pool_worker_lifetime(self): + p = multiprocessing.Pool(3, maxtasksperchild=10) + self.assertEqual(3, len(p._pool)) + origworkerpids = [w.pid for w in p._pool] + # Run many tasks so each worker gets replaced (hopefully) + results = [] + for i in range(100): + results.append(p.apply_async(sqr, (i, ))) + # Fetch the results and verify we got the right answers, + # also ensuring all the tasks have completed. + for (j, res) in enumerate(results): + self.assertEqual(res.get(), sqr(j)) + # Refill the pool + p._repopulate_pool() + # Wait until all workers are alive + # (countdown * DELTA = 5 seconds max startup process time) + countdown = 50 + while countdown and not all(w.is_alive() for w in p._pool): + countdown -= 1 + time.sleep(DELTA) + finalworkerpids = [w.pid for w in p._pool] + # All pids should be assigned. See issue #7805. + self.assertNotIn(None, origworkerpids) + self.assertNotIn(None, finalworkerpids) + # Finally, check that the worker pids have changed + self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) + p.close() + p.join() + + def test_pool_worker_lifetime_early_close(self): + # Issue #10332: closing a pool whose workers have limited lifetimes + # before all the tasks completed would make join() hang. + p = multiprocessing.Pool(3, maxtasksperchild=1) + results = [] + for i in range(6): + results.append(p.apply_async(sqr, (i, 0.3))) + p.close() + p.join() + # check the results + for (j, res) in enumerate(results): + self.assertEqual(res.get(), sqr(j)) + + # # Test that manager has expected number of shared objects left # @@ -1230,6 +1399,7 @@ class _TestRemoteManager(BaseTestCase): manager.start() p = self.Process(target=self._putter, args=(manager.address, authkey)) + p.daemon = True p.start() manager2 = QueueManager2( @@ -1271,6 +1441,7 @@ class _TestManagerRestart(BaseTestCase): manager.start() p = self.Process(target=self._putter, args=(manager.address, authkey)) + p.daemon = True p.start() queue = manager.get_queue() self.assertEqual(queue.get(), 'hello world') @@ -1403,6 +1574,7 @@ class _TestConnection(BaseTestCase): conn, child_conn = self.Pipe() p = self.Process(target=self._echo, args=(child_conn,)) + p.daemon = True p.start() child_conn.close() # this might complete before child initializes @@ -1446,6 +1618,100 @@ class _TestConnection(BaseTestCase): self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) + @classmethod + def _is_fd_assigned(cls, fd): + try: + os.fstat(fd) + except OSError as e: + if e.errno == errno.EBADF: + return False + raise + else: + return True + + @classmethod + def _writefd(cls, conn, data, create_dummy_fds=False): + if create_dummy_fds: + for i in range(0, 256): + if not cls._is_fd_assigned(i): + os.dup2(conn.fileno(), i) + fd = reduction.recv_handle(conn) + if msvcrt: + fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) + os.write(fd, data) + os.close(fd) + + @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") + def test_fd_transfer(self): + if self.TYPE != 'processes': + self.skipTest("only makes sense with processes") + conn, child_conn = self.Pipe(duplex=True) + + p = self.Process(target=self._writefd, args=(child_conn, b"foo")) + p.daemon = True + p.start() + self.addCleanup(test.support.unlink, test.support.TESTFN) + with open(test.support.TESTFN, "wb") as f: + fd = f.fileno() + if msvcrt: + fd = msvcrt.get_osfhandle(fd) + reduction.send_handle(conn, fd, p.pid) + p.join() + with open(test.support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"foo") + + @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") + @unittest.skipIf(sys.platform == "win32", + "test semantics don't make sense on Windows") + @unittest.skipIf(MAXFD <= 256, + "largest assignable fd number is too small") + @unittest.skipUnless(hasattr(os, "dup2"), + "test needs os.dup2()") + def test_large_fd_transfer(self): + # With fd > 256 (issue #11657) + if self.TYPE != 'processes': + self.skipTest("only makes sense with processes") + conn, child_conn = self.Pipe(duplex=True) + + p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) + p.daemon = True + p.start() + self.addCleanup(test.support.unlink, test.support.TESTFN) + with open(test.support.TESTFN, "wb") as f: + fd = f.fileno() + for newfd in range(256, MAXFD): + if not self._is_fd_assigned(newfd): + break + else: + self.fail("could not find an unassigned large file descriptor") + os.dup2(fd, newfd) + try: + reduction.send_handle(conn, newfd, p.pid) + finally: + os.close(newfd) + p.join() + with open(test.support.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"bar") + + @classmethod + def _send_data_without_fd(self, conn): + os.write(conn.fileno(), b"\0") + + @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") + @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") + def test_missing_fd_transfer(self): + # Check that exception is raised when received data is not + # accompanied by a file descriptor in ancillary data. + if self.TYPE != 'processes': + self.skipTest("only makes sense with processes") + conn, child_conn = self.Pipe(duplex=True) + + p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) + p.daemon = True + p.start() + self.assertRaises(RuntimeError, reduction.recv_handle, conn) + p.join() + class _TestListenerClient(BaseTestCase): ALLOWED_TYPES = ('processes', 'threads') @@ -1516,11 +1782,13 @@ class _TestPicklingConnections(BaseTestCase): lconn, lconn0 = self.Pipe() lp = self.Process(target=self._listener, args=(lconn0, families)) + lp.daemon = True lp.start() lconn0.close() rconn, rconn0 = self.Pipe() rp = self.Process(target=self._remote, args=(rconn0,)) + rp.daemon = True rp.start() rconn0.close() @@ -1658,6 +1926,7 @@ class _TestSharedCTypes(BaseTestCase): string.value = latin('hello') p = self.Process(target=self._double, args=(x, y, foo, arr, string)) + p.daemon = True p.start() p.join() @@ -1730,6 +1999,7 @@ class _TestFinalize(BaseTestCase): conn, child_conn = self.Pipe() p = self.Process(target=self._test_finalize, args=(child_conn,)) + p.daemon = True p.start() p.join() @@ -1749,10 +2019,12 @@ class _TestImportStar(BaseTestCase): 'multiprocessing', 'multiprocessing.connection', 'multiprocessing.heap', 'multiprocessing.managers', 'multiprocessing.pool', 'multiprocessing.process', - 'multiprocessing.reduction', 'multiprocessing.synchronize', 'multiprocessing.util' ] + if HAS_REDUCTION: + modules.append('multiprocessing.reduction') + if c_int is not None: # This module requires _ctypes modules.append('multiprocessing.sharedctypes') @@ -1799,17 +2071,41 @@ class _TestLogging(BaseTestCase): reader, writer = multiprocessing.Pipe(duplex=False) logger.setLevel(LEVEL1) - self.Process(target=self._test_level, args=(writer,)).start() + p = self.Process(target=self._test_level, args=(writer,)) + p.daemon = True + p.start() self.assertEqual(LEVEL1, reader.recv()) logger.setLevel(logging.NOTSET) root_logger.setLevel(LEVEL2) - self.Process(target=self._test_level, args=(writer,)).start() + p = self.Process(target=self._test_level, args=(writer,)) + p.daemon = True + p.start() self.assertEqual(LEVEL2, reader.recv()) root_logger.setLevel(root_level) logger.setLevel(level=LOG_LEVEL) + +# class _TestLoggingProcessName(BaseTestCase): +# +# def handle(self, record): +# assert record.processName == multiprocessing.current_process().name +# self.__handled = True +# +# def test_logging(self): +# handler = logging.Handler() +# handler.handle = self.handle +# self.__handled = False +# # Bypass getLogger() and side-effects +# logger = logging.getLoggerClass()( +# 'multiprocessing.test.TestLoggingProcessName') +# logger.addHandler(handler) +# logger.propagate = False +# +# logger.warn('foo') +# assert self.__handled + # # Test to verify handle verification, see issue 3321 # @@ -1970,6 +2266,7 @@ def _ThisSubProcess(q): def _TestProcess(q): queue = multiprocessing.Queue() subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,)) + subProc.daemon = True subProc.start() subProc.join() @@ -2036,6 +2333,8 @@ def test_main(run=None): except OSError: raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!") + check_enough_semaphores() + if run is None: from test.support import run_unittest as run |
