summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/multiprocessing/managers.py4
-rw-r--r--Lib/multiprocessing/synchronize.py24
-rw-r--r--Lib/test/_test_multiprocessing.py66
-rw-r--r--Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst5
4 files changed, 71 insertions, 28 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 43dd02a..cae1c10 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -999,8 +999,8 @@ class ConditionProxy(AcquirerProxy):
_exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
def wait(self, timeout=None):
return self._callmethod('wait', (timeout,))
- def notify(self):
- return self._callmethod('notify')
+ def notify(self, n=1):
+ return self._callmethod('notify', (n,))
def notify_all(self):
return self._callmethod('notify_all')
def wait_for(self, predicate, timeout=None):
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index d4bdf0e..0590ed6 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -268,24 +268,7 @@ class Condition(object):
for i in range(count):
self._lock.acquire()
- def notify(self):
- assert self._lock._semlock._is_mine(), 'lock is not owned'
- assert not self._wait_semaphore.acquire(False)
-
- # to take account of timeouts since last notify() we subtract
- # woken_count from sleeping_count and rezero woken_count
- while self._woken_count.acquire(False):
- res = self._sleeping_count.acquire(False)
- assert res
-
- if self._sleeping_count.acquire(False): # try grabbing a sleeper
- self._wait_semaphore.release() # wake up one sleeper
- self._woken_count.acquire() # wait for the sleeper to wake
-
- # rezero _wait_semaphore in case a timeout just happened
- self._wait_semaphore.acquire(False)
-
- def notify_all(self):
+ def notify(self, n=1):
assert self._lock._semlock._is_mine(), 'lock is not owned'
assert not self._wait_semaphore.acquire(False)
@@ -296,7 +279,7 @@ class Condition(object):
assert res
sleepers = 0
- while self._sleeping_count.acquire(False):
+ while sleepers < n and self._sleeping_count.acquire(False):
self._wait_semaphore.release() # wake up one sleeper
sleepers += 1
@@ -308,6 +291,9 @@ class Condition(object):
while self._wait_semaphore.acquire(False):
pass
+ def notify_all(self):
+ self.notify(n=sys.maxsize)
+
def wait_for(self, predicate, timeout=None):
result = predicate()
if result:
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index d0a5446..d83b5a7 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -948,6 +948,17 @@ class _TestCondition(BaseTestCase):
woken.release()
cond.release()
+ def assertReachesEventually(self, func, value):
+ for i in range(10):
+ try:
+ if func() == value:
+ break
+ except NotImplementedError:
+ break
+ time.sleep(DELTA)
+ time.sleep(DELTA)
+ self.assertReturnsIfImplemented(value, func)
+
def check_invariant(self, cond):
# this is only supposed to succeed when there are no sleepers
if self.TYPE == 'processes':
@@ -1055,13 +1066,54 @@ class _TestCondition(BaseTestCase):
cond.release()
# check they have all woken
- for i in range(10):
- try:
- if get_value(woken) == 6:
- break
- except NotImplementedError:
- break
- time.sleep(DELTA)
+ self.assertReachesEventually(lambda: get_value(woken), 6)
+
+ # check state is not mucked up
+ self.check_invariant(cond)
+
+ def test_notify_n(self):
+ cond = self.Condition()
+ sleeping = self.Semaphore(0)
+ woken = self.Semaphore(0)
+
+ # start some threads/processes
+ for i in range(3):
+ p = self.Process(target=self.f, args=(cond, sleeping, woken))
+ p.daemon = True
+ p.start()
+
+ t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
+ t.daemon = True
+ t.start()
+
+ # wait for them to all sleep
+ for i in range(6):
+ sleeping.acquire()
+
+ # check no process/thread has woken up
+ time.sleep(DELTA)
+ self.assertReturnsIfImplemented(0, get_value, woken)
+
+ # wake some of them up
+ cond.acquire()
+ cond.notify(n=2)
+ cond.release()
+
+ # check 2 have woken
+ self.assertReachesEventually(lambda: get_value(woken), 2)
+
+ # wake the rest of them
+ cond.acquire()
+ cond.notify(n=4)
+ cond.release()
+
+ self.assertReachesEventually(lambda: get_value(woken), 6)
+
+ # doesn't do anything more
+ cond.acquire()
+ cond.notify(n=3)
+ cond.release()
+
self.assertReturnsIfImplemented(6, get_value, woken)
# check state is not mucked up
diff --git a/Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst b/Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst
new file mode 100644
index 0000000..9ef3ace
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-06-29-00-07-22.bpo-29293.Z6WZjD.rst
@@ -0,0 +1,5 @@
+Add missing parameter "n" on multiprocessing.Condition.notify().
+
+The doc claims multiprocessing.Condition behaves like threading.Condition,
+but its notify() method lacked the optional "n" argument (to specify the
+number of sleepers to wake up) that threading.Condition.notify() accepts.