summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_multiprocessing.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_multiprocessing.py')
-rw-r--r--Lib/test/test_multiprocessing.py91
1 files changed, 88 insertions, 3 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 8edb420..f141bd4 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -8,6 +8,7 @@ import unittest
import queue as pyqueue
import time
import io
+import itertools
import sys
import os
import gc
@@ -82,6 +83,23 @@ HAVE_GETVALUE = not getattr(_multiprocessing,
'HAVE_BROKEN_SEM_GETVALUE', False)
WIN32 = (sys.platform == "win32")
+if WIN32:
+ from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
+
+ def wait_for_handle(handle, timeout):
+ if timeout is None or timeout < 0.0:
+ timeout = INFINITE
+ else:
+ timeout = int(1000 * timeout)
+ return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
+else:
+ from select import select
+ _select = util._eintr_retry(select)
+
+ def wait_for_handle(handle, timeout):
+ if timeout is not None and timeout < 0.0:
+ timeout = None
+ return handle in _select([handle], [], [], timeout)[0]
try:
MAXFD = os.sysconf("SC_OPEN_MAX")
@@ -196,6 +214,18 @@ class _TestProcess(BaseTestCase):
self.assertEqual(current.ident, os.getpid())
self.assertEqual(current.exitcode, None)
+ def test_daemon_argument(self):
+ if self.TYPE == "threads":
+ return
+
+ # By default uses the current process's daemon flag.
+ proc0 = self.Process(target=self._test)
+ self.assertEqual(proc0.daemon, self.current_process().daemon)
+ proc1 = self.Process(target=self._test, daemon=True)
+ self.assertTrue(proc1.daemon)
+ proc2 = self.Process(target=self._test, daemon=False)
+ self.assertFalse(proc2.daemon)
+
@classmethod
def _test(cls, q, *args, **kwds):
current = cls.current_process()
@@ -328,6 +358,26 @@ class _TestProcess(BaseTestCase):
]
self.assertEqual(result, expected)
+ @classmethod
+ def _test_sentinel(cls, event):
+ event.wait(10.0)
+
+ def test_sentinel(self):
+ if self.TYPE == "threads":
+ return
+ event = self.Event()
+ p = self.Process(target=self._test_sentinel, args=(event,))
+ with self.assertRaises(ValueError):
+ p.sentinel
+ p.start()
+ self.addCleanup(p.join)
+ sentinel = p.sentinel
+ self.assertIsInstance(sentinel, int)
+ self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
+ event.set()
+ p.join()
+ self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
+
#
#
#
@@ -1099,6 +1149,9 @@ def sqr(x, wait=0.0):
time.sleep(wait)
return x*x
+def mul(x, y):
+ return x*y
+
class _TestPool(BaseTestCase):
def test_apply(self):
@@ -1112,6 +1165,20 @@ class _TestPool(BaseTestCase):
self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
list(map(sqr, list(range(100)))))
+ def test_starmap(self):
+ psmap = self.pool.starmap
+ tuples = list(zip(range(10), range(9,-1, -1)))
+ self.assertEqual(psmap(mul, tuples),
+ list(itertools.starmap(mul, tuples)))
+ tuples = list(zip(range(100), range(99,-1, -1)))
+ self.assertEqual(psmap(mul, tuples, chunksize=20),
+ list(itertools.starmap(mul, tuples)))
+
+ def test_starmap_async(self):
+ tuples = list(zip(range(100), range(99,-1, -1)))
+ self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
+ list(itertools.starmap(mul, tuples)))
+
def test_map_chunksize(self):
try:
self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
@@ -1712,6 +1779,17 @@ class _TestConnection(BaseTestCase):
self.assertRaises(RuntimeError, reduction.recv_handle, conn)
p.join()
+class _TestListener(BaseTestCase):
+
+ ALLOWED_TYPES = ('processes')
+
+ def test_multiple_bind(self):
+ for family in self.connection.families:
+ l = self.connection.Listener(family=family)
+ self.addCleanup(l.close)
+ self.assertRaises(OSError, self.connection.Listener,
+ l.address, family)
+
class _TestListenerClient(BaseTestCase):
ALLOWED_TYPES = ('processes', 'threads')
@@ -1732,6 +1810,7 @@ class _TestListenerClient(BaseTestCase):
self.assertEqual(conn.recv(), 'hello')
p.join()
l.close()
+
#
# Test of sending connection and socket objects between processes
#
@@ -2114,9 +2193,15 @@ class TestInvalidHandle(unittest.TestCase):
@unittest.skipIf(WIN32, "skipped on Windows")
def test_invalid_handles(self):
- conn = _multiprocessing.Connection(44977608)
- self.assertRaises(IOError, conn.poll)
- self.assertRaises(IOError, _multiprocessing.Connection, -1)
+ conn = multiprocessing.connection.Connection(44977608)
+ try:
+ self.assertRaises((ValueError, IOError), conn.poll)
+ finally:
+ # Hack private attribute _handle to avoid printing an error
+ # in conn.__del__
+ conn._handle = None
+ self.assertRaises((ValueError, IOError),
+ multiprocessing.connection.Connection, -1)
#
# Functions used to create test cases from the base ones in this module