diff options
-rw-r--r-- | Lib/multiprocessing/managers.py | 4 | ||||
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 24 | ||||
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 66 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst | 5 |
4 files changed, 71 insertions, 28 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 43dd02a..cae1c10 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -999,8 +999,8 @@ class ConditionProxy(AcquirerProxy): _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') def wait(self, timeout=None): return self._callmethod('wait', (timeout,)) - def notify(self): - return self._callmethod('notify') + def notify(self, n=1): + return self._callmethod('notify', (n,)) def notify_all(self): return self._callmethod('notify_all') def wait_for(self, predicate, timeout=None): diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index d4bdf0e..0590ed6 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -268,24 +268,7 @@ class Condition(object): for i in range(count): self._lock.acquire() - def notify(self): - assert self._lock._semlock._is_mine(), 'lock is not owned' - assert not self._wait_semaphore.acquire(False) - - # to take account of timeouts since last notify() we subtract - # woken_count from sleeping_count and rezero woken_count - while self._woken_count.acquire(False): - res = self._sleeping_count.acquire(False) - assert res - - if self._sleeping_count.acquire(False): # try grabbing a sleeper - self._wait_semaphore.release() # wake up one sleeper - self._woken_count.acquire() # wait for the sleeper to wake - - # rezero _wait_semaphore in case a timeout just happened - self._wait_semaphore.acquire(False) - - def notify_all(self): + def notify(self, n=1): assert self._lock._semlock._is_mine(), 'lock is not owned' assert not self._wait_semaphore.acquire(False) @@ -296,7 +279,7 @@ class Condition(object): assert res sleepers = 0 - while self._sleeping_count.acquire(False): + while sleepers < n and self._sleeping_count.acquire(False): self._wait_semaphore.release() # wake up one sleeper sleepers += 1 @@ -308,6 +291,9 @@ class Condition(object): while self._wait_semaphore.acquire(False): pass + def notify_all(self): + self.notify(n=sys.maxsize) + def wait_for(self, predicate, timeout=None): result = predicate() if result: diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d0a5446..d83b5a7 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -948,6 +948,17 @@ class _TestCondition(BaseTestCase): woken.release() cond.release() + def assertReachesEventually(self, func, value): + for i in range(10): + try: + if func() == value: + break + except NotImplementedError: + break + time.sleep(DELTA) + time.sleep(DELTA) + self.assertReturnsIfImplemented(value, func) + def check_invariant(self, cond): # this is only supposed to succeed when there are no sleepers if self.TYPE == 'processes': @@ -1055,13 +1066,54 @@ class _TestCondition(BaseTestCase): cond.release() # check they have all woken - for i in range(10): - try: - if get_value(woken) == 6: - break - except NotImplementedError: - break - time.sleep(DELTA) + self.assertReachesEventually(lambda: get_value(woken), 6) + + # check state is not mucked up + self.check_invariant(cond) + + def test_notify_n(self): + cond = self.Condition() + sleeping = self.Semaphore(0) + woken = self.Semaphore(0) + + # start some threads/processes + for i in range(3): + p = self.Process(target=self.f, args=(cond, sleeping, woken)) + p.daemon = True + p.start() + + t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) + t.daemon = True + t.start() + + # wait for them to all sleep + for i in range(6): + sleeping.acquire() + + # check no process/thread has woken up + time.sleep(DELTA) + self.assertReturnsIfImplemented(0, get_value, woken) + + # wake some of them up + cond.acquire() + cond.notify(n=2) + cond.release() + + # check 2 have woken + self.assertReachesEventually(lambda: get_value(woken), 2) + + # wake the rest of them + cond.acquire() + cond.notify(n=4) + cond.release() + + self.assertReachesEventually(lambda: get_value(woken), 6) + + # doesn't do anything more + cond.acquire() + cond.notify(n=3) + cond.release() + self.assertReturnsIfImplemented(6, get_value, woken) # check state is not mucked up diff --git a/Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst b/Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst new file mode 100644 index 0000000..9ef3ace --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst @@ -0,0 +1,5 @@ +Add missing parameter "n" on multiprocessing.Condition.notify(). + +The doc claims multiprocessing.Condition behaves like threading.Condition, +but its notify() method lacked the optional "n" argument (to specify the +number of sleepers to wake up) that threading.Condition.notify() accepts. |