diff options
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r-- | Lib/multiprocessing/queues.py | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index ec188ee..10e40a5 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -18,11 +18,15 @@ import weakref import errno from queue import Empty, Full + import _multiprocessing -from multiprocessing.connection import Pipe -from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition -from multiprocessing.util import debug, info, Finalize, register_after_fork -from multiprocessing.forking import assert_spawning, ForkingPickler + +from . import connection +from . import popen +from . import synchronize + +from .util import debug, info, Finalize, register_after_fork, is_exiting +from .reduction import ForkingPickler # # Queue type using a pipe, buffer and thread @@ -34,14 +38,14 @@ class Queue(object): if maxsize <= 0: maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX self._maxsize = maxsize - self._reader, self._writer = Pipe(duplex=False) - self._rlock = Lock() + self._reader, self._writer = connection.Pipe(duplex=False) + self._rlock = synchronize.Lock() self._opid = os.getpid() if sys.platform == 'win32': self._wlock = None else: - self._wlock = Lock() - self._sem = BoundedSemaphore(maxsize) + self._wlock = synchronize.Lock() + self._sem = synchronize.BoundedSemaphore(maxsize) # For use by concurrent.futures self._ignore_epipe = False @@ -51,7 +55,7 @@ class Queue(object): register_after_fork(self, Queue._after_fork) def __getstate__(self): - assert_spawning(self) + popen.assert_spawning(self) return (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) @@ -208,8 +212,6 @@ class Queue(object): @staticmethod def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): debug('starting thread to feed data to pipe') - from .util import is_exiting - nacquire = notempty.acquire nrelease = notempty.release nwait = notempty.wait @@ -279,8 +281,8 @@ class JoinableQueue(Queue): def __init__(self, maxsize=0): Queue.__init__(self, maxsize) - self._unfinished_tasks = Semaphore(0) - self._cond = Condition() + self._unfinished_tasks = synchronize.Semaphore(0) + self._cond = synchronize.Condition() def __getstate__(self): return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) @@ -331,19 +333,19 @@ class JoinableQueue(Queue): class SimpleQueue(object): def __init__(self): - self._reader, self._writer = Pipe(duplex=False) - self._rlock = Lock() + self._reader, self._writer = connection.Pipe(duplex=False) + self._rlock = synchronize.Lock() self._poll = self._reader.poll if sys.platform == 'win32': self._wlock = None else: - self._wlock = Lock() + self._wlock = synchronize.Lock() def empty(self): return not self._poll() def __getstate__(self): - assert_spawning(self) + popen.assert_spawning(self) return (self._reader, self._writer, self._rlock, self._wlock) def __setstate__(self, state): |