diff options
Diffstat (limited to 'Lib/multiprocessing/managers.py')
-rw-r--r-- | Lib/multiprocessing/managers.py | 171 |
1 files changed, 88 insertions, 83 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 5588ead..1ab147e 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -5,32 +5,7 @@ # multiprocessing/managers.py # # Copyright (c) 2006-2008, R Oudkerk -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the name of author nor the names of any contributors may be -# used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. +# Licensed to PSF under a Contributor Agreement. # __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] @@ -39,19 +14,16 @@ __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] # Imports # -import os import sys -import weakref import threading import array import queue from traceback import format_exc -from pickle import PicklingError from multiprocessing import Process, current_process, active_children, Pool, util, connection from multiprocessing.process import AuthenticationString -from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler -from multiprocessing.util import Finalize, info +from multiprocessing.forking import Popen, ForkingPickler +from time import time as _time # # Register some things for pickling @@ -168,28 +140,38 @@ class Server(object): self.id_to_obj = {'0': (None, ())} self.id_to_refcount = {} self.mutex = threading.RLock() - self.stop = 0 def serve_forever(self): ''' Run the server forever ''' + self.stop_event = threading.Event() current_process()._manager_server = self try: + accepter = threading.Thread(target=self.accepter) + accepter.daemon = True + accepter.start() try: - while 1: - try: - c = self.listener.accept() - except (OSError, IOError): - continue - t = threading.Thread(target=self.handle_request, args=(c,)) - t.daemon = True - t.start() + while not self.stop_event.is_set(): + self.stop_event.wait(1) except (KeyboardInterrupt, SystemExit): pass finally: - self.stop = 999 - self.listener.close() + if sys.stdout != sys.__stdout__: + util.debug('resetting stdout, stderr') + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + sys.exit(0) + + def accepter(self): + while True: + try: + c = self.listener.accept() + except (OSError, IOError): + continue + t = threading.Thread(target=self.handle_request, args=(c,)) + t.daemon = True + t.start() def handle_request(self, c): ''' @@ -236,7 +218,7 @@ class Server(object): send = conn.send id_to_obj = self.id_to_obj - while not self.stop: + while not self.stop_event.is_set(): try: methodname = obj = None @@ -346,32 +328,13 @@ class Server(object): Shutdown this process ''' try: - try: - util.debug('manager received shutdown message') - c.send(('#RETURN', None)) - - if sys.stdout != sys.__stdout__: - util.debug('resetting stdout, stderr') - sys.stdout = sys.__stdout__ - sys.stderr = sys.__stderr__ - - util._run_finalizers(0) - - for p in active_children(): - util.debug('terminating a child process of manager') - p.terminate() - - for p in active_children(): - util.debug('terminating a child process of manager') - p.join() - - util._run_finalizers() - util.info('manager exiting with exitcode 0') - except: - import traceback - traceback.print_exc() + util.debug('manager received shutdown message') + c.send(('#RETURN', None)) + except: + import traceback + traceback.print_exc() finally: - exit(0) + self.stop_event.set() def create(self, c, typeid, *args, **kwds): ''' @@ -483,10 +446,6 @@ class BaseManager(object): self._serializer = serializer self._Listener, self._Client = listener_client[serializer] - def __reduce__(self): - return type(self).from_address, \ - (self._address, self._authkey, self._serializer) - def get_server(self): ''' Return server object with serve_forever() method and address attribute @@ -576,7 +535,10 @@ class BaseManager(object): ''' Join the manager process (if it has been spawned) ''' - self._process.join(timeout) + if self._process is not None: + self._process.join(timeout) + if not self._process.is_alive(): + self._process = None def _debug_info(self): ''' @@ -599,6 +561,9 @@ class BaseManager(object): conn.close() def __enter__(self): + if self._state.value == State.INITIAL: + self.start() + assert self._state.value == State.STARTED return self def __exit__(self, exc_type, exc_val, exc_tb): @@ -620,7 +585,7 @@ class BaseManager(object): except Exception: pass - process.join(timeout=0.2) + process.join(timeout=1.0) if process.is_alive(): util.info('manager still alive') if hasattr(process, 'terminate'): @@ -982,8 +947,9 @@ class IteratorProxy(BaseProxy): class AcquirerProxy(BaseProxy): _exposed_ = ('acquire', 'release') - def acquire(self, blocking=True): - return self._callmethod('acquire', (blocking,)) + def acquire(self, blocking=True, timeout=None): + args = (blocking,) if timeout is None else (blocking, timeout) + return self._callmethod('acquire', args) def release(self): return self._callmethod('release') def __enter__(self): @@ -1000,6 +966,24 @@ class ConditionProxy(AcquirerProxy): return self._callmethod('notify') def notify_all(self): return self._callmethod('notify_all') + def wait_for(self, predicate, timeout=None): + result = predicate() + if result: + return result + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + waittime = None + while not result: + if endtime is not None: + waittime = endtime - _time() + if waittime <= 0: + break + self.wait(waittime) + result = predicate() + return result + class EventProxy(BaseProxy): _exposed_ = ('is_set', 'set', 'clear', 'wait') @@ -1012,6 +996,26 @@ class EventProxy(BaseProxy): def wait(self, timeout=None): return self._callmethod('wait', (timeout,)) + +class BarrierProxy(BaseProxy): + _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + def abort(self): + return self._callmethod('abort') + def reset(self): + return self._callmethod('reset') + @property + def parties(self): + return self._callmethod('__getattribute__', ('parties',)) + @property + def n_waiting(self): + return self._callmethod('__getattribute__', ('n_waiting',)) + @property + def broken(self): + return self._callmethod('__getattribute__', ('broken',)) + + class NamespaceProxy(BaseProxy): _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') def __getattr__(self, key): @@ -1041,12 +1045,11 @@ class ValueProxy(BaseProxy): BaseListProxy = MakeProxyType('BaseListProxy', ( - '__add__', '__contains__', '__delitem__', '__delslice__', - '__getitem__', '__getslice__', '__len__', '__mul__', - '__reversed__', '__rmul__', '__setitem__', '__setslice__', + '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', + '__mul__', '__reversed__', '__rmul__', '__setitem__', 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 'reverse', 'sort', '__imul__' - )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 + )) class ListProxy(BaseListProxy): def __iadd__(self, value): self._callmethod('extend', (value,)) @@ -1064,17 +1067,18 @@ DictProxy = MakeProxyType('DictProxy', ( ArrayProxy = MakeProxyType('ArrayProxy', ( - '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__' - )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 + '__len__', '__getitem__', '__setitem__' + )) PoolProxy = MakeProxyType('PoolProxy', ( 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', - 'map', 'map_async', 'terminate' + 'map', 'map_async', 'starmap', 'starmap_async', 'terminate' )) PoolProxy._method_to_typeid_ = { 'apply_async': 'AsyncResult', 'map_async': 'AsyncResult', + 'starmap_async': 'AsyncResult', 'imap': 'Iterator', 'imap_unordered': 'Iterator' } @@ -1103,6 +1107,7 @@ SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy) SyncManager.register('Condition', threading.Condition, ConditionProxy) +SyncManager.register('Barrier', threading.Barrier, BarrierProxy) SyncManager.register('Pool', Pool, PoolProxy) SyncManager.register('list', list, ListProxy) SyncManager.register('dict', dict, DictProxy) |