summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorAntoine Pitrou <solipsis@pitrou.net>2012-04-27 21:51:03 (GMT)
committerAntoine Pitrou <solipsis@pitrou.net>2012-04-27 21:51:03 (GMT)
commit92ff4e196bfd5361f231ab8629025d28af1decab (patch)
tree238c48b79e5f733e80ce37ac4d78c90a4910e47a /Lib/multiprocessing
parentd0880d57b053179a8dd91f2b6fbcb5b5ddf56a1d (diff)
downloadcpython-92ff4e196bfd5361f231ab8629025d28af1decab.zip
cpython-92ff4e196bfd5361f231ab8629025d28af1decab.tar.gz
cpython-92ff4e196bfd5361f231ab8629025d28af1decab.tar.bz2
Issue #14666: stop multiprocessing's resource-sharing thread after the tests are done.
Also, block delivery of signals to that thread. Patch by Richard Oudkerk. This will hopefully fix sporadic freezes on the FreeBSD 9.0 buildbot.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/reduction.py29
1 files changed, 28 insertions, 1 deletions
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index ce38fe3..cef445b 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -40,6 +40,7 @@ import sys
import socket
import threading
import struct
+import signal
from multiprocessing import current_process
from multiprocessing.util import register_after_fork, debug, sub_debug
@@ -209,6 +210,7 @@ class ResourceSharer(object):
self._lock = threading.Lock()
self._listener = None
self._address = None
+ self._thread = None
register_after_fork(self, ResourceSharer._afterfork)
def register(self, send, close):
@@ -227,6 +229,24 @@ class ResourceSharer(object):
c.send((key, os.getpid()))
return c
+ def stop(self, timeout=None):
+ from .connection import Client
+ with self._lock:
+ if self._address is not None:
+ c = Client(self._address, authkey=current_process().authkey)
+ c.send(None)
+ c.close()
+ self._thread.join(timeout)
+ if self._thread.is_alive():
+ sub_warn('ResourceSharer thread did not stop when asked')
+ self._listener.close()
+ self._thread = None
+ self._address = None
+ self._listener = None
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+
def _afterfork(self):
for key, (send, close) in self._cache.items():
close()
@@ -239,6 +259,7 @@ class ResourceSharer(object):
self._listener.close()
self._listener = None
self._address = None
+ self._thread = None
def _start(self):
from .connection import Listener
@@ -249,12 +270,18 @@ class ResourceSharer(object):
t = threading.Thread(target=self._serve)
t.daemon = True
t.start()
+ self._thread = t
def _serve(self):
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
while 1:
try:
conn = self._listener.accept()
- key, destination_pid = conn.recv()
+ msg = conn.recv()
+ if msg is None:
+ break
+ key, destination_pid = msg
send, close = self._cache.pop(key)
send(conn, destination_pid)
close()