diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/multiprocessing/queues.py | 69 |
1 files changed, 26 insertions, 43 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index f6f02b6..ec188ee 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -22,7 +22,7 @@ import _multiprocessing from multiprocessing.connection import Pipe from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition from multiprocessing.util import debug, info, Finalize, register_after_fork -from multiprocessing.forking import assert_spawning +from multiprocessing.forking import assert_spawning, ForkingPickler # # Queue type using a pipe, buffer and thread @@ -69,8 +69,8 @@ class Queue(object): self._joincancelled = False self._closed = False self._close = None - self._send = self._writer.send - self._recv = self._reader.recv + self._send_bytes = self._writer.send_bytes + self._recv_bytes = self._reader.recv_bytes self._poll = self._reader.poll def put(self, obj, block=True, timeout=None): @@ -89,14 +89,9 @@ class Queue(object): def get(self, block=True, timeout=None): if block and timeout is None: - self._rlock.acquire() - try: - res = self._recv() - self._sem.release() - return res - finally: - self._rlock.release() - + with self._rlock: + res = self._recv_bytes() + self._sem.release() else: if block: deadline = time.time() + timeout @@ -109,11 +104,12 @@ class Queue(object): raise Empty elif not self._poll(): raise Empty - res = self._recv() + res = self._recv_bytes() self._sem.release() - return res finally: self._rlock.release() + # unserialize the data after having released the lock + return ForkingPickler.loads(res) def qsize(self): # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() @@ -158,7 +154,7 @@ class Queue(object): self._buffer.clear() self._thread = threading.Thread( target=Queue._feed, - args=(self._buffer, self._notempty, self._send, + args=(self._buffer, self._notempty, self._send_bytes, self._wlock, self._writer.close, self._ignore_epipe), name='QueueFeederThread' ) @@ -210,7 +206,7 @@ class Queue(object): notempty.release() @staticmethod - def _feed(buffer, notempty, send, writelock, close, ignore_epipe): + def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') from .util import is_exiting @@ -241,16 +237,14 @@ class Queue(object): close() return + # serialize the data before acquiring the lock + obj = ForkingPickler.dumps(obj) if wacquire is None: - send(obj) - # Delete references to object. See issue16284 - del obj + send_bytes(obj) else: wacquire() try: - send(obj) - # Delete references to object. See issue16284 - del obj + send_bytes(obj) finally: wrelease() except IndexError: @@ -344,7 +338,6 @@ class SimpleQueue(object): self._wlock = None else: self._wlock = Lock() - self._make_methods() def empty(self): return not self._poll() @@ -355,29 +348,19 @@ class SimpleQueue(object): def __setstate__(self, state): (self._reader, self._writer, self._rlock, self._wlock) = state - self._make_methods() - def _make_methods(self): - recv = self._reader.recv - racquire, rrelease = self._rlock.acquire, self._rlock.release - def get(): - racquire() - try: - return recv() - finally: - rrelease() - self.get = get + def get(self): + with self._rlock: + res = self._reader.recv_bytes() + # unserialize the data after having released the lock + return ForkingPickler.loads(res) + def put(self, obj): + # serialize the data before acquiring the lock + obj = ForkingPickler.dumps(obj) if self._wlock is None: # writes to a message oriented win32 pipe are atomic - self.put = self._writer.send + self._writer.send_bytes(obj) else: - send = self._writer.send - wacquire, wrelease = self._wlock.acquire, self._wlock.release - def put(obj): - wacquire() - try: - return send(obj) - finally: - wrelease() - self.put = put + with self._wlock: + self._writer.send_bytes(obj) |