summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/queues.py69
1 files changed, 26 insertions, 43 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index f6f02b6..ec188ee 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -22,7 +22,7 @@ import _multiprocessing
from multiprocessing.connection import Pipe
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning
+from multiprocessing.forking import assert_spawning, ForkingPickler
#
# Queue type using a pipe, buffer and thread
@@ -69,8 +69,8 @@ class Queue(object):
self._joincancelled = False
self._closed = False
self._close = None
- self._send = self._writer.send
- self._recv = self._reader.recv
+ self._send_bytes = self._writer.send_bytes
+ self._recv_bytes = self._reader.recv_bytes
self._poll = self._reader.poll
def put(self, obj, block=True, timeout=None):
@@ -89,14 +89,9 @@ class Queue(object):
def get(self, block=True, timeout=None):
if block and timeout is None:
- self._rlock.acquire()
- try:
- res = self._recv()
- self._sem.release()
- return res
- finally:
- self._rlock.release()
-
+ with self._rlock:
+ res = self._recv_bytes()
+ self._sem.release()
else:
if block:
deadline = time.time() + timeout
@@ -109,11 +104,12 @@ class Queue(object):
raise Empty
elif not self._poll():
raise Empty
- res = self._recv()
+ res = self._recv_bytes()
self._sem.release()
- return res
finally:
self._rlock.release()
+ # unserialize the data after having released the lock
+ return ForkingPickler.loads(res)
def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -158,7 +154,7 @@ class Queue(object):
self._buffer.clear()
self._thread = threading.Thread(
target=Queue._feed,
- args=(self._buffer, self._notempty, self._send,
+ args=(self._buffer, self._notempty, self._send_bytes,
self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread'
)
@@ -210,7 +206,7 @@ class Queue(object):
notempty.release()
@staticmethod
- def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
+ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
from .util import is_exiting
@@ -241,16 +237,14 @@ class Queue(object):
close()
return
+ # serialize the data before acquiring the lock
+ obj = ForkingPickler.dumps(obj)
if wacquire is None:
- send(obj)
- # Delete references to object. See issue16284
- del obj
+ send_bytes(obj)
else:
wacquire()
try:
- send(obj)
- # Delete references to object. See issue16284
- del obj
+ send_bytes(obj)
finally:
wrelease()
except IndexError:
@@ -344,7 +338,6 @@ class SimpleQueue(object):
self._wlock = None
else:
self._wlock = Lock()
- self._make_methods()
def empty(self):
return not self._poll()
@@ -355,29 +348,19 @@ class SimpleQueue(object):
def __setstate__(self, state):
(self._reader, self._writer, self._rlock, self._wlock) = state
- self._make_methods()
- def _make_methods(self):
- recv = self._reader.recv
- racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get():
- racquire()
- try:
- return recv()
- finally:
- rrelease()
- self.get = get
+ def get(self):
+ with self._rlock:
+ res = self._reader.recv_bytes()
+ # unserialize the data after having released the lock
+ return ForkingPickler.loads(res)
+ def put(self, obj):
+ # serialize the data before acquiring the lock
+ obj = ForkingPickler.dumps(obj)
if self._wlock is None:
# writes to a message oriented win32 pipe are atomic
- self.put = self._writer.send
+ self._writer.send_bytes(obj)
else:
- send = self._writer.send
- wacquire, wrelease = self._wlock.acquire, self._wlock.release
- def put(obj):
- wacquire()
- try:
- return send(obj)
- finally:
- wrelease()
- self.put = put
+ with self._wlock:
+ self._writer.send_bytes(obj)