diff options
Diffstat (limited to 'Lib/multiprocessing/semaphore_tracker.py')
-rw-r--r-- | Lib/multiprocessing/semaphore_tracker.py | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py new file mode 100644 index 0000000..4a2d636 --- /dev/null +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -0,0 +1,135 @@ +# +# On Unix we run a server process which keeps track of unlinked +# semaphores. The server ignores SIGINT and SIGTERM and reads from a +# pipe. Every other process of the program has a copy of the writable +# end of the pipe, so we get EOF when all other processes have exited. +# Then the server process unlinks any remaining semaphore names. +# +# This is important because the system only supports a limited number +# of named semaphores, and they will not be automatically removed till +# the next reboot. Without this semaphore tracker process, "killall +# python" would probably leave unlinked semaphores. +# + +import errno +import os +import signal +import sys +import threading +import warnings +import _multiprocessing + +from . import spawn +from . import util +from . import current_process + +__all__ = ['ensure_running', 'register', 'unregister'] + + +_lock = threading.Lock() + + +def ensure_running(): + '''Make sure that semaphore tracker process is running. + + This can be run from any process. Usually a child process will use + the semaphore created by its parent.''' + with _lock: + config = current_process()._config + if config.get('semaphore_tracker_fd') is not None: + return + fds_to_pass = [] + try: + fds_to_pass.append(sys.stderr.fileno()) + except Exception: + pass + cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)' + r, semaphore_tracker_fd = util.pipe() + try: + fds_to_pass.append(r) + # process will out live us, so no need to wait on pid + exe = spawn.get_executable() + args = [exe] + util._args_from_interpreter_flags() + args += ['-c', cmd % r] + util.spawnv_passfds(exe, args, fds_to_pass) + except: + os.close(semaphore_tracker_fd) + raise + else: + config['semaphore_tracker_fd'] = semaphore_tracker_fd + finally: + os.close(r) + + +def register(name): + '''Register name of semaphore with semaphore tracker.''' + _send('REGISTER', name) + + +def unregister(name): + '''Unregister name of semaphore with semaphore tracker.''' + _send('UNREGISTER', name) + + +def _send(cmd, name): + msg = '{0}:{1}\n'.format(cmd, name).encode('ascii') + if len(name) > 512: + # posix guarantees that writes to a pipe of less than PIPE_BUF + # bytes are atomic, and that PIPE_BUF >= 512 + raise ValueError('name too long') + fd = current_process()._config['semaphore_tracker_fd'] + nbytes = os.write(fd, msg) + assert nbytes == len(msg) + + +def main(fd): + '''Run semaphore tracker.''' + # protect the process from ^C and "killall python" etc + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + + for f in (sys.stdin, sys.stdout): + try: + f.close() + except Exception: + pass + + cache = set() + try: + # keep track of registered/unregistered semaphores + with open(fd, 'rb') as f: + for line in f: + try: + cmd, name = line.strip().split(b':') + if cmd == b'REGISTER': + cache.add(name) + elif cmd == b'UNREGISTER': + cache.remove(name) + else: + raise RuntimeError('unrecognized command %r' % cmd) + except Exception: + try: + sys.excepthook(*sys.exc_info()) + except: + pass + finally: + # all processes have terminated; cleanup any remaining semaphores + if cache: + try: + warnings.warn('semaphore_tracker: There appear to be %d ' + 'leaked semaphores to clean up at shutdown' % + len(cache)) + except Exception: + pass + for name in cache: + # For some reason the process which created and registered this + # semaphore has failed to unregister it. Presumably it has died. + # We therefore unlink it. + try: + name = name.decode('ascii') + try: + _multiprocessing.sem_unlink(name) + except Exception as e: + warnings.warn('semaphore_tracker: %r: %s' % (name, e)) + finally: + pass |