summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/semaphore_tracker.py
diff options
context:
space:
mode:
authorRichard Oudkerk <shibturn@gmail.com>2013-08-14 14:35:41 (GMT)
committerRichard Oudkerk <shibturn@gmail.com>2013-08-14 14:35:41 (GMT)
commit84ed9a68bd9a13252b376b21a9167dabae254325 (patch)
treeec8daa39fcf64b658bddf52f56ae47c0bdc2b091 /Lib/multiprocessing/semaphore_tracker.py
parentd06eeb4a2492b59d34ab69a2046dcae1f10ec593 (diff)
downloadcpython-84ed9a68bd9a13252b376b21a9167dabae254325.zip
cpython-84ed9a68bd9a13252b376b21a9167dabae254325.tar.gz
cpython-84ed9a68bd9a13252b376b21a9167dabae254325.tar.bz2
Issue #8713: Support alternative start methods in multiprocessing on Unix.
See http://hg.python.org/sandbox/sbt#spawn
Diffstat (limited to 'Lib/multiprocessing/semaphore_tracker.py')
-rw-r--r--Lib/multiprocessing/semaphore_tracker.py135
1 files changed, 135 insertions, 0 deletions
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
new file mode 100644
index 0000000..4a2d636
--- /dev/null
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -0,0 +1,135 @@
+#
+# On Unix we run a server process which keeps track of unlinked
+# semaphores. The server ignores SIGINT and SIGTERM and reads from a
+# pipe. Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining semaphore names.
+#
+# This is important because the system only supports a limited number
+# of named semaphores, and they will not be automatically removed till
+# the next reboot. Without this semaphore tracker process, "killall
+# python" would probably leave unlinked semaphores.
+#
+
+import errno
+import os
+import signal
+import sys
+import threading
+import warnings
+import _multiprocessing
+
+from . import spawn
+from . import util
+from . import current_process
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+
+_lock = threading.Lock()
+
+
+def ensure_running():
+ '''Make sure that semaphore tracker process is running.
+
+ This can be run from any process. Usually a child process will use
+ the semaphore created by its parent.'''
+ with _lock:
+ config = current_process()._config
+ if config.get('semaphore_tracker_fd') is not None:
+ return
+ fds_to_pass = []
+ try:
+ fds_to_pass.append(sys.stderr.fileno())
+ except Exception:
+ pass
+ cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
+ r, semaphore_tracker_fd = util.pipe()
+ try:
+ fds_to_pass.append(r)
+ # process will out live us, so no need to wait on pid
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags()
+ args += ['-c', cmd % r]
+ util.spawnv_passfds(exe, args, fds_to_pass)
+ except:
+ os.close(semaphore_tracker_fd)
+ raise
+ else:
+ config['semaphore_tracker_fd'] = semaphore_tracker_fd
+ finally:
+ os.close(r)
+
+
+def register(name):
+ '''Register name of semaphore with semaphore tracker.'''
+ _send('REGISTER', name)
+
+
+def unregister(name):
+ '''Unregister name of semaphore with semaphore tracker.'''
+ _send('UNREGISTER', name)
+
+
+def _send(cmd, name):
+ msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
+ if len(name) > 512:
+ # posix guarantees that writes to a pipe of less than PIPE_BUF
+ # bytes are atomic, and that PIPE_BUF >= 512
+ raise ValueError('name too long')
+ fd = current_process()._config['semaphore_tracker_fd']
+ nbytes = os.write(fd, msg)
+ assert nbytes == len(msg)
+
+
+def main(fd):
+ '''Run semaphore tracker.'''
+ # protect the process from ^C and "killall python" etc
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
+ for f in (sys.stdin, sys.stdout):
+ try:
+ f.close()
+ except Exception:
+ pass
+
+ cache = set()
+ try:
+ # keep track of registered/unregistered semaphores
+ with open(fd, 'rb') as f:
+ for line in f:
+ try:
+ cmd, name = line.strip().split(b':')
+ if cmd == b'REGISTER':
+ cache.add(name)
+ elif cmd == b'UNREGISTER':
+ cache.remove(name)
+ else:
+ raise RuntimeError('unrecognized command %r' % cmd)
+ except Exception:
+ try:
+ sys.excepthook(*sys.exc_info())
+ except:
+ pass
+ finally:
+ # all processes have terminated; cleanup any remaining semaphores
+ if cache:
+ try:
+ warnings.warn('semaphore_tracker: There appear to be %d '
+ 'leaked semaphores to clean up at shutdown' %
+ len(cache))
+ except Exception:
+ pass
+ for name in cache:
+ # For some reason the process which created and registered this
+ # semaphore has failed to unregister it. Presumably it has died.
+ # We therefore unlink it.
+ try:
+ name = name.decode('ascii')
+ try:
+ _multiprocessing.sem_unlink(name)
+ except Exception as e:
+ warnings.warn('semaphore_tracker: %r: %s' % (name, e))
+ finally:
+ pass