diff options
author | Antoine Pitrou <pitrou@free.fr> | 2018-01-18 09:38:03 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-18 09:38:03 (GMT) |
commit | ab74504346a6e2569b3255b7b621c589716888c4 (patch) | |
tree | 200699b669a4cc8e6896a329db05a5040a19ca5a | |
parent | 6027802ca7fae118bce6afead51d01a174600d40 (diff) | |
download | cpython-ab74504346a6e2569b3255b7b621c589716888c4.zip cpython-ab74504346a6e2569b3255b7b621c589716888c4.tar.gz cpython-ab74504346a6e2569b3255b7b621c589716888c4.tar.bz2 |
bpo-32576: use queue.SimpleQueue in critical places (#5216)
Where a queue may be invoked from a weakref callback, we need
to use the reentrant SimpleQueue.
-rw-r--r-- | Lib/concurrent/futures/thread.py | 2 | ||||
-rw-r--r-- | Lib/multiprocessing/pool.py | 19 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst | 2 |
3 files changed, 14 insertions, 9 deletions
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 2e7100b..6e22950 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -128,7 +128,7 @@ class ThreadPoolExecutor(_base.Executor): raise TypeError("initializer must be a callable") self._max_workers = max_workers - self._work_queue = queue.Queue() + self._work_queue = queue.SimpleQueue() self._threads = set() self._broken = False self._shutdown = False diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index b1ee725..3e9a0d6 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -156,7 +156,7 @@ class Pool(object): maxtasksperchild=None, context=None): self._ctx = context or get_context() self._setup_queues() - self._taskqueue = queue.Queue() + self._taskqueue = queue.SimpleQueue() self._cache = {} self._state = RUN self._maxtasksperchild = maxtasksperchild @@ -802,15 +802,18 @@ class ThreadPool(Pool): Pool.__init__(self, processes, initializer, initargs) def _setup_queues(self): - self._inqueue = queue.Queue() - self._outqueue = queue.Queue() + self._inqueue = queue.SimpleQueue() + self._outqueue = queue.SimpleQueue() self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get @staticmethod def _help_stuff_finish(inqueue, task_handler, size): - # put sentinels at head of inqueue to make workers finish - with inqueue.not_empty: - inqueue.queue.clear() - inqueue.queue.extend([None] * size) - inqueue.not_empty.notify_all() + # drain inqueue, and put sentinels at its head to make workers finish + try: + while True: + inqueue.get(block=False) + except queue.Empty: + pass + for i in range(size): + inqueue.put(None) diff --git a/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst b/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst new file mode 100644 index 0000000..143a83e --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-01-17-13-04-16.bpo-32576.iDL09t.rst @@ -0,0 +1,2 @@ +Use queue.SimpleQueue() in places where it can be invoked from a weakref +callback. |