diff options
Diffstat (limited to 'Lib/multiprocessing/queues.py')
-rw-r--r-- | Lib/multiprocessing/queues.py | 27 |
1 files changed, 5 insertions, 22 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index f650771..c07ad40 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -81,14 +81,11 @@ class Queue(object): if not self._sem.acquire(block, timeout): raise Full - self._notempty.acquire() - try: + with self._notempty: if self._thread is None: self._start_thread() self._buffer.append(obj) self._notempty.notify() - finally: - self._notempty.release() def get(self, block=True, timeout=None): if block and timeout is None: @@ -201,12 +198,9 @@ class Queue(object): @staticmethod def _finalize_close(buffer, notempty): debug('telling queue thread to quit') - notempty.acquire() - try: + with notempty: buffer.append(_sentinel) notempty.notify() - finally: - notempty.release() @staticmethod def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): @@ -295,35 +289,24 @@ class JoinableQueue(Queue): if not self._sem.acquire(block, timeout): raise Full - self._notempty.acquire() - self._cond.acquire() - try: + with self._notempty, self._cond: if self._thread is None: self._start_thread() self._buffer.append(obj) self._unfinished_tasks.release() self._notempty.notify() - finally: - self._cond.release() - self._notempty.release() def task_done(self): - self._cond.acquire() - try: + with self._cond: if not self._unfinished_tasks.acquire(False): raise ValueError('task_done() called too many times') if self._unfinished_tasks._semlock._is_zero(): self._cond.notify_all() - finally: - self._cond.release() def join(self): - self._cond.acquire() - try: + with self._cond: if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait() - finally: - self._cond.release() # # Simplified Queue type -- really just a locked pipe |