diff options
author | Antoine Pitrou <pitrou@free.fr> | 2018-01-18 09:38:03 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-18 09:38:03 (GMT) |
commit | ab74504346a6e2569b3255b7b621c589716888c4 (patch) | |
tree | 200699b669a4cc8e6896a329db05a5040a19ca5a /Lib/multiprocessing | |
parent | 6027802ca7fae118bce6afead51d01a174600d40 (diff) | |
download | cpython-ab74504346a6e2569b3255b7b621c589716888c4.zip cpython-ab74504346a6e2569b3255b7b621c589716888c4.tar.gz cpython-ab74504346a6e2569b3255b7b621c589716888c4.tar.bz2 |
bpo-32576: use queue.SimpleQueue in critical places (#5216)
Where a queue may be invoked from a weakref callback, we need
to use the reentrant SimpleQueue.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/pool.py | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index b1ee725..3e9a0d6 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -156,7 +156,7 @@ class Pool(object): maxtasksperchild=None, context=None): self._ctx = context or get_context() self._setup_queues() - self._taskqueue = queue.Queue() + self._taskqueue = queue.SimpleQueue() self._cache = {} self._state = RUN self._maxtasksperchild = maxtasksperchild @@ -802,15 +802,18 @@ class ThreadPool(Pool): Pool.__init__(self, processes, initializer, initargs) def _setup_queues(self): - self._inqueue = queue.Queue() - self._outqueue = queue.Queue() + self._inqueue = queue.SimpleQueue() + self._outqueue = queue.SimpleQueue() self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get @staticmethod def _help_stuff_finish(inqueue, task_handler, size): - # put sentinels at head of inqueue to make workers finish - with inqueue.not_empty: - inqueue.queue.clear() - inqueue.queue.extend([None] * size) - inqueue.not_empty.notify_all() + # drain inqueue, and put sentinels at its head to make workers finish + try: + while True: + inqueue.get(block=False) + except queue.Empty: + pass + for i in range(size): + inqueue.put(None) |