summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_multiprocessing.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_multiprocessing.py')
-rw-r--r--Lib/test/test_multiprocessing.py337
1 files changed, 318 insertions, 19 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index e9d0329..8edb420 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1,11 +1,10 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
#
# Unit tests for the multiprocessing package
#
import unittest
-import threading
import queue as pyqueue
import time
import io
@@ -25,6 +24,10 @@ import test.support
_multiprocessing = test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')
+# import threading after _multiprocessing to raise a more revelant error
+# message: "No module named _multiprocessing". _multiprocessing is not compiled
+# without thread support.
+import threading
import multiprocessing.dummy
import multiprocessing.connection
@@ -35,11 +38,22 @@ import multiprocessing.pool
from multiprocessing import util
try:
+ from multiprocessing import reduction
+ HAS_REDUCTION = True
+except ImportError:
+ HAS_REDUCTION = False
+
+try:
from multiprocessing.sharedctypes import Value, copy
HAS_SHAREDCTYPES = True
except ImportError:
HAS_SHAREDCTYPES = False
+try:
+ import msvcrt
+except ImportError:
+ msvcrt = None
+
#
#
#
@@ -52,7 +66,7 @@ def latin(s):
#
LOG_LEVEL = util.SUBWARNING
-#LOG_LEVEL = logging.WARNING
+#LOG_LEVEL = logging.DEBUG
DELTA = 0.1
CHECK_TIMINGS = False # making true makes tests take a lot longer
@@ -69,6 +83,11 @@ HAVE_GETVALUE = not getattr(_multiprocessing,
WIN32 = (sys.platform == "win32")
+try:
+ MAXFD = os.sysconf("SC_OPEN_MAX")
+except:
+ MAXFD = 256
+
#
# Some tests require ctypes
#
@@ -79,6 +98,22 @@ except ImportError:
Structure = object
c_int = c_double = None
+
+def check_enough_semaphores():
+ """Check that the system supports enough semaphores to run the test."""
+ # minimum number of semaphores available according to POSIX
+ nsems_min = 256
+ try:
+ nsems = os.sysconf("SC_SEM_NSEMS_MAX")
+ except (AttributeError, ValueError):
+ # sysconf not available or setting not available
+ return
+ if nsems == -1 or nsems >= nsems_min:
+ return
+ raise unittest.SkipTest("The OS doesn't support enough semaphores "
+ "to run the test (required: %d)." % nsems_min)
+
+
#
# Creates a wrapper for a function which records the time it takes to finish
#
@@ -156,7 +191,7 @@ class _TestProcess(BaseTestCase):
self.assertTrue(current.is_alive())
self.assertTrue(not current.daemon)
- self.assertTrue(isinstance(authkey, bytes))
+ self.assertIsInstance(authkey, bytes)
self.assertTrue(len(authkey) > 0)
self.assertEqual(current.ident, os.getpid())
self.assertEqual(current.exitcode, None)
@@ -187,7 +222,7 @@ class _TestProcess(BaseTestCase):
self.assertEqual(p.authkey, current.authkey)
self.assertEqual(p.is_alive(), False)
self.assertEqual(p.daemon, True)
- self.assertTrue(p not in self.active_children())
+ self.assertNotIn(p, self.active_children())
self.assertTrue(type(self.active_children()) is list)
self.assertEqual(p.exitcode, None)
@@ -195,7 +230,7 @@ class _TestProcess(BaseTestCase):
self.assertEqual(p.exitcode, None)
self.assertEqual(p.is_alive(), True)
- self.assertTrue(p in self.active_children())
+ self.assertIn(p, self.active_children())
self.assertEqual(q.get(), args[1:])
self.assertEqual(q.get(), kwargs)
@@ -208,7 +243,7 @@ class _TestProcess(BaseTestCase):
self.assertEqual(p.exitcode, 0)
self.assertEqual(p.is_alive(), False)
- self.assertTrue(p not in self.active_children())
+ self.assertNotIn(p, self.active_children())
@classmethod
def _test_terminate(cls):
@@ -223,7 +258,7 @@ class _TestProcess(BaseTestCase):
p.start()
self.assertEqual(p.is_alive(), True)
- self.assertTrue(p in self.active_children())
+ self.assertIn(p, self.active_children())
self.assertEqual(p.exitcode, None)
p.terminate()
@@ -233,7 +268,7 @@ class _TestProcess(BaseTestCase):
self.assertTimingAlmostEqual(join.elapsed, 0.0)
self.assertEqual(p.is_alive(), False)
- self.assertTrue(p not in self.active_children())
+ self.assertNotIn(p, self.active_children())
p.join()
@@ -252,13 +287,14 @@ class _TestProcess(BaseTestCase):
self.assertEqual(type(self.active_children()), list)
p = self.Process(target=time.sleep, args=(DELTA,))
- self.assertTrue(p not in self.active_children())
+ self.assertNotIn(p, self.active_children())
+ p.daemon = True
p.start()
- self.assertTrue(p in self.active_children())
+ self.assertIn(p, self.active_children())
p.join()
- self.assertTrue(p not in self.active_children())
+ self.assertNotIn(p, self.active_children())
@classmethod
def _test_recursion(cls, wconn, id):
@@ -324,12 +360,36 @@ class _TestSubclassingProcess(BaseTestCase):
def test_subclassing(self):
uppercaser = _UpperCaser()
+ uppercaser.daemon = True
uppercaser.start()
self.assertEqual(uppercaser.submit('hello'), 'HELLO')
self.assertEqual(uppercaser.submit('world'), 'WORLD')
uppercaser.stop()
uppercaser.join()
+ def test_stderr_flush(self):
+ # sys.stderr is flushed at process shutdown (issue #13812)
+ if self.TYPE == "threads":
+ return
+
+ testfn = test.support.TESTFN
+ self.addCleanup(test.support.unlink, testfn)
+ proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
+ proc.start()
+ proc.join()
+ with open(testfn, 'r') as f:
+ err = f.read()
+ # The whole traceback was printed
+ self.assertIn("ZeroDivisionError", err)
+ self.assertIn("test_multiprocessing.py", err)
+ self.assertIn("1/0 # MARKER", err)
+
+ @classmethod
+ def _test_stderr_flush(cls, testfn):
+ sys.stderr = open(testfn, 'w')
+ 1/0 # MARKER
+
+
#
#
#
@@ -502,6 +562,7 @@ class _TestQueue(BaseTestCase):
# fork process
p = self.Process(target=self._test_fork, args=(queue,))
+ p.daemon = True
p.start()
# check that all expected items are in the queue
@@ -542,6 +603,7 @@ class _TestQueue(BaseTestCase):
for i in range(4)]
for p in workers:
+ p.daemon = True
p.start()
for i in range(10):
@@ -754,7 +816,13 @@ class _TestCondition(BaseTestCase):
cond.release()
# check they have all woken
- time.sleep(DELTA)
+ for i in range(10):
+ try:
+ if get_value(woken) == 6:
+ break
+ except NotImplementedError:
+ break
+ time.sleep(DELTA)
self.assertReturnsIfImplemented(6, get_value, woken)
# check state is not mucked up
@@ -766,7 +834,7 @@ class _TestCondition(BaseTestCase):
cond.acquire()
res = wait(TIMEOUT1)
cond.release()
- self.assertEqual(res, None)
+ self.assertEqual(res, False)
self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
@@ -806,7 +874,9 @@ class _TestEvent(BaseTestCase):
#self.assertEqual(event.is_set(), False)
- self.Process(target=self._test_event, args=(event,)).start()
+ p = self.Process(target=self._test_event, args=(event,))
+ p.daemon = True
+ p.start()
self.assertEqual(wait(), True)
#
@@ -846,6 +916,7 @@ class _TestValue(BaseTestCase):
self.assertEqual(sv.value, cv[1])
proc = self.Process(target=self._test, args=(values,))
+ proc.daemon = True
proc.start()
proc.join()
@@ -909,6 +980,7 @@ class _TestArray(BaseTestCase):
self.f(seq)
p = self.Process(target=self.f, args=(arr,))
+ p.daemon = True
p.start()
p.join()
@@ -1026,6 +1098,7 @@ class _TestContainers(BaseTestCase):
def sqr(x, wait=0.0):
time.sleep(wait)
return x*x
+
class _TestPool(BaseTestCase):
def test_apply(self):
@@ -1079,6 +1152,9 @@ class _TestPool(BaseTestCase):
self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
def test_make_pool(self):
+ self.assertRaises(ValueError, multiprocessing.Pool, -1)
+ self.assertRaises(ValueError, multiprocessing.Pool, 0)
+
p = multiprocessing.Pool(3)
self.assertEqual(3, len(p._pool))
p.close()
@@ -1100,7 +1176,100 @@ class _TestPool(BaseTestCase):
self.pool.terminate()
join = TimingWrapper(self.pool.join)
join()
- self.assertLess(join.elapsed, 0.2)
+ self.assertLess(join.elapsed, 0.5)
+
+def raising():
+ raise KeyError("key")
+
+def unpickleable_result():
+ return lambda: 42
+
+class _TestPoolWorkerErrors(BaseTestCase):
+ ALLOWED_TYPES = ('processes', )
+
+ def test_async_error_callback(self):
+ p = multiprocessing.Pool(2)
+
+ scratchpad = [None]
+ def errback(exc):
+ scratchpad[0] = exc
+
+ res = p.apply_async(raising, error_callback=errback)
+ self.assertRaises(KeyError, res.get)
+ self.assertTrue(scratchpad[0])
+ self.assertIsInstance(scratchpad[0], KeyError)
+
+ p.close()
+ p.join()
+
+ def test_unpickleable_result(self):
+ from multiprocessing.pool import MaybeEncodingError
+ p = multiprocessing.Pool(2)
+
+ # Make sure we don't lose pool processes because of encoding errors.
+ for iteration in range(20):
+
+ scratchpad = [None]
+ def errback(exc):
+ scratchpad[0] = exc
+
+ res = p.apply_async(unpickleable_result, error_callback=errback)
+ self.assertRaises(MaybeEncodingError, res.get)
+ wrapped = scratchpad[0]
+ self.assertTrue(wrapped)
+ self.assertIsInstance(scratchpad[0], MaybeEncodingError)
+ self.assertIsNotNone(wrapped.exc)
+ self.assertIsNotNone(wrapped.value)
+
+ p.close()
+ p.join()
+
+class _TestPoolWorkerLifetime(BaseTestCase):
+ ALLOWED_TYPES = ('processes', )
+
+ def test_pool_worker_lifetime(self):
+ p = multiprocessing.Pool(3, maxtasksperchild=10)
+ self.assertEqual(3, len(p._pool))
+ origworkerpids = [w.pid for w in p._pool]
+ # Run many tasks so each worker gets replaced (hopefully)
+ results = []
+ for i in range(100):
+ results.append(p.apply_async(sqr, (i, )))
+ # Fetch the results and verify we got the right answers,
+ # also ensuring all the tasks have completed.
+ for (j, res) in enumerate(results):
+ self.assertEqual(res.get(), sqr(j))
+ # Refill the pool
+ p._repopulate_pool()
+ # Wait until all workers are alive
+ # (countdown * DELTA = 5 seconds max startup process time)
+ countdown = 50
+ while countdown and not all(w.is_alive() for w in p._pool):
+ countdown -= 1
+ time.sleep(DELTA)
+ finalworkerpids = [w.pid for w in p._pool]
+ # All pids should be assigned. See issue #7805.
+ self.assertNotIn(None, origworkerpids)
+ self.assertNotIn(None, finalworkerpids)
+ # Finally, check that the worker pids have changed
+ self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
+ p.close()
+ p.join()
+
+ def test_pool_worker_lifetime_early_close(self):
+ # Issue #10332: closing a pool whose workers have limited lifetimes
+ # before all the tasks completed would make join() hang.
+ p = multiprocessing.Pool(3, maxtasksperchild=1)
+ results = []
+ for i in range(6):
+ results.append(p.apply_async(sqr, (i, 0.3)))
+ p.close()
+ p.join()
+ # check the results
+ for (j, res) in enumerate(results):
+ self.assertEqual(res.get(), sqr(j))
+
+
#
# Test that manager has expected number of shared objects left
#
@@ -1230,6 +1399,7 @@ class _TestRemoteManager(BaseTestCase):
manager.start()
p = self.Process(target=self._putter, args=(manager.address, authkey))
+ p.daemon = True
p.start()
manager2 = QueueManager2(
@@ -1271,6 +1441,7 @@ class _TestManagerRestart(BaseTestCase):
manager.start()
p = self.Process(target=self._putter, args=(manager.address, authkey))
+ p.daemon = True
p.start()
queue = manager.get_queue()
self.assertEqual(queue.get(), 'hello world')
@@ -1403,6 +1574,7 @@ class _TestConnection(BaseTestCase):
conn, child_conn = self.Pipe()
p = self.Process(target=self._echo, args=(child_conn,))
+ p.daemon = True
p.start()
child_conn.close() # this might complete before child initializes
@@ -1446,6 +1618,100 @@ class _TestConnection(BaseTestCase):
self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
+ @classmethod
+ def _is_fd_assigned(cls, fd):
+ try:
+ os.fstat(fd)
+ except OSError as e:
+ if e.errno == errno.EBADF:
+ return False
+ raise
+ else:
+ return True
+
+ @classmethod
+ def _writefd(cls, conn, data, create_dummy_fds=False):
+ if create_dummy_fds:
+ for i in range(0, 256):
+ if not cls._is_fd_assigned(i):
+ os.dup2(conn.fileno(), i)
+ fd = reduction.recv_handle(conn)
+ if msvcrt:
+ fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
+ os.write(fd, data)
+ os.close(fd)
+
+ @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
+ def test_fd_transfer(self):
+ if self.TYPE != 'processes':
+ self.skipTest("only makes sense with processes")
+ conn, child_conn = self.Pipe(duplex=True)
+
+ p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
+ p.daemon = True
+ p.start()
+ self.addCleanup(test.support.unlink, test.support.TESTFN)
+ with open(test.support.TESTFN, "wb") as f:
+ fd = f.fileno()
+ if msvcrt:
+ fd = msvcrt.get_osfhandle(fd)
+ reduction.send_handle(conn, fd, p.pid)
+ p.join()
+ with open(test.support.TESTFN, "rb") as f:
+ self.assertEqual(f.read(), b"foo")
+
+ @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
+ @unittest.skipIf(sys.platform == "win32",
+ "test semantics don't make sense on Windows")
+ @unittest.skipIf(MAXFD <= 256,
+ "largest assignable fd number is too small")
+ @unittest.skipUnless(hasattr(os, "dup2"),
+ "test needs os.dup2()")
+ def test_large_fd_transfer(self):
+ # With fd > 256 (issue #11657)
+ if self.TYPE != 'processes':
+ self.skipTest("only makes sense with processes")
+ conn, child_conn = self.Pipe(duplex=True)
+
+ p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
+ p.daemon = True
+ p.start()
+ self.addCleanup(test.support.unlink, test.support.TESTFN)
+ with open(test.support.TESTFN, "wb") as f:
+ fd = f.fileno()
+ for newfd in range(256, MAXFD):
+ if not self._is_fd_assigned(newfd):
+ break
+ else:
+ self.fail("could not find an unassigned large file descriptor")
+ os.dup2(fd, newfd)
+ try:
+ reduction.send_handle(conn, newfd, p.pid)
+ finally:
+ os.close(newfd)
+ p.join()
+ with open(test.support.TESTFN, "rb") as f:
+ self.assertEqual(f.read(), b"bar")
+
+ @classmethod
+ def _send_data_without_fd(self, conn):
+ os.write(conn.fileno(), b"\0")
+
+ @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
+ @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
+ def test_missing_fd_transfer(self):
+ # Check that exception is raised when received data is not
+ # accompanied by a file descriptor in ancillary data.
+ if self.TYPE != 'processes':
+ self.skipTest("only makes sense with processes")
+ conn, child_conn = self.Pipe(duplex=True)
+
+ p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
+ p.daemon = True
+ p.start()
+ self.assertRaises(RuntimeError, reduction.recv_handle, conn)
+ p.join()
+
class _TestListenerClient(BaseTestCase):
ALLOWED_TYPES = ('processes', 'threads')
@@ -1516,11 +1782,13 @@ class _TestPicklingConnections(BaseTestCase):
lconn, lconn0 = self.Pipe()
lp = self.Process(target=self._listener, args=(lconn0, families))
+ lp.daemon = True
lp.start()
lconn0.close()
rconn, rconn0 = self.Pipe()
rp = self.Process(target=self._remote, args=(rconn0,))
+ rp.daemon = True
rp.start()
rconn0.close()
@@ -1658,6 +1926,7 @@ class _TestSharedCTypes(BaseTestCase):
string.value = latin('hello')
p = self.Process(target=self._double, args=(x, y, foo, arr, string))
+ p.daemon = True
p.start()
p.join()
@@ -1730,6 +1999,7 @@ class _TestFinalize(BaseTestCase):
conn, child_conn = self.Pipe()
p = self.Process(target=self._test_finalize, args=(child_conn,))
+ p.daemon = True
p.start()
p.join()
@@ -1749,10 +2019,12 @@ class _TestImportStar(BaseTestCase):
'multiprocessing', 'multiprocessing.connection',
'multiprocessing.heap', 'multiprocessing.managers',
'multiprocessing.pool', 'multiprocessing.process',
- 'multiprocessing.reduction',
'multiprocessing.synchronize', 'multiprocessing.util'
]
+ if HAS_REDUCTION:
+ modules.append('multiprocessing.reduction')
+
if c_int is not None:
# This module requires _ctypes
modules.append('multiprocessing.sharedctypes')
@@ -1799,17 +2071,41 @@ class _TestLogging(BaseTestCase):
reader, writer = multiprocessing.Pipe(duplex=False)
logger.setLevel(LEVEL1)
- self.Process(target=self._test_level, args=(writer,)).start()
+ p = self.Process(target=self._test_level, args=(writer,))
+ p.daemon = True
+ p.start()
self.assertEqual(LEVEL1, reader.recv())
logger.setLevel(logging.NOTSET)
root_logger.setLevel(LEVEL2)
- self.Process(target=self._test_level, args=(writer,)).start()
+ p = self.Process(target=self._test_level, args=(writer,))
+ p.daemon = True
+ p.start()
self.assertEqual(LEVEL2, reader.recv())
root_logger.setLevel(root_level)
logger.setLevel(level=LOG_LEVEL)
+
+# class _TestLoggingProcessName(BaseTestCase):
+#
+# def handle(self, record):
+# assert record.processName == multiprocessing.current_process().name
+# self.__handled = True
+#
+# def test_logging(self):
+# handler = logging.Handler()
+# handler.handle = self.handle
+# self.__handled = False
+# # Bypass getLogger() and side-effects
+# logger = logging.getLoggerClass()(
+# 'multiprocessing.test.TestLoggingProcessName')
+# logger.addHandler(handler)
+# logger.propagate = False
+#
+# logger.warn('foo')
+# assert self.__handled
+
#
# Test to verify handle verification, see issue 3321
#
@@ -1970,6 +2266,7 @@ def _ThisSubProcess(q):
def _TestProcess(q):
queue = multiprocessing.Queue()
subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
+ subProc.daemon = True
subProc.start()
subProc.join()
@@ -2036,6 +2333,8 @@ def test_main(run=None):
except OSError:
raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
+ check_enough_semaphores()
+
if run is None:
from test.support import run_unittest as run