summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCharles-François Natali <neologix@free.fr>2012-04-17 16:45:57 (GMT)
committerCharles-François Natali <neologix@free.fr>2012-04-17 16:45:57 (GMT)
commitc8ce715a82fd8034ef1d809b262346c15f2490c4 (patch)
tree125522b5a5daa37ba405f81902aeebe8d91b6c7f
parenta3f4457b172a165fafa65d67e0293f89dbba06b7 (diff)
downloadcpython-c8ce715a82fd8034ef1d809b262346c15f2490c4.zip
cpython-c8ce715a82fd8034ef1d809b262346c15f2490c4.tar.gz
cpython-c8ce715a82fd8034ef1d809b262346c15f2490c4.tar.bz2
Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.
-rw-r--r--Doc/library/multiprocessing.rst6
-rw-r--r--Lib/multiprocessing/managers.py19
-rw-r--r--Lib/multiprocessing/synchronize.py19
-rw-r--r--Lib/test/test_multiprocessing.py67
-rw-r--r--Misc/NEWS2
5 files changed, 113 insertions, 0 deletions
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index 1c7b9b9..b9dfd19 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -897,6 +897,9 @@ object -- see :ref:`multiprocessing-managers`.
If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
object from :mod:`multiprocessing`.
+ .. versionchanged:: 3.3
+ The :meth:`wait_for` method was added.
+
.. class:: Event()
A clone of :class:`threading.Event`.
@@ -1281,6 +1284,9 @@ their parent process exits. The manager classes are defined in the
If *lock* is supplied then it should be a proxy for a
:class:`threading.Lock` or :class:`threading.RLock` object.
+ .. versionchanged:: 3.3
+ The :meth:`wait_for` method was added.
+
.. method:: Event()
Create a shared :class:`threading.Event` object and return a proxy for it.
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index eaf912c..d1c9d45 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -48,6 +48,7 @@ from traceback import format_exc
from multiprocessing import Process, current_process, active_children, Pool, util, connection
from multiprocessing.process import AuthenticationString
from multiprocessing.forking import exit, Popen, ForkingPickler
+from time import time as _time
#
# Register some things for pickling
@@ -996,6 +997,24 @@ class ConditionProxy(AcquirerProxy):
return self._callmethod('notify')
def notify_all(self):
return self._callmethod('notify_all')
+ def wait_for(self, predicate, timeout=None):
+ result = predicate()
+ if result:
+ return result
+ if timeout is not None:
+ endtime = _time() + timeout
+ else:
+ endtime = None
+ waittime = None
+ while not result:
+ if endtime is not None:
+ waittime = endtime - _time()
+ if waittime <= 0:
+ break
+ self.wait(waittime)
+ result = predicate()
+ return result
+
class EventProxy(BaseProxy):
_exposed_ = ('is_set', 'set', 'clear', 'wait')
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index e35bbff..532ac5c 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -43,6 +43,7 @@ import _multiprocessing
from multiprocessing.process import current_process
from multiprocessing.util import register_after_fork, debug
from multiprocessing.forking import assert_spawning, Popen
+from time import time as _time
# Try to import the mp.synchronize module cleanly, if it fails
# raise ImportError for platforms lacking a working sem_open implementation.
@@ -290,6 +291,24 @@ class Condition(object):
while self._wait_semaphore.acquire(False):
pass
+ def wait_for(self, predicate, timeout=None):
+ result = predicate()
+ if result:
+ return result
+ if timeout is not None:
+ endtime = _time() + timeout
+ else:
+ endtime = None
+ waittime = None
+ while not result:
+ if endtime is not None:
+ waittime = endtime - _time()
+ if waittime <= 0:
+ break
+ self.wait(waittime)
+ result = predicate()
+ return result
+
#
# Event
#
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 2bcdb4e..bbde366 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -887,6 +887,73 @@ class _TestCondition(BaseTestCase):
self.assertEqual(res, False)
self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
+ @classmethod
+ def _test_waitfor_f(cls, cond, state):
+ with cond:
+ state.value = 0
+ cond.notify()
+ result = cond.wait_for(lambda : state.value==4)
+ if not result or state.value != 4:
+ sys.exit(1)
+
+ @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
+ def test_waitfor(self):
+ # based on test in test/lock_tests.py
+ cond = self.Condition()
+ state = self.Value('i', -1)
+
+ p = self.Process(target=self._test_waitfor_f, args=(cond, state))
+ p.daemon = True
+ p.start()
+
+ with cond:
+ result = cond.wait_for(lambda : state.value==0)
+ self.assertTrue(result)
+ self.assertEqual(state.value, 0)
+
+ for i in range(4):
+ time.sleep(0.01)
+ with cond:
+ state.value += 1
+ cond.notify()
+
+ p.join(5)
+ self.assertFalse(p.is_alive())
+ self.assertEqual(p.exitcode, 0)
+
+ @classmethod
+ def _test_waitfor_timeout_f(cls, cond, state, success):
+ with cond:
+ expected = 0.1
+ dt = time.time()
+ result = cond.wait_for(lambda : state.value==4, timeout=expected)
+ dt = time.time() - dt
+ # borrow logic in assertTimeout() from test/lock_tests.py
+ if not result and expected * 0.6 < dt < expected * 10.0:
+ success.value = True
+
+ @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
+ def test_waitfor_timeout(self):
+ # based on test in test/lock_tests.py
+ cond = self.Condition()
+ state = self.Value('i', 0)
+ success = self.Value('i', False)
+
+ p = self.Process(target=self._test_waitfor_timeout_f,
+ args=(cond, state, success))
+ p.daemon = True
+ p.start()
+
+ # Only increment 3 times, so state == 4 is never reached.
+ for i in range(3):
+ time.sleep(0.01)
+ with cond:
+ state.value += 1
+ cond.notify()
+
+ p.join(5)
+ self.assertTrue(success.value)
+
class _TestEvent(BaseTestCase):
diff --git a/Misc/NEWS b/Misc/NEWS
index 54da24d..f1837e7 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -39,6 +39,8 @@ Core and Builtins
Library
-------
+- Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.
+
- Issue #14452: SysLogHandler no longer inserts a UTF-8 BOM into the message.
- Issue #14386: Expose the dict_proxy internal type as types.MappingProxyType.