diff options
Diffstat (limited to 'Lib/multiprocessing/queues.py')
| -rw-r--r-- | Lib/multiprocessing/queues.py | 27 | 
1 files changed, 5 insertions, 22 deletions
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index f650771..c07ad40 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -81,14 +81,11 @@ class Queue(object):          if not self._sem.acquire(block, timeout):              raise Full -        self._notempty.acquire() -        try: +        with self._notempty:              if self._thread is None:                  self._start_thread()              self._buffer.append(obj)              self._notempty.notify() -        finally: -            self._notempty.release()      def get(self, block=True, timeout=None):          if block and timeout is None: @@ -201,12 +198,9 @@ class Queue(object):      @staticmethod      def _finalize_close(buffer, notempty):          debug('telling queue thread to quit') -        notempty.acquire() -        try: +        with notempty:              buffer.append(_sentinel)              notempty.notify() -        finally: -            notempty.release()      @staticmethod      def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): @@ -295,35 +289,24 @@ class JoinableQueue(Queue):          if not self._sem.acquire(block, timeout):              raise Full -        self._notempty.acquire() -        self._cond.acquire() -        try: +        with self._notempty, self._cond:              if self._thread is None:                  self._start_thread()              self._buffer.append(obj)              self._unfinished_tasks.release()              self._notempty.notify() -        finally: -            self._cond.release() -            self._notempty.release()      def task_done(self): -        self._cond.acquire() -        try: +        with self._cond:              if not self._unfinished_tasks.acquire(False):                  raise ValueError('task_done() called too many times')              if self._unfinished_tasks._semlock._is_zero():                  self._cond.notify_all() -        finally: -            self._cond.release()      def join(self): -        self._cond.acquire() -        try: +        with self._cond:              if not self._unfinished_tasks._semlock._is_zero():                  self._cond.wait() -        finally: -            self._cond.release()  #  # Simplified Queue type -- really just a locked pipe  | 
