diff options
Diffstat (limited to 'Lib/multiprocessing/reduction.py')
-rw-r--r-- | Lib/multiprocessing/reduction.py | 29 |
1 files changed, 28 insertions, 1 deletions
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index ce38fe3..cef445b 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -40,6 +40,7 @@ import sys import socket import threading import struct +import signal from multiprocessing import current_process from multiprocessing.util import register_after_fork, debug, sub_debug @@ -209,6 +210,7 @@ class ResourceSharer(object): self._lock = threading.Lock() self._listener = None self._address = None + self._thread = None register_after_fork(self, ResourceSharer._afterfork) def register(self, send, close): @@ -227,6 +229,24 @@ class ResourceSharer(object): c.send((key, os.getpid())) return c + def stop(self, timeout=None): + from .connection import Client + with self._lock: + if self._address is not None: + c = Client(self._address, authkey=current_process().authkey) + c.send(None) + c.close() + self._thread.join(timeout) + if self._thread.is_alive(): + sub_warn('ResourceSharer thread did not stop when asked') + self._listener.close() + self._thread = None + self._address = None + self._listener = None + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + def _afterfork(self): for key, (send, close) in self._cache.items(): close() @@ -239,6 +259,7 @@ class ResourceSharer(object): self._listener.close() self._listener = None self._address = None + self._thread = None def _start(self): from .connection import Listener @@ -249,12 +270,18 @@ class ResourceSharer(object): t = threading.Thread(target=self._serve) t.daemon = True t.start() + self._thread = t def _serve(self): + if hasattr(signal, 'pthread_sigmask'): + signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) while 1: try: conn = self._listener.accept() - key, destination_pid = conn.recv() + msg = conn.recv() + if msg is None: + break + key, destination_pid = msg send, close = self._cache.pop(key) send(conn, destination_pid) close() |