diff options
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 246 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst | 3 |
2 files changed, 249 insertions, 0 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 7341131..2f839b9 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4706,6 +4706,252 @@ class TestPoolNotLeakOnFailure(unittest.TestCase): any(process.is_alive() for process in forked_processes)) +class TestSyncManagerTypes(unittest.TestCase): + """Test all the types which can be shared between a parent and a + child process by using a manager which acts as an intermediary + between them. + + In the following unit-tests the base type is created in the parent + process, the @classmethod represents the worker process and the + shared object is readable and editable between the two. + + # The child. + @classmethod + def _test_list(cls, obj): + assert obj[0] == 5 + assert obj.append(6) + + # The parent. + def test_list(self): + o = self.manager.list() + o.append(5) + self.run_worker(self._test_list, o) + assert o[1] == 6 + """ + manager_class = multiprocessing.managers.SyncManager + + def setUp(self): + self.manager = self.manager_class() + self.manager.start() + self.proc = None + + def tearDown(self): + if self.proc is not None and self.proc.is_alive(): + self.proc.terminate() + self.proc.join() + self.manager.shutdown() + + @classmethod + def setUpClass(cls): + support.reap_children() + + tearDownClass = setUpClass + + def wait_proc_exit(self): + # Only the manager process should be returned by active_children() + # but this can take a bit on slow machines, so wait a few seconds + # if there are other children too (see #17395). + join_process(self.proc) + start_time = time.monotonic() + t = 0.01 + while len(multiprocessing.active_children()) > 1: + time.sleep(t) + t *= 2 + dt = time.monotonic() - start_time + if dt >= 5.0: + test.support.environment_altered = True + print("Warning -- multiprocessing.Manager still has %s active " + "children after %s seconds" + % (multiprocessing.active_children(), dt), + file=sys.stderr) + break + + def run_worker(self, worker, obj): + self.proc = multiprocessing.Process(target=worker, args=(obj, )) + self.proc.daemon = True + self.proc.start() + self.wait_proc_exit() + self.assertEqual(self.proc.exitcode, 0) + + @classmethod + def _test_queue(cls, obj): + assert obj.qsize() == 2 + assert obj.full() + assert not obj.empty() + assert obj.get() == 5 + assert not obj.empty() + assert obj.get() == 6 + assert obj.empty() + + def test_queue(self, qname="Queue"): + o = getattr(self.manager, qname)(2) + o.put(5) + o.put(6) + self.run_worker(self._test_queue, o) + assert o.empty() + assert not o.full() + + def test_joinable_queue(self): + self.test_queue("JoinableQueue") + + @classmethod + def _test_event(cls, obj): + assert obj.is_set() + obj.wait() + obj.clear() + obj.wait(0.001) + + def test_event(self): + o = self.manager.Event() + o.set() + self.run_worker(self._test_event, o) + assert not o.is_set() + o.wait(0.001) + + @classmethod + def _test_lock(cls, obj): + obj.acquire() + + def test_lock(self, lname="Lock"): + o = getattr(self.manager, lname)() + self.run_worker(self._test_lock, o) + o.release() + self.assertRaises(RuntimeError, o.release) # already released + + @classmethod + def _test_rlock(cls, obj): + obj.acquire() + obj.release() + + def test_rlock(self, lname="Lock"): + o = getattr(self.manager, lname)() + self.run_worker(self._test_rlock, o) + + @classmethod + def _test_semaphore(cls, obj): + obj.acquire() + + def test_semaphore(self, sname="Semaphore"): + o = getattr(self.manager, sname)() + self.run_worker(self._test_semaphore, o) + o.release() + + def test_bounded_semaphore(self): + self.test_semaphore(sname="BoundedSemaphore") + + @classmethod + def _test_condition(cls, obj): + obj.acquire() + obj.release() + + def test_condition(self): + o = self.manager.Condition() + self.run_worker(self._test_condition, o) + + @classmethod + def _test_barrier(cls, obj): + assert obj.parties == 5 + obj.reset() + + def test_barrier(self): + o = self.manager.Barrier(5) + self.run_worker(self._test_barrier, o) + + @classmethod + def _test_pool(cls, obj): + # TODO: fix https://bugs.python.org/issue35919 + with obj: + pass + + def test_pool(self): + o = self.manager.Pool(processes=4) + self.run_worker(self._test_pool, o) + + @classmethod + def _test_list(cls, obj): + assert obj[0] == 5 + assert obj.count(5) == 1 + assert obj.index(5) == 0 + obj.sort() + obj.reverse() + for x in obj: + pass + assert len(obj) == 1 + assert obj.pop(0) == 5 + + def test_list(self): + o = self.manager.list() + o.append(5) + self.run_worker(self._test_list, o) + assert not o + self.assertEqual(len(o), 0) + + @classmethod + def _test_dict(cls, obj): + assert len(obj) == 1 + assert obj['foo'] == 5 + assert obj.get('foo') == 5 + # TODO: fix https://bugs.python.org/issue35918 + # assert obj.has_key('foo') + assert list(obj.items()) == [('foo', 5)] + assert list(obj.keys()) == ['foo'] + assert list(obj.values()) == [5] + assert obj.copy() == {'foo': 5} + assert obj.popitem() == ('foo', 5) + + def test_dict(self): + o = self.manager.dict() + o['foo'] = 5 + self.run_worker(self._test_dict, o) + assert not o + self.assertEqual(len(o), 0) + + @classmethod + def _test_value(cls, obj): + assert obj.value == 1 + assert obj.get() == 1 + obj.set(2) + + def test_value(self): + o = self.manager.Value('i', 1) + self.run_worker(self._test_value, o) + self.assertEqual(o.value, 2) + self.assertEqual(o.get(), 2) + + @classmethod + def _test_array(cls, obj): + assert obj[0] == 0 + assert obj[1] == 1 + assert len(obj) == 2 + assert list(obj) == [0, 1] + + def test_array(self): + o = self.manager.Array('i', [0, 1]) + self.run_worker(self._test_array, o) + + @classmethod + def _test_namespace(cls, obj): + assert obj.x == 0 + assert obj.y == 1 + + def test_namespace(self): + o = self.manager.Namespace() + o.x = 0 + o.y = 1 + self.run_worker(self._test_namespace, o) + + +try: + import multiprocessing.shared_memory +except ImportError: + @unittest.skip("SharedMemoryManager not available on this platform") + class TestSharedMemoryManagerTypes(TestSyncManagerTypes): + pass +else: + class TestSharedMemoryManagerTypes(TestSyncManagerTypes): + """Same as above but by using SharedMemoryManager.""" + manager_class = multiprocessing.shared_memory.SharedMemoryManager + class MiscTestCase(unittest.TestCase): def test__all__(self): diff --git a/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst b/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst new file mode 100644 index 0000000..546d47e --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst @@ -0,0 +1,3 @@ +multiprocessing: provide unit tests for SyncManager and SharedMemoryManager +classes + all the shareable types which are supposed to be supported by +them. (patch by Giampaolo Rodola) |