diff options
-rw-r--r-- | Lib/multiprocessing/reduction.py | 29 | ||||
-rw-r--r-- | Lib/test/test_multiprocessing.py | 5 |
2 files changed, 33 insertions, 1 deletions
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index ce38fe3..cef445b 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -40,6 +40,7 @@ import sys import socket import threading import struct +import signal from multiprocessing import current_process from multiprocessing.util import register_after_fork, debug, sub_debug @@ -209,6 +210,7 @@ class ResourceSharer(object): self._lock = threading.Lock() self._listener = None self._address = None + self._thread = None register_after_fork(self, ResourceSharer._afterfork) def register(self, send, close): @@ -227,6 +229,24 @@ class ResourceSharer(object): c.send((key, os.getpid())) return c + def stop(self, timeout=None): + from .connection import Client + with self._lock: + if self._address is not None: + c = Client(self._address, authkey=current_process().authkey) + c.send(None) + c.close() + self._thread.join(timeout) + if self._thread.is_alive(): + sub_warn('ResourceSharer thread did not stop when asked') + self._listener.close() + self._thread = None + self._address = None + self._listener = None + for key, (send, close) in self._cache.items(): + close() + self._cache.clear() + def _afterfork(self): for key, (send, close) in self._cache.items(): close() @@ -239,6 +259,7 @@ class ResourceSharer(object): self._listener.close() self._listener = None self._address = None + self._thread = None def _start(self): from .connection import Listener @@ -249,12 +270,18 @@ class ResourceSharer(object): t = threading.Thread(target=self._serve) t.daemon = True t.start() + self._thread = t def _serve(self): + if hasattr(signal, 'pthread_sigmask'): + signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG)) while 1: try: conn = self._listener.accept() - key, destination_pid = conn.recv() + msg = conn.recv() + if msg is None: + break + key, destination_pid = msg send, close = self._cache.pop(key) send(conn, destination_pid) close() diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 4c22aef..799be70 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1966,6 +1966,11 @@ class _TestPicklingConnections(BaseTestCase): ALLOWED_TYPES = ('processes',) @classmethod + def tearDownClass(cls): + from multiprocessing.reduction import resource_sharer + resource_sharer.stop(timeout=5) + + @classmethod def _listener(cls, conn, families): for fam in families: l = cls.connection.Listener(family=fam) |