summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/managers.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/managers.py')
-rw-r--r--Lib/multiprocessing/managers.py171
1 files changed, 88 insertions, 83 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 5588ead..1ab147e 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -5,32 +5,7 @@
# multiprocessing/managers.py
#
# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the name of author nor the names of any contributors may be
-# used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
+# Licensed to PSF under a Contributor Agreement.
#
__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
@@ -39,19 +14,16 @@ __all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
# Imports
#
-import os
import sys
-import weakref
import threading
import array
import queue
from traceback import format_exc
-from pickle import PicklingError
from multiprocessing import Process, current_process, active_children, Pool, util, connection
from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
-from multiprocessing.util import Finalize, info
+from multiprocessing.forking import Popen, ForkingPickler
+from time import time as _time
#
# Register some things for pickling
@@ -168,28 +140,38 @@ class Server(object):
self.id_to_obj = {'0': (None, ())}
self.id_to_refcount = {}
self.mutex = threading.RLock()
- self.stop = 0
def serve_forever(self):
'''
Run the server forever
'''
+ self.stop_event = threading.Event()
current_process()._manager_server = self
try:
+ accepter = threading.Thread(target=self.accepter)
+ accepter.daemon = True
+ accepter.start()
try:
- while 1:
- try:
- c = self.listener.accept()
- except (OSError, IOError):
- continue
- t = threading.Thread(target=self.handle_request, args=(c,))
- t.daemon = True
- t.start()
+ while not self.stop_event.is_set():
+ self.stop_event.wait(1)
except (KeyboardInterrupt, SystemExit):
pass
finally:
- self.stop = 999
- self.listener.close()
+ if sys.stdout != sys.__stdout__:
+ util.debug('resetting stdout, stderr')
+ sys.stdout = sys.__stdout__
+ sys.stderr = sys.__stderr__
+ sys.exit(0)
+
+ def accepter(self):
+ while True:
+ try:
+ c = self.listener.accept()
+ except (OSError, IOError):
+ continue
+ t = threading.Thread(target=self.handle_request, args=(c,))
+ t.daemon = True
+ t.start()
def handle_request(self, c):
'''
@@ -236,7 +218,7 @@ class Server(object):
send = conn.send
id_to_obj = self.id_to_obj
- while not self.stop:
+ while not self.stop_event.is_set():
try:
methodname = obj = None
@@ -346,32 +328,13 @@ class Server(object):
Shutdown this process
'''
try:
- try:
- util.debug('manager received shutdown message')
- c.send(('#RETURN', None))
-
- if sys.stdout != sys.__stdout__:
- util.debug('resetting stdout, stderr')
- sys.stdout = sys.__stdout__
- sys.stderr = sys.__stderr__
-
- util._run_finalizers(0)
-
- for p in active_children():
- util.debug('terminating a child process of manager')
- p.terminate()
-
- for p in active_children():
- util.debug('terminating a child process of manager')
- p.join()
-
- util._run_finalizers()
- util.info('manager exiting with exitcode 0')
- except:
- import traceback
- traceback.print_exc()
+ util.debug('manager received shutdown message')
+ c.send(('#RETURN', None))
+ except:
+ import traceback
+ traceback.print_exc()
finally:
- exit(0)
+ self.stop_event.set()
def create(self, c, typeid, *args, **kwds):
'''
@@ -483,10 +446,6 @@ class BaseManager(object):
self._serializer = serializer
self._Listener, self._Client = listener_client[serializer]
- def __reduce__(self):
- return type(self).from_address, \
- (self._address, self._authkey, self._serializer)
-
def get_server(self):
'''
Return server object with serve_forever() method and address attribute
@@ -576,7 +535,10 @@ class BaseManager(object):
'''
Join the manager process (if it has been spawned)
'''
- self._process.join(timeout)
+ if self._process is not None:
+ self._process.join(timeout)
+ if not self._process.is_alive():
+ self._process = None
def _debug_info(self):
'''
@@ -599,6 +561,9 @@ class BaseManager(object):
conn.close()
def __enter__(self):
+ if self._state.value == State.INITIAL:
+ self.start()
+ assert self._state.value == State.STARTED
return self
def __exit__(self, exc_type, exc_val, exc_tb):
@@ -620,7 +585,7 @@ class BaseManager(object):
except Exception:
pass
- process.join(timeout=0.2)
+ process.join(timeout=1.0)
if process.is_alive():
util.info('manager still alive')
if hasattr(process, 'terminate'):
@@ -982,8 +947,9 @@ class IteratorProxy(BaseProxy):
class AcquirerProxy(BaseProxy):
_exposed_ = ('acquire', 'release')
- def acquire(self, blocking=True):
- return self._callmethod('acquire', (blocking,))
+ def acquire(self, blocking=True, timeout=None):
+ args = (blocking,) if timeout is None else (blocking, timeout)
+ return self._callmethod('acquire', args)
def release(self):
return self._callmethod('release')
def __enter__(self):
@@ -1000,6 +966,24 @@ class ConditionProxy(AcquirerProxy):
return self._callmethod('notify')
def notify_all(self):
return self._callmethod('notify_all')
+ def wait_for(self, predicate, timeout=None):
+ result = predicate()
+ if result:
+ return result
+ if timeout is not None:
+ endtime = _time() + timeout
+ else:
+ endtime = None
+ waittime = None
+ while not result:
+ if endtime is not None:
+ waittime = endtime - _time()
+ if waittime <= 0:
+ break
+ self.wait(waittime)
+ result = predicate()
+ return result
+
class EventProxy(BaseProxy):
_exposed_ = ('is_set', 'set', 'clear', 'wait')
@@ -1012,6 +996,26 @@ class EventProxy(BaseProxy):
def wait(self, timeout=None):
return self._callmethod('wait', (timeout,))
+
+class BarrierProxy(BaseProxy):
+ _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
+ def wait(self, timeout=None):
+ return self._callmethod('wait', (timeout,))
+ def abort(self):
+ return self._callmethod('abort')
+ def reset(self):
+ return self._callmethod('reset')
+ @property
+ def parties(self):
+ return self._callmethod('__getattribute__', ('parties',))
+ @property
+ def n_waiting(self):
+ return self._callmethod('__getattribute__', ('n_waiting',))
+ @property
+ def broken(self):
+ return self._callmethod('__getattribute__', ('broken',))
+
+
class NamespaceProxy(BaseProxy):
_exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
def __getattr__(self, key):
@@ -1041,12 +1045,11 @@ class ValueProxy(BaseProxy):
BaseListProxy = MakeProxyType('BaseListProxy', (
- '__add__', '__contains__', '__delitem__', '__delslice__',
- '__getitem__', '__getslice__', '__len__', '__mul__',
- '__reversed__', '__rmul__', '__setitem__', '__setslice__',
+ '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
+ '__mul__', '__reversed__', '__rmul__', '__setitem__',
'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
'reverse', 'sort', '__imul__'
- )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
+ ))
class ListProxy(BaseListProxy):
def __iadd__(self, value):
self._callmethod('extend', (value,))
@@ -1064,17 +1067,18 @@ DictProxy = MakeProxyType('DictProxy', (
ArrayProxy = MakeProxyType('ArrayProxy', (
- '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
- )) # XXX __getslice__ and __setslice__ unneeded in Py3.0
+ '__len__', '__getitem__', '__setitem__'
+ ))
PoolProxy = MakeProxyType('PoolProxy', (
'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
- 'map', 'map_async', 'terminate'
+ 'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
))
PoolProxy._method_to_typeid_ = {
'apply_async': 'AsyncResult',
'map_async': 'AsyncResult',
+ 'starmap_async': 'AsyncResult',
'imap': 'Iterator',
'imap_unordered': 'Iterator'
}
@@ -1103,6 +1107,7 @@ SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
+SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)