summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/semaphore_tracker.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/semaphore_tracker.py')
-rw-r--r--Lib/multiprocessing/semaphore_tracker.py143
1 files changed, 143 insertions, 0 deletions
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
new file mode 100644
index 0000000..de7738e
--- /dev/null
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -0,0 +1,143 @@
+#
+# On Unix we run a server process which keeps track of unlinked
+# semaphores. The server ignores SIGINT and SIGTERM and reads from a
+# pipe. Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining semaphore names.
+#
+# This is important because the system only supports a limited number
+# of named semaphores, and they will not be automatically removed till
+# the next reboot. Without this semaphore tracker process, "killall
+# python" would probably leave unlinked semaphores.
+#
+
+import os
+import signal
+import sys
+import threading
+import warnings
+import _multiprocessing
+
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+
+class SemaphoreTracker(object):
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._fd = None
+
+ def getfd(self):
+ self.ensure_running()
+ return self._fd
+
+ def ensure_running(self):
+ '''Make sure that semaphore tracker process is running.
+
+ This can be run from any process. Usually a child process will use
+ the semaphore created by its parent.'''
+ with self._lock:
+ if self._fd is not None:
+ return
+ fds_to_pass = []
+ try:
+ fds_to_pass.append(sys.stderr.fileno())
+ except Exception:
+ pass
+ cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
+ r, w = os.pipe()
+ try:
+ fds_to_pass.append(r)
+ # process will out live us, so no need to wait on pid
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags()
+ args += ['-c', cmd % r]
+ util.spawnv_passfds(exe, args, fds_to_pass)
+ except:
+ os.close(w)
+ raise
+ else:
+ self._fd = w
+ finally:
+ os.close(r)
+
+ def register(self, name):
+ '''Register name of semaphore with semaphore tracker.'''
+ self._send('REGISTER', name)
+
+ def unregister(self, name):
+ '''Unregister name of semaphore with semaphore tracker.'''
+ self._send('UNREGISTER', name)
+
+ def _send(self, cmd, name):
+ self.ensure_running()
+ msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
+ if len(name) > 512:
+ # posix guarantees that writes to a pipe of less than PIPE_BUF
+ # bytes are atomic, and that PIPE_BUF >= 512
+ raise ValueError('name too long')
+ nbytes = os.write(self._fd, msg)
+ assert nbytes == len(msg)
+
+
+_semaphore_tracker = SemaphoreTracker()
+ensure_running = _semaphore_tracker.ensure_running
+register = _semaphore_tracker.register
+unregister = _semaphore_tracker.unregister
+getfd = _semaphore_tracker.getfd
+
+
+def main(fd):
+ '''Run semaphore tracker.'''
+ # protect the process from ^C and "killall python" etc
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
+ for f in (sys.stdin, sys.stdout):
+ try:
+ f.close()
+ except Exception:
+ pass
+
+ cache = set()
+ try:
+ # keep track of registered/unregistered semaphores
+ with open(fd, 'rb') as f:
+ for line in f:
+ try:
+ cmd, name = line.strip().split(b':')
+ if cmd == b'REGISTER':
+ cache.add(name)
+ elif cmd == b'UNREGISTER':
+ cache.remove(name)
+ else:
+ raise RuntimeError('unrecognized command %r' % cmd)
+ except Exception:
+ try:
+ sys.excepthook(*sys.exc_info())
+ except:
+ pass
+ finally:
+ # all processes have terminated; cleanup any remaining semaphores
+ if cache:
+ try:
+ warnings.warn('semaphore_tracker: There appear to be %d '
+ 'leaked semaphores to clean up at shutdown' %
+ len(cache))
+ except Exception:
+ pass
+ for name in cache:
+ # For some reason the process which created and registered this
+ # semaphore has failed to unregister it. Presumably it has died.
+ # We therefore unlink it.
+ try:
+ name = name.decode('ascii')
+ try:
+ _multiprocessing.sem_unlink(name)
+ except Exception as e:
+ warnings.warn('semaphore_tracker: %r: %s' % (name, e))
+ finally:
+ pass