diff options
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r-- | Lib/multiprocessing/queues.py | 29 |
1 files changed, 14 insertions, 15 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 10e40a5..f650771 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -22,8 +22,7 @@ from queue import Empty, Full import _multiprocessing from . import connection -from . import popen -from . import synchronize +from . import context from .util import debug, info, Finalize, register_after_fork, is_exiting from .reduction import ForkingPickler @@ -34,18 +33,18 @@ from .reduction import ForkingPickler class Queue(object): - def __init__(self, maxsize=0): + def __init__(self, maxsize=0, *, ctx): if maxsize <= 0: maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX self._maxsize = maxsize self._reader, self._writer = connection.Pipe(duplex=False) - self._rlock = synchronize.Lock() + self._rlock = ctx.Lock() self._opid = os.getpid() if sys.platform == 'win32': self._wlock = None else: - self._wlock = synchronize.Lock() - self._sem = synchronize.BoundedSemaphore(maxsize) + self._wlock = ctx.Lock() + self._sem = ctx.BoundedSemaphore(maxsize) # For use by concurrent.futures self._ignore_epipe = False @@ -55,7 +54,7 @@ class Queue(object): register_after_fork(self, Queue._after_fork) def __getstate__(self): - popen.assert_spawning(self) + context.assert_spawning(self) return (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) @@ -279,10 +278,10 @@ _sentinel = object() class JoinableQueue(Queue): - def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - self._unfinished_tasks = synchronize.Semaphore(0) - self._cond = synchronize.Condition() + def __init__(self, maxsize=0, *, ctx): + Queue.__init__(self, maxsize, ctx=ctx) + self._unfinished_tasks = ctx.Semaphore(0) + self._cond = ctx.Condition() def __getstate__(self): return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) @@ -332,20 +331,20 @@ class JoinableQueue(Queue): class SimpleQueue(object): - def __init__(self): + def __init__(self, *, ctx): self._reader, self._writer = connection.Pipe(duplex=False) - self._rlock = synchronize.Lock() + self._rlock = ctx.Lock() self._poll = self._reader.poll if sys.platform == 'win32': self._wlock = None else: - self._wlock = synchronize.Lock() + self._wlock = ctx.Lock() def empty(self): return not self._poll() def __getstate__(self): - popen.assert_spawning(self) + context.assert_spawning(self) return (self._reader, self._writer, self._rlock, self._wlock) def __setstate__(self, state): |