summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/queues.py
diff options
context:
space:
mode:
authorRichard Oudkerk <shibturn@gmail.com>2013-10-16 15:41:56 (GMT)
committerRichard Oudkerk <shibturn@gmail.com>2013-10-16 15:41:56 (GMT)
commitb1694cf588ca915c003b9b79c9fdeab82deb9476 (patch)
tree923f376dd771dbc25815cad0708c753ef3af3b14 /Lib/multiprocessing/queues.py
parent3e4b52875e0162b536817da93f54b1988195c3ab (diff)
downloadcpython-b1694cf588ca915c003b9b79c9fdeab82deb9476.zip
cpython-b1694cf588ca915c003b9b79c9fdeab82deb9476.tar.gz
cpython-b1694cf588ca915c003b9b79c9fdeab82deb9476.tar.bz2
Issue #18999: Make multiprocessing use context objects.
This allows different parts of a program to use different methods for starting processes without interfering with each other.
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r--Lib/multiprocessing/queues.py29
1 files changed, 14 insertions, 15 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index 10e40a5..f650771 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -22,8 +22,7 @@ from queue import Empty, Full
import _multiprocessing
from . import connection
-from . import popen
-from . import synchronize
+from . import context
from .util import debug, info, Finalize, register_after_fork, is_exiting
from .reduction import ForkingPickler
@@ -34,18 +33,18 @@ from .reduction import ForkingPickler
class Queue(object):
- def __init__(self, maxsize=0):
+ def __init__(self, maxsize=0, *, ctx):
if maxsize <= 0:
maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
self._maxsize = maxsize
self._reader, self._writer = connection.Pipe(duplex=False)
- self._rlock = synchronize.Lock()
+ self._rlock = ctx.Lock()
self._opid = os.getpid()
if sys.platform == 'win32':
self._wlock = None
else:
- self._wlock = synchronize.Lock()
- self._sem = synchronize.BoundedSemaphore(maxsize)
+ self._wlock = ctx.Lock()
+ self._sem = ctx.BoundedSemaphore(maxsize)
# For use by concurrent.futures
self._ignore_epipe = False
@@ -55,7 +54,7 @@ class Queue(object):
register_after_fork(self, Queue._after_fork)
def __getstate__(self):
- popen.assert_spawning(self)
+ context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
@@ -279,10 +278,10 @@ _sentinel = object()
class JoinableQueue(Queue):
- def __init__(self, maxsize=0):
- Queue.__init__(self, maxsize)
- self._unfinished_tasks = synchronize.Semaphore(0)
- self._cond = synchronize.Condition()
+ def __init__(self, maxsize=0, *, ctx):
+ Queue.__init__(self, maxsize, ctx=ctx)
+ self._unfinished_tasks = ctx.Semaphore(0)
+ self._cond = ctx.Condition()
def __getstate__(self):
return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
@@ -332,20 +331,20 @@ class JoinableQueue(Queue):
class SimpleQueue(object):
- def __init__(self):
+ def __init__(self, *, ctx):
self._reader, self._writer = connection.Pipe(duplex=False)
- self._rlock = synchronize.Lock()
+ self._rlock = ctx.Lock()
self._poll = self._reader.poll
if sys.platform == 'win32':
self._wlock = None
else:
- self._wlock = synchronize.Lock()
+ self._wlock = ctx.Lock()
def empty(self):
return not self._poll()
def __getstate__(self):
- popen.assert_spawning(self)
+ context.assert_spawning(self)
return (self._reader, self._writer, self._rlock, self._wlock)
def __setstate__(self, state):