summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Lib/multiprocessing/reduction.py29
-rw-r--r--Lib/test/test_multiprocessing.py5
2 files changed, 33 insertions, 1 deletions
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index ce38fe3..cef445b 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -40,6 +40,7 @@ import sys
import socket
import threading
import struct
+import signal
from multiprocessing import current_process
from multiprocessing.util import register_after_fork, debug, sub_debug
@@ -209,6 +210,7 @@ class ResourceSharer(object):
self._lock = threading.Lock()
self._listener = None
self._address = None
+ self._thread = None
register_after_fork(self, ResourceSharer._afterfork)
def register(self, send, close):
@@ -227,6 +229,24 @@ class ResourceSharer(object):
c.send((key, os.getpid()))
return c
+ def stop(self, timeout=None):
+ from .connection import Client
+ with self._lock:
+ if self._address is not None:
+ c = Client(self._address, authkey=current_process().authkey)
+ c.send(None)
+ c.close()
+ self._thread.join(timeout)
+ if self._thread.is_alive():
+ sub_warn('ResourceSharer thread did not stop when asked')
+ self._listener.close()
+ self._thread = None
+ self._address = None
+ self._listener = None
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+
def _afterfork(self):
for key, (send, close) in self._cache.items():
close()
@@ -239,6 +259,7 @@ class ResourceSharer(object):
self._listener.close()
self._listener = None
self._address = None
+ self._thread = None
def _start(self):
from .connection import Listener
@@ -249,12 +270,18 @@ class ResourceSharer(object):
t = threading.Thread(target=self._serve)
t.daemon = True
t.start()
+ self._thread = t
def _serve(self):
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
while 1:
try:
conn = self._listener.accept()
- key, destination_pid = conn.recv()
+ msg = conn.recv()
+ if msg is None:
+ break
+ key, destination_pid = msg
send, close = self._cache.pop(key)
send(conn, destination_pid)
close()
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 4c22aef..799be70 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1966,6 +1966,11 @@ class _TestPicklingConnections(BaseTestCase):
ALLOWED_TYPES = ('processes',)
@classmethod
+ def tearDownClass(cls):
+ from multiprocessing.reduction import resource_sharer
+ resource_sharer.stop(timeout=5)
+
+ @classmethod
def _listener(cls, conn, families):
for fam in families:
l = cls.connection.Listener(family=fam)