diff options
Diffstat (limited to 'Lib/test/test_multiprocessing.py')
-rw-r--r-- | Lib/test/test_multiprocessing.py | 79 |
1 files changed, 76 insertions, 3 deletions
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index de894f3..93cc11d 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -8,6 +8,7 @@ import unittest import queue as pyqueue import time import io +import itertools import sys import os import gc @@ -82,6 +83,23 @@ HAVE_GETVALUE = not getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False) WIN32 = (sys.platform == "win32") +if WIN32: + from _subprocess import WaitForSingleObject, INFINITE, WAIT_OBJECT_0 + + def wait_for_handle(handle, timeout): + if timeout is None or timeout < 0.0: + timeout = INFINITE + else: + timeout = int(1000 * timeout) + return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0 +else: + from select import select + _select = util._eintr_retry(select) + + def wait_for_handle(handle, timeout): + if timeout is not None and timeout < 0.0: + timeout = None + return handle in _select([handle], [], [], timeout)[0] try: MAXFD = os.sysconf("SC_OPEN_MAX") @@ -196,6 +214,18 @@ class _TestProcess(BaseTestCase): self.assertEqual(current.ident, os.getpid()) self.assertEqual(current.exitcode, None) + def test_daemon_argument(self): + if self.TYPE == "threads": + return + + # By default uses the current process's daemon flag. + proc0 = self.Process(target=self._test) + self.assertEqual(proc0.daemon, self.current_process().daemon) + proc1 = self.Process(target=self._test, daemon=True) + self.assertTrue(proc1.daemon) + proc2 = self.Process(target=self._test, daemon=False) + self.assertFalse(proc2.daemon) + @classmethod def _test(cls, q, *args, **kwds): current = cls.current_process() @@ -328,6 +358,26 @@ class _TestProcess(BaseTestCase): ] self.assertEqual(result, expected) + @classmethod + def _test_sentinel(cls, event): + event.wait(10.0) + + def test_sentinel(self): + if self.TYPE == "threads": + return + event = self.Event() + p = self.Process(target=self._test_sentinel, args=(event,)) + with self.assertRaises(ValueError): + p.sentinel + p.start() + self.addCleanup(p.join) + sentinel = p.sentinel + self.assertIsInstance(sentinel, int) + self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) + event.set() + p.join() + self.assertTrue(wait_for_handle(sentinel, timeout=DELTA)) + # # # @@ -1076,6 +1126,9 @@ def sqr(x, wait=0.0): time.sleep(wait) return x*x +def mul(x, y): + return x*y + class _TestPool(BaseTestCase): def test_apply(self): @@ -1089,6 +1142,20 @@ class _TestPool(BaseTestCase): self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), list(map(sqr, list(range(100))))) + def test_starmap(self): + psmap = self.pool.starmap + tuples = list(zip(range(10), range(9,-1, -1))) + self.assertEqual(psmap(mul, tuples), + list(itertools.starmap(mul, tuples))) + tuples = list(zip(range(100), range(99,-1, -1))) + self.assertEqual(psmap(mul, tuples, chunksize=20), + list(itertools.starmap(mul, tuples))) + + def test_starmap_async(self): + tuples = list(zip(range(100), range(99,-1, -1))) + self.assertEqual(self.pool.starmap_async(mul, tuples).get(), + list(itertools.starmap(mul, tuples))) + def test_map_chunksize(self): try: self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) @@ -2091,9 +2158,15 @@ class TestInvalidHandle(unittest.TestCase): @unittest.skipIf(WIN32, "skipped on Windows") def test_invalid_handles(self): - conn = _multiprocessing.Connection(44977608) - self.assertRaises(IOError, conn.poll) - self.assertRaises(IOError, _multiprocessing.Connection, -1) + conn = multiprocessing.connection.Connection(44977608) + try: + self.assertRaises((ValueError, IOError), conn.poll) + finally: + # Hack private attribute _handle to avoid printing an error + # in conn.__del__ + conn._handle = None + self.assertRaises((ValueError, IOError), + multiprocessing.connection.Connection, -1) # # Functions used to create test cases from the base ones in this module |