summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorRichard Oudkerk <shibturn@gmail.com>2012-06-15 17:26:07 (GMT)
committerRichard Oudkerk <shibturn@gmail.com>2012-06-15 17:26:07 (GMT)
commit3730a17a58d9058b0880202050a8c1e4ea193e8f (patch)
tree4dcc6136ee97567e2a9098e10865bff76929823b /Lib/multiprocessing
parenta6bc4b4c85dff9ccd0f91a1e10ac107e59c487a5 (diff)
downloadcpython-3730a17a58d9058b0880202050a8c1e4ea193e8f.zip
cpython-3730a17a58d9058b0880202050a8c1e4ea193e8f.tar.gz
cpython-3730a17a58d9058b0880202050a8c1e4ea193e8f.tar.bz2
Issue #14059: Implement multiprocessing.Barrier
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/__init__.py11
-rw-r--r--Lib/multiprocessing/dummy/__init__.py4
-rw-r--r--Lib/multiprocessing/managers.py21
-rw-r--r--Lib/multiprocessing/synchronize.py40
4 files changed, 72 insertions, 4 deletions
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index 02460f0..1f3e67c 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -23,8 +23,8 @@ __all__ = [
'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
- 'Event', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool', 'Value', 'Array',
- 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
+ 'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool',
+ 'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
]
__author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)'
@@ -186,6 +186,13 @@ def Event():
from multiprocessing.synchronize import Event
return Event()
+def Barrier(parties, action=None, timeout=None):
+ '''
+ Returns a barrier object
+ '''
+ from multiprocessing.synchronize import Barrier
+ return Barrier(parties, action, timeout)
+
def Queue(maxsize=0):
'''
Returns a queue object
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index 9bf8f6b..e31fc61 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -35,7 +35,7 @@
__all__ = [
'Process', 'current_process', 'active_children', 'freeze_support',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
- 'Event', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue'
+ 'Event', 'Barrier', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue'
]
#
@@ -49,7 +49,7 @@ import array
from multiprocessing.dummy.connection import Pipe
from threading import Lock, RLock, Semaphore, BoundedSemaphore
-from threading import Event, Condition
+from threading import Event, Condition, Barrier
from queue import Queue
#
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 817d232..cded4f3 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -993,6 +993,26 @@ class EventProxy(BaseProxy):
def wait(self, timeout=None):
return self._callmethod('wait', (timeout,))
+
+class BarrierProxy(BaseProxy):
+ _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
+ def wait(self, timeout=None):
+ return self._callmethod('wait', (timeout,))
+ def abort(self):
+ return self._callmethod('abort')
+ def reset(self):
+ return self._callmethod('reset')
+ @property
+ def parties(self):
+ return self._callmethod('__getattribute__', ('parties',))
+ @property
+ def n_waiting(self):
+ return self._callmethod('__getattribute__', ('n_waiting',))
+ @property
+ def broken(self):
+ return self._callmethod('__getattribute__', ('broken',))
+
+
class NamespaceProxy(BaseProxy):
_exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
def __getattr__(self, key):
@@ -1084,6 +1104,7 @@ SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
+SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 4502a97..22eabe5 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -333,3 +333,43 @@ class Event(object):
return False
finally:
self._cond.release()
+
+#
+# Barrier
+#
+
+class Barrier(threading.Barrier):
+
+ def __init__(self, parties, action=None, timeout=None):
+ import struct
+ from multiprocessing.heap import BufferWrapper
+ wrapper = BufferWrapper(struct.calcsize('i') * 2)
+ cond = Condition()
+ self.__setstate__((parties, action, timeout, cond, wrapper))
+ self._state = 0
+ self._count = 0
+
+ def __setstate__(self, state):
+ (self._parties, self._action, self._timeout,
+ self._cond, self._wrapper) = state
+ self._array = self._wrapper.create_memoryview().cast('i')
+
+ def __getstate__(self):
+ return (self._parties, self._action, self._timeout,
+ self._cond, self._wrapper)
+
+ @property
+ def _state(self):
+ return self._array[0]
+
+ @_state.setter
+ def _state(self, value):
+ self._array[0] = value
+
+ @property
+ def _count(self):
+ return self._array[1]
+
+ @_count.setter
+ def _count(self, value):
+ self._array[1] = value