summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/managers.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/managers.py')
-rw-r--r--Lib/multiprocessing/managers.py94
1 files changed, 51 insertions, 43 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 6a7dccb..f6611af 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -22,7 +22,7 @@ import queue
from traceback import format_exc
from multiprocessing import Process, current_process, active_children, Pool, util, connection
from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import exit, Popen, ForkingPickler
+from multiprocessing.forking import Popen, ForkingPickler
from time import time as _time
#
@@ -140,28 +140,38 @@ class Server(object):
self.id_to_obj = {'0': (None, ())}
self.id_to_refcount = {}
self.mutex = threading.RLock()
- self.stop = 0
def serve_forever(self):
'''
Run the server forever
'''
+ self.stop_event = threading.Event()
current_process()._manager_server = self
try:
+ accepter = threading.Thread(target=self.accepter)
+ accepter.daemon = True
+ accepter.start()
try:
- while 1:
- try:
- c = self.listener.accept()
- except (OSError, IOError):
- continue
- t = threading.Thread(target=self.handle_request, args=(c,))
- t.daemon = True
- t.start()
+ while not self.stop_event.is_set():
+ self.stop_event.wait(1)
except (KeyboardInterrupt, SystemExit):
pass
finally:
- self.stop = 999
- self.listener.close()
+ if sys.stdout != sys.__stdout__:
+ util.debug('resetting stdout, stderr')
+ sys.stdout = sys.__stdout__
+ sys.stderr = sys.__stderr__
+ sys.exit(0)
+
+ def accepter(self):
+ while True:
+ try:
+ c = self.listener.accept()
+ except (OSError, IOError):
+ continue
+ t = threading.Thread(target=self.handle_request, args=(c,))
+ t.daemon = True
+ t.start()
def handle_request(self, c):
'''
@@ -208,7 +218,7 @@ class Server(object):
send = conn.send
id_to_obj = self.id_to_obj
- while not self.stop:
+ while not self.stop_event.is_set():
try:
methodname = obj = None
@@ -318,32 +328,13 @@ class Server(object):
Shutdown this process
'''
try:
- try:
- util.debug('manager received shutdown message')
- c.send(('#RETURN', None))
-
- if sys.stdout != sys.__stdout__:
- util.debug('resetting stdout, stderr')
- sys.stdout = sys.__stdout__
- sys.stderr = sys.__stderr__
-
- util._run_finalizers(0)
-
- for p in active_children():
- util.debug('terminating a child process of manager')
- p.terminate()
-
- for p in active_children():
- util.debug('terminating a child process of manager')
- p.join()
-
- util._run_finalizers()
- util.info('manager exiting with exitcode 0')
- except:
- import traceback
- traceback.print_exc()
+ util.debug('manager received shutdown message')
+ c.send(('#RETURN', None))
+ except:
+ import traceback
+ traceback.print_exc()
finally:
- exit(0)
+ self.stop_event.set()
def create(self, c, typeid, *args, **kwds):
'''
@@ -455,10 +446,6 @@ class BaseManager(object):
self._serializer = serializer
self._Listener, self._Client = listener_client[serializer]
- def __reduce__(self):
- return type(self).from_address, \
- (self._address, self._authkey, self._serializer)
-
def get_server(self):
'''
Return server object with serve_forever() method and address attribute
@@ -595,7 +582,7 @@ class BaseManager(object):
except Exception:
pass
- process.join(timeout=0.2)
+ process.join(timeout=1.0)
if process.is_alive():
util.info('manager still alive')
if hasattr(process, 'terminate'):
@@ -1006,6 +993,26 @@ class EventProxy(BaseProxy):
def wait(self, timeout=None):
return self._callmethod('wait', (timeout,))
+
+class BarrierProxy(BaseProxy):
+ _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
+ def wait(self, timeout=None):
+ return self._callmethod('wait', (timeout,))
+ def abort(self):
+ return self._callmethod('abort')
+ def reset(self):
+ return self._callmethod('reset')
+ @property
+ def parties(self):
+ return self._callmethod('__getattribute__', ('parties',))
+ @property
+ def n_waiting(self):
+ return self._callmethod('__getattribute__', ('n_waiting',))
+ @property
+ def broken(self):
+ return self._callmethod('__getattribute__', ('broken',))
+
+
class NamespaceProxy(BaseProxy):
_exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
def __getattr__(self, key):
@@ -1097,6 +1104,7 @@ SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
+SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)