diff options
author | Pierre Glaser <pierreglaser@msn.com> | 2019-05-10 20:59:08 (GMT) |
---|---|---|
committer | Antoine Pitrou <antoine@python.org> | 2019-05-10 20:59:08 (GMT) |
commit | f22cc69b012f52882d434a5c44a004bc3aa5c33c (patch) | |
tree | 03dc6cf6c5449ab30ac1243140b6b8f3b2359089 /Lib/multiprocessing | |
parent | d0d64ad1f5f1dc1630004091d7f8209546c1220a (diff) | |
download | cpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.zip cpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.tar.gz cpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.tar.bz2 |
bpo-36867: Make semaphore_tracker track other system resources (GH-13222)
The multiprocessing.resource_tracker replaces the multiprocessing.semaphore_tracker module. Other than semaphores, resource_tracker also tracks shared_memory segments. Patch by Pierre Glaser.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r-- | Lib/multiprocessing/forkserver.py | 8 | ||||
-rw-r--r-- | Lib/multiprocessing/popen_spawn_posix.py | 4 | ||||
-rw-r--r-- | Lib/multiprocessing/resource_tracker.py (renamed from Lib/multiprocessing/semaphore_tracker.py) | 133 | ||||
-rw-r--r-- | Lib/multiprocessing/shared_memory.py | 5 | ||||
-rw-r--r-- | Lib/multiprocessing/spawn.py | 4 | ||||
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 8 |
6 files changed, 92 insertions, 70 deletions
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 040b46e..dabf7bc 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -11,7 +11,7 @@ import warnings from . import connection from . import process from .context import reduction -from . import semaphore_tracker +from . import resource_tracker from . import spawn from . import util @@ -69,7 +69,7 @@ class ForkServer(object): parent_r, child_w = os.pipe() child_r, parent_w = os.pipe() allfds = [child_r, child_w, self._forkserver_alive_fd, - semaphore_tracker.getfd()] + resource_tracker.getfd()] allfds += fds try: reduction.sendfds(client, allfds) @@ -90,7 +90,7 @@ class ForkServer(object): ensure_running() will do nothing. ''' with self._lock: - semaphore_tracker.ensure_running() + resource_tracker.ensure_running() if self._forkserver_pid is not None: # forkserver was launched before, is it still running? pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG) @@ -290,7 +290,7 @@ def _serve_one(child_r, fds, unused_fds, handlers): os.close(fd) (_forkserver._forkserver_alive_fd, - semaphore_tracker._semaphore_tracker._fd, + resource_tracker._resource_tracker._fd, *_forkserver._inherited_fds) = fds # Run process object received over pipe diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py index 3815106..59f8e45 100644 --- a/Lib/multiprocessing/popen_spawn_posix.py +++ b/Lib/multiprocessing/popen_spawn_posix.py @@ -36,8 +36,8 @@ class Popen(popen_fork.Popen): return fd def _launch(self, process_obj): - from . import semaphore_tracker - tracker_fd = semaphore_tracker.getfd() + from . import resource_tracker + tracker_fd = resource_tracker.getfd() self._fds.append(tracker_fd) prep_data = spawn.get_preparation_data(process_obj._name) fp = io.BytesIO() diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/resource_tracker.py index 3c2c3ad..e67e0b2 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -1,15 +1,19 @@ +############################################################################### +# Server process to keep track of unlinked resources (like shared memory +# segments, semaphores etc.) and clean them. # # On Unix we run a server process which keeps track of unlinked -# semaphores. The server ignores SIGINT and SIGTERM and reads from a +# resources. The server ignores SIGINT and SIGTERM and reads from a # pipe. Every other process of the program has a copy of the writable # end of the pipe, so we get EOF when all other processes have exited. -# Then the server process unlinks any remaining semaphore names. -# -# This is important because the system only supports a limited number -# of named semaphores, and they will not be automatically removed till -# the next reboot. Without this semaphore tracker process, "killall -# python" would probably leave unlinked semaphores. +# Then the server process unlinks any remaining resource names. # +# This is important because there may be system limits for such resources: for +# instance, the system only supports a limited number of named semaphores, and +# shared-memory segments live in the RAM. If a python process leaks such a +# resource, this resource will not be removed till the next reboot. Without +# this resource tracker process, "killall python" would probably leave unlinked +# resources. import os import signal @@ -17,6 +21,7 @@ import sys import threading import warnings import _multiprocessing +import _posixshmem from . import spawn from . import util @@ -26,8 +31,14 @@ __all__ = ['ensure_running', 'register', 'unregister'] _HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask') _IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) +_CLEANUP_FUNCS = { + 'noop': lambda: None, + 'semaphore': _multiprocessing.sem_unlink, + 'shared_memory': _posixshmem.shm_unlink +} + -class SemaphoreTracker(object): +class ResourceTracker(object): def __init__(self): self._lock = threading.Lock() @@ -39,13 +50,13 @@ class SemaphoreTracker(object): return self._fd def ensure_running(self): - '''Make sure that semaphore tracker process is running. + '''Make sure that resource tracker process is running. This can be run from any process. Usually a child process will use - the semaphore created by its parent.''' + the resource created by its parent.''' with self._lock: if self._fd is not None: - # semaphore tracker was launched before, is it still running? + # resource tracker was launched before, is it still running? if self._check_alive(): # => still alive return @@ -55,24 +66,24 @@ class SemaphoreTracker(object): # Clean-up to avoid dangling processes. try: # _pid can be None if this process is a child from another - # python process, which has started the semaphore_tracker. + # python process, which has started the resource_tracker. if self._pid is not None: os.waitpid(self._pid, 0) except ChildProcessError: - # The semaphore_tracker has already been terminated. + # The resource_tracker has already been terminated. pass self._fd = None self._pid = None - warnings.warn('semaphore_tracker: process died unexpectedly, ' - 'relaunching. Some semaphores might leak.') + warnings.warn('resource_tracker: process died unexpectedly, ' + 'relaunching. Some resources might leak.') fds_to_pass = [] try: fds_to_pass.append(sys.stderr.fileno()) except Exception: pass - cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)' + cmd = 'from multiprocessing.resource_tracker import main;main(%d)' r, w = os.pipe() try: fds_to_pass.append(r) @@ -107,23 +118,23 @@ class SemaphoreTracker(object): try: # We cannot use send here as it calls ensure_running, creating # a cycle. - os.write(self._fd, b'PROBE:0\n') + os.write(self._fd, b'PROBE:0:noop\n') except OSError: return False else: return True - def register(self, name): - '''Register name of semaphore with semaphore tracker.''' - self._send('REGISTER', name) + def register(self, name, rtype): + '''Register name of resource with resource tracker.''' + self._send('REGISTER', name, rtype) - def unregister(self, name): - '''Unregister name of semaphore with semaphore tracker.''' - self._send('UNREGISTER', name) + def unregister(self, name, rtype): + '''Unregister name of resource with resource tracker.''' + self._send('UNREGISTER', name, rtype) - def _send(self, cmd, name): + def _send(self, cmd, name, rtype): self.ensure_running() - msg = '{0}:{1}\n'.format(cmd, name).encode('ascii') + msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') if len(name) > 512: # posix guarantees that writes to a pipe of less than PIPE_BUF # bytes are atomic, and that PIPE_BUF >= 512 @@ -133,14 +144,14 @@ class SemaphoreTracker(object): nbytes, len(msg)) -_semaphore_tracker = SemaphoreTracker() -ensure_running = _semaphore_tracker.ensure_running -register = _semaphore_tracker.register -unregister = _semaphore_tracker.unregister -getfd = _semaphore_tracker.getfd +_resource_tracker = ResourceTracker() +ensure_running = _resource_tracker.ensure_running +register = _resource_tracker.register +unregister = _resource_tracker.unregister +getfd = _resource_tracker.getfd def main(fd): - '''Run semaphore tracker.''' + '''Run resource tracker.''' # protect the process from ^C and "killall python" etc signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) @@ -153,18 +164,24 @@ def main(fd): except Exception: pass - cache = set() + cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()} try: - # keep track of registered/unregistered semaphores + # keep track of registered/unregistered resources with open(fd, 'rb') as f: for line in f: try: - cmd, name = line.strip().split(b':') - if cmd == b'REGISTER': - cache.add(name) - elif cmd == b'UNREGISTER': - cache.remove(name) - elif cmd == b'PROBE': + cmd, name, rtype = line.strip().decode('ascii').split(':') + cleanup_func = _CLEANUP_FUNCS.get(rtype, None) + if cleanup_func is None: + raise ValueError( + f'Cannot register {name} for automatic cleanup: ' + f'unknown resource type {rtype}') + + if cmd == 'REGISTER': + cache[rtype].add(name) + elif cmd == 'UNREGISTER': + cache[rtype].remove(name) + elif cmd == 'PROBE': pass else: raise RuntimeError('unrecognized command %r' % cmd) @@ -174,23 +191,23 @@ def main(fd): except: pass finally: - # all processes have terminated; cleanup any remaining semaphores - if cache: - try: - warnings.warn('semaphore_tracker: There appear to be %d ' - 'leaked semaphores to clean up at shutdown' % - len(cache)) - except Exception: - pass - for name in cache: - # For some reason the process which created and registered this - # semaphore has failed to unregister it. Presumably it has died. - # We therefore unlink it. - try: - name = name.decode('ascii') + # all processes have terminated; cleanup any remaining resources + for rtype, rtype_cache in cache.items(): + if rtype_cache: try: - _multiprocessing.sem_unlink(name) - except Exception as e: - warnings.warn('semaphore_tracker: %r: %s' % (name, e)) - finally: - pass + warnings.warn('resource_tracker: There appear to be %d ' + 'leaked %s objects to clean up at shutdown' % + (len(rtype_cache), rtype)) + except Exception: + pass + for name in rtype_cache: + # For some reason the process which created and registered this + # resource has failed to unregister it. Presumably it has + # died. We therefore unlink it. + try: + try: + _CLEANUP_FUNCS[rtype](name) + except Exception as e: + warnings.warn('resource_tracker: %r: %s' % (name, e)) + finally: + pass diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py index ebc8885..184e367 100644 --- a/Lib/multiprocessing/shared_memory.py +++ b/Lib/multiprocessing/shared_memory.py @@ -113,6 +113,9 @@ class SharedMemory: self.unlink() raise + from .resource_tracker import register + register(self._name, "shared_memory") + else: # Windows Named Shared Memory @@ -231,7 +234,9 @@ class SharedMemory: called once (and only once) across all processes which have access to the shared memory block.""" if _USE_POSIX and self._name: + from .resource_tracker import unregister _posixshmem.shm_unlink(self._name) + unregister(self._name, "shared_memory") _encoding = "utf8" diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 6759351..f66b5aa 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -111,8 +111,8 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None): _winapi.CloseHandle(source_process) fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY) else: - from . import semaphore_tracker - semaphore_tracker._semaphore_tracker._fd = tracker_fd + from . import resource_tracker + resource_tracker._resource_tracker._fd = tracker_fd fd = pipe_handle exitcode = _main(fd) sys.exit(exitcode) diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 5137c49..4fcbefc 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -76,16 +76,16 @@ class SemLock(object): # We only get here if we are on Unix with forking # disabled. When the object is garbage collected or the # process shuts down we unlink the semaphore name - from .semaphore_tracker import register - register(self._semlock.name) + from .resource_tracker import register + register(self._semlock.name, "semaphore") util.Finalize(self, SemLock._cleanup, (self._semlock.name,), exitpriority=0) @staticmethod def _cleanup(name): - from .semaphore_tracker import unregister + from .resource_tracker import unregister sem_unlink(name) - unregister(name) + unregister(name, "semaphore") def _make_methods(self): self.acquire = self._semlock.acquire |