summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAntoine Pitrou <pitrou@free.fr>2018-01-18 09:38:03 (GMT)
committerGitHub <noreply@github.com>2018-01-18 09:38:03 (GMT)
commitab74504346a6e2569b3255b7b621c589716888c4 (patch)
tree200699b669a4cc8e6896a329db05a5040a19ca5a
parent6027802ca7fae118bce6afead51d01a174600d40 (diff)
downloadcpython-ab74504346a6e2569b3255b7b621c589716888c4.zip
cpython-ab74504346a6e2569b3255b7b621c589716888c4.tar.gz
cpython-ab74504346a6e2569b3255b7b621c589716888c4.tar.bz2
bpo-32576: use queue.SimpleQueue in critical places (#5216)
Where a queue may be invoked from a weakref callback, we need to use the reentrant SimpleQueue.
-rw-r--r--Lib/concurrent/futures/thread.py2
-rw-r--r--Lib/multiprocessing/pool.py19
-rw-r--r--Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst2
3 files changed, 14 insertions, 9 deletions
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 2e7100b..6e22950 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -128,7 +128,7 @@ class ThreadPoolExecutor(_base.Executor):
raise TypeError("initializer must be a callable")
self._max_workers = max_workers
- self._work_queue = queue.Queue()
+ self._work_queue = queue.SimpleQueue()
self._threads = set()
self._broken = False
self._shutdown = False
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index b1ee725..3e9a0d6 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -156,7 +156,7 @@ class Pool(object):
maxtasksperchild=None, context=None):
self._ctx = context or get_context()
self._setup_queues()
- self._taskqueue = queue.Queue()
+ self._taskqueue = queue.SimpleQueue()
self._cache = {}
self._state = RUN
self._maxtasksperchild = maxtasksperchild
@@ -802,15 +802,18 @@ class ThreadPool(Pool):
Pool.__init__(self, processes, initializer, initargs)
def _setup_queues(self):
- self._inqueue = queue.Queue()
- self._outqueue = queue.Queue()
+ self._inqueue = queue.SimpleQueue()
+ self._outqueue = queue.SimpleQueue()
self._quick_put = self._inqueue.put
self._quick_get = self._outqueue.get
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
- # put sentinels at head of inqueue to make workers finish
- with inqueue.not_empty:
- inqueue.queue.clear()
- inqueue.queue.extend([None] * size)
- inqueue.not_empty.notify_all()
+ # drain inqueue, and put sentinels at its head to make workers finish
+ try:
+ while True:
+ inqueue.get(block=False)
+ except queue.Empty:
+ pass
+ for i in range(size):
+ inqueue.put(None)
diff --git a/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst b/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst
new file mode 100644
index 0000000..143a83e
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst
@@ -0,0 +1,2 @@
+Use queue.SimpleQueue() in places where it can be invoked from a weakref
+callback.