author     Charles-François Natali <cf.natali@gmail.com>  2014-05-25 13:12:12 (GMT)
committer  Charles-François Natali <cf.natali@gmail.com>  2014-05-25 13:12:12 (GMT)
commit     a924fc7abc2d8788a4a9fa2cbef2caa5c5992ebd (patch)
tree       cb29ab708c0a0c1f8ea51d20cdc7dbdd19a77046
parent     1691e35953858ae06b5198bf12c72a6cd0e0234b (diff)
Issue #21565: multiprocessing: use context-manager protocol for synchronization primitives.
 -rw-r--r--  Doc/library/multiprocessing.rst          |  3
 -rw-r--r--  Lib/multiprocessing/dummy/connection.py  |  5
 -rw-r--r--  Lib/multiprocessing/heap.py              |  7
 -rw-r--r--  Lib/multiprocessing/managers.py          | 25
 -rw-r--r--  Lib/multiprocessing/pool.py              | 25
 -rw-r--r--  Lib/multiprocessing/queues.py            | 27
 -rw-r--r--  Lib/multiprocessing/sharedctypes.py      | 26
 -rw-r--r--  Lib/multiprocessing/synchronize.py       | 20
 -rw-r--r--  Lib/multiprocessing/util.py              |  7
 9 files changed, 43 insertions(+), 102 deletions(-)
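
The change applied throughout this patch is mechanical: each manual acquire()/try/finally/release() sequence around a lock or condition is replaced by a with statement, which releases the primitive even when the body raises. A minimal before/after sketch of that shape (the lock and counter here are illustrative, not objects from the patch):

    from multiprocessing import Lock

    lock = Lock()
    counter = 0

    # Before: explicit acquire/release guarded by try/finally.
    lock.acquire()
    try:
        counter += 1
    finally:
        lock.release()

    # After: the context-manager protocol acquires on entry and
    # releases on exit, even if the body raises.
    with lock:
        counter += 1
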
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index 5fac730..409b2cb 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -1320,6 +1320,9 @@ processes.
Note that accessing the ctypes object through the wrapper can be a lot slower
than accessing the raw ctypes object.
+ .. versionchanged:: 3.5
+ Synchronized objects support the :term:`context manager` protocol.
+
The table below compares the syntax for creating shared ctypes objects from
shared memory with the normal ctypes syntax. (In the table ``MyStruct`` is some
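
The versionchanged note above documents the user-visible side of the sharedctypes.py change further down: the Synchronized wrappers gain __enter__/__exit__ methods that delegate to their lock. A rough usage sketch, assuming a shared Value created with the default lock (the names are illustrative):

    from multiprocessing import Value

    counter = Value('i', 0)      # Synchronized wrapper around a ctypes int

    # New in this change: the wrapper itself is a context manager that
    # acquires and releases its underlying lock.
    with counter:
        counter.value += 1

    # Equivalent to the pre-existing explicit form:
    counter.get_lock().acquire()
    try:
        counter.value += 1
    finally:
        counter.get_lock().release()
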
diff --git a/Lib/multiprocessing/dummy/connection.py b/Lib/multiprocessing/dummy/connection.py
index 694ef96..1984375 100644
--- a/Lib/multiprocessing/dummy/connection.py
+++ b/Lib/multiprocessing/dummy/connection.py
@@ -59,9 +59,8 @@ class Connection(object):
return True
if timeout <= 0.0:
return False
- self._in.not_empty.acquire()
- self._in.not_empty.wait(timeout)
- self._in.not_empty.release()
+ with self._in.not_empty:
+ self._in.not_empty.wait(timeout)
return self._in.qsize() > 0
def close(self):
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index 344a45f..9e3016c 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -216,9 +216,8 @@ class Heap(object):
assert 0 <= size < sys.maxsize
if os.getpid() != self._lastpid:
self.__init__() # reinitialize after fork
- self._lock.acquire()
- self._free_pending_blocks()
- try:
+ with self._lock:
+ self._free_pending_blocks()
size = self._roundup(max(size,1), self._alignment)
(arena, start, stop) = self._malloc(size)
new_stop = start + size
@@ -227,8 +226,6 @@ class Heap(object):
block = (arena, start, new_stop)
self._allocated_blocks.add(block)
return block
- finally:
- self._lock.release()
#
# Class representing a chunk of an mmap -- can be inherited by child process
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 66d46fc..820ae91 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -306,8 +306,7 @@ class Server(object):
'''
Return some info --- useful to spot problems with refcounting
'''
- self.mutex.acquire()
- try:
+ with self.mutex:
result = []
keys = list(self.id_to_obj.keys())
keys.sort()
@@ -317,8 +316,6 @@ class Server(object):
(ident, self.id_to_refcount[ident],
str(self.id_to_obj[ident][0])[:75]))
return '\n'.join(result)
- finally:
- self.mutex.release()
def number_of_objects(self, c):
'''
@@ -343,8 +340,7 @@ class Server(object):
'''
Create a new shared object and return its id
'''
- self.mutex.acquire()
- try:
+ with self.mutex:
callable, exposed, method_to_typeid, proxytype = \
self.registry[typeid]
@@ -374,8 +370,6 @@ class Server(object):
# has been created.
self.incref(c, ident)
return ident, tuple(exposed)
- finally:
- self.mutex.release()
def get_methods(self, c, token):
'''
@@ -392,22 +386,16 @@ class Server(object):
self.serve_client(c)
def incref(self, c, ident):
- self.mutex.acquire()
- try:
+ with self.mutex:
self.id_to_refcount[ident] += 1
- finally:
- self.mutex.release()
def decref(self, c, ident):
- self.mutex.acquire()
- try:
+ with self.mutex:
assert self.id_to_refcount[ident] >= 1
self.id_to_refcount[ident] -= 1
if self.id_to_refcount[ident] == 0:
del self.id_to_obj[ident], self.id_to_refcount[ident]
util.debug('disposing of obj with id %r', ident)
- finally:
- self.mutex.release()
#
# Class to represent state of a manager
@@ -671,14 +659,11 @@ class BaseProxy(object):
def __init__(self, token, serializer, manager=None,
authkey=None, exposed=None, incref=True):
- BaseProxy._mutex.acquire()
- try:
+ with BaseProxy._mutex:
tls_idset = BaseProxy._address_to_local.get(token.address, None)
if tls_idset is None:
tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
BaseProxy._address_to_local[token.address] = tls_idset
- finally:
- BaseProxy._mutex.release()
# self._tls is used to record the connection used by this
# thread to communicate with the manager at token.address
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 8832a5c..77eb817 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -666,8 +666,7 @@ class IMapIterator(object):
return self
def next(self, timeout=None):
- self._cond.acquire()
- try:
+ with self._cond:
try:
item = self._items.popleft()
except IndexError:
@@ -680,8 +679,6 @@ class IMapIterator(object):
if self._index == self._length:
raise StopIteration
raise TimeoutError
- finally:
- self._cond.release()
success, value = item
if success:
@@ -691,8 +688,7 @@ class IMapIterator(object):
__next__ = next # XXX
def _set(self, i, obj):
- self._cond.acquire()
- try:
+ with self._cond:
if self._index == i:
self._items.append(obj)
self._index += 1
@@ -706,18 +702,13 @@ class IMapIterator(object):
if self._index == self._length:
del self._cache[self._job]
- finally:
- self._cond.release()
def _set_length(self, length):
- self._cond.acquire()
- try:
+ with self._cond:
self._length = length
if self._index == self._length:
self._cond.notify()
del self._cache[self._job]
- finally:
- self._cond.release()
#
# Class whose instances are returned by `Pool.imap_unordered()`
@@ -726,15 +717,12 @@ class IMapIterator(object):
class IMapUnorderedIterator(IMapIterator):
def _set(self, i, obj):
- self._cond.acquire()
- try:
+ with self._cond:
self._items.append(obj)
self._index += 1
self._cond.notify()
if self._index == self._length:
del self._cache[self._job]
- finally:
- self._cond.release()
#
#
@@ -760,10 +748,7 @@ class ThreadPool(Pool):
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
# put sentinels at head of inqueue to make workers finish
- inqueue.not_empty.acquire()
- try:
+ with inqueue.not_empty:
inqueue.queue.clear()
inqueue.queue.extend([None] * size)
inqueue.not_empty.notify_all()
- finally:
- inqueue.not_empty.release()
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index f650771..c07ad40 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -81,14 +81,11 @@ class Queue(object):
if not self._sem.acquire(block, timeout):
raise Full
- self._notempty.acquire()
- try:
+ with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()
- finally:
- self._notempty.release()
def get(self, block=True, timeout=None):
if block and timeout is None:
@@ -201,12 +198,9 @@ class Queue(object):
@staticmethod
def _finalize_close(buffer, notempty):
debug('telling queue thread to quit')
- notempty.acquire()
- try:
+ with notempty:
buffer.append(_sentinel)
notempty.notify()
- finally:
- notempty.release()
@staticmethod
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
@@ -295,35 +289,24 @@ class JoinableQueue(Queue):
if not self._sem.acquire(block, timeout):
raise Full
- self._notempty.acquire()
- self._cond.acquire()
- try:
+ with self._notempty, self._cond:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._unfinished_tasks.release()
self._notempty.notify()
- finally:
- self._cond.release()
- self._notempty.release()
def task_done(self):
- self._cond.acquire()
- try:
+ with self._cond:
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()
- finally:
- self._cond.release()
def join(self):
- self._cond.acquire()
- try:
+ with self._cond:
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()
- finally:
- self._cond.release()
#
# Simplified Queue type -- really just a locked pipe
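
The JoinableQueue.put() hunk above also collapses two nested acquire/release pairs into a single with statement listing both context managers, which are entered left to right and exited in reverse order. A small standalone sketch of that form, using threading conditions and illustrative names rather than the queue's internals:

    import threading

    notempty = threading.Condition()
    cond = threading.Condition()
    buffer = []

    # Both conditions are held for the duration of the block, mirroring
    # the nested try/finally blocks this pattern replaces.
    with notempty, cond:
        buffer.append('item')
        notempty.notify()
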
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index 0c17825..4258f59 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -188,6 +188,12 @@ class SynchronizedBase(object):
self.acquire = self._lock.acquire
self.release = self._lock.release
+ def __enter__(self):
+ return self._lock.__enter__()
+
+ def __exit__(self, *args):
+ return self._lock.__exit__(*args)
+
def __reduce__(self):
assert_spawning(self)
return synchronized, (self._obj, self._lock)
@@ -212,32 +218,20 @@ class SynchronizedArray(SynchronizedBase):
return len(self._obj)
def __getitem__(self, i):
- self.acquire()
- try:
+ with self:
return self._obj[i]
- finally:
- self.release()
def __setitem__(self, i, value):
- self.acquire()
- try:
+ with self:
self._obj[i] = value
- finally:
- self.release()
def __getslice__(self, start, stop):
- self.acquire()
- try:
+ with self:
return self._obj[start:stop]
- finally:
- self.release()
def __setslice__(self, start, stop, values):
- self.acquire()
- try:
+ with self:
self._obj[start:stop] = values
- finally:
- self.release()
class SynchronizedString(SynchronizedArray):
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index dea1cbd..7d44330 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -337,34 +337,24 @@ class Event(object):
self._flag = ctx.Semaphore(0)
def is_set(self):
- self._cond.acquire()
- try:
+ with self._cond:
if self._flag.acquire(False):
self._flag.release()
return True
return False
- finally:
- self._cond.release()
def set(self):
- self._cond.acquire()
- try:
+ with self._cond:
self._flag.acquire(False)
self._flag.release()
self._cond.notify_all()
- finally:
- self._cond.release()
def clear(self):
- self._cond.acquire()
- try:
+ with self._cond:
self._flag.acquire(False)
- finally:
- self._cond.release()
def wait(self, timeout=None):
- self._cond.acquire()
- try:
+ with self._cond:
if self._flag.acquire(False):
self._flag.release()
else:
@@ -374,8 +364,6 @@ class Event(object):
self._flag.release()
return True
return False
- finally:
- self._cond.release()
#
# Barrier
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index 0b695e4..8760c82 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -327,6 +327,13 @@ class ForkAwareThreadLock(object):
self.acquire = self._lock.acquire
self.release = self._lock.release
+ def __enter__(self):
+ return self._lock.__enter__()
+
+ def __exit__(self, *args):
+ return self._lock.__exit__(*args)
+
+
class ForkAwareLocal(threading.local):
def __init__(self):
register_after_fork(self, lambda obj : obj.__dict__.clear())