From d69cfe88eae5b177b1aaf51c39e85fb92c34cf22 Mon Sep 17 00:00:00 2001 From: Richard Oudkerk Date: Mon, 18 Jun 2012 17:47:52 +0100 Subject: Issue #15064: Implement context manager protocol for multiprocessing types --- Doc/library/multiprocessing.rst | 17 ++++++++++++++++ Lib/multiprocessing/connection.py | 18 ++++++++++++++++- Lib/multiprocessing/dummy/connection.py | 12 +++++++++++ Lib/multiprocessing/pool.py | 6 ++++++ Lib/test/test_multiprocessing.py | 35 +++++++++++++++++++++++++++++++++ Misc/NEWS | 2 ++ 6 files changed, 89 insertions(+), 1 deletion(-) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 4171977..2f64bb1 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -834,6 +834,10 @@ Connection objects are usually created using :func:`Pipe` -- see also Connection objects themselves can now be transferred between processes using :meth:`Connection.send` and :meth:`Connection.recv`. + .. versionadded:: 3.3 + Connection objects now support the context manager protocol -- see + :ref:`typecontextmanager`. :meth:`__enter__` returns the + connection object, and :meth:`__exit__` calls :meth:`close`. For example: @@ -1277,6 +1281,9 @@ their parent process exits. The manager classes are defined in the The address used by the manager. + Manager objects support the context manager protocol -- see + :ref:`typecontextmanager`. :meth:`__enter__` returns the + manager object, and :meth:`__exit__` calls :meth:`shutdown`. .. class:: SyncManager @@ -1747,6 +1754,11 @@ with the :class:`Pool` class. Wait for the worker processes to exit. One must call :meth:`close` or :meth:`terminate` before using :meth:`join`. + .. versionadded:: 3.3 + Pool objects now support the context manager protocol -- see + :ref:`typecontextmanager`. :meth:`__enter__` returns the pool + object, and :meth:`__exit__` calls :meth:`terminate`. + .. class:: AsyncResult @@ -1911,6 +1923,11 @@ multiple connections at the same time. The address from which the last accepted connection came. If this is unavailable then it is ``None``. + .. versionadded:: 3.3 + Listener objects now support the context manager protocol -- see + :ref:`typecontextmanager`. :meth:`__enter__` returns the + listener object, and :meth:`__exit__` calls :meth:`close`. + .. function:: wait(object_list, timeout=None) Wait till an object in *object_list* is ready. Returns the list of diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 56f375d..e5694e3 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -257,6 +257,12 @@ class _ConnectionBase: self._check_readable() return self._poll(timeout) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() + if _winapi: @@ -436,6 +442,8 @@ class Listener(object): Returns a `Connection` object. ''' + if self._listener is None: + raise IOError('listener is closed') c = self._listener.accept() if self._authkey: deliver_challenge(c, self._authkey) @@ -446,11 +454,19 @@ class Listener(object): ''' Close the bound socket or named pipe of `self`. ''' - return self._listener.close() + if self._listener is not None: + self._listener.close() + self._listener = None address = property(lambda self: self._listener._address) last_accepted = property(lambda self: self._listener._last_accepted) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() + def Client(address, family=None, authkey=None): ''' diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py index af10579..874ec8e 100644 --- a/Lib/multiprocessing/dummy/connection.py +++ b/Lib/multiprocessing/dummy/connection.py @@ -53,6 +53,12 @@ class Listener(object): address = property(lambda self: self._backlog_queue) + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() + def Client(address): _in, _out = Queue(), Queue() @@ -85,3 +91,9 @@ class Connection(object): def close(self): pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.close() diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 59e547a..9e07e32 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -522,6 +522,12 @@ class Pool(object): debug('cleaning up worker %d' % p.pid) p.join() + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() + # # Class whose instances are returned by `Pool.apply_async()` # diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 08f87cd..017e6b4 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1719,6 +1719,15 @@ class _TestPool(BaseTestCase): p.close() p.join() + def test_context(self): + if self.TYPE == 'processes': + L = list(range(10)) + expected = [sqr(i) for i in L] + with multiprocessing.Pool(2) as p: + r = p.map_async(sqr, L) + self.assertEqual(r.get(), expected) + self.assertRaises(AssertionError, p.map_async, sqr, L) + def raising(): raise KeyError("key") @@ -2266,6 +2275,22 @@ class _TestConnection(BaseTestCase): self.assertRaises(RuntimeError, reduction.recv_handle, conn) p.join() + def test_context(self): + a, b = self.Pipe() + + with a, b: + a.send(1729) + self.assertEqual(b.recv(), 1729) + if self.TYPE == 'processes': + self.assertFalse(a.closed) + self.assertFalse(b.closed) + + if self.TYPE == 'processes': + self.assertTrue(a.closed) + self.assertTrue(b.closed) + self.assertRaises(IOError, a.recv) + self.assertRaises(IOError, b.recv) + class _TestListener(BaseTestCase): ALLOWED_TYPES = ('processes',) @@ -2277,6 +2302,16 @@ class _TestListener(BaseTestCase): self.assertRaises(OSError, self.connection.Listener, l.address, family) + def test_context(self): + with self.connection.Listener() as l: + with self.connection.Client(l.address) as c: + with l.accept() as d: + c.send(1729) + self.assertEqual(d.recv(), 1729) + + if self.TYPE == 'processes': + self.assertRaises(IOError, l.accept) + class _TestListenerClient(BaseTestCase): ALLOWED_TYPES = ('processes', 'threads') diff --git a/Misc/NEWS b/Misc/NEWS index bf4265c..d84da80 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -29,6 +29,8 @@ Core and Builtins Library ------- +- Issue #15064: Implement context manager protocol for multiprocessing types + - Issue #15101: Make pool finalizer avoid joining current thread. - Issue #14657: The frozen instance of importlib used for bootstrap is now -- cgit v0.12