-rw-r--r--  Lib/multiprocessing/forkserver.py                                  |   8
-rw-r--r--  Lib/multiprocessing/popen_spawn_posix.py                           |   4
-rw-r--r--  Lib/multiprocessing/resource_tracker.py (renamed from Lib/multiprocessing/semaphore_tracker.py) | 133
-rw-r--r--  Lib/multiprocessing/shared_memory.py                               |   5
-rw-r--r--  Lib/multiprocessing/spawn.py                                       |   4
-rw-r--r--  Lib/multiprocessing/synchronize.py                                 |   8
-rw-r--r--  Lib/test/_test_multiprocessing.py                                  | 170
-rw-r--r--  Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst  |   1
-rw-r--r--  PCbuild/lib.pyproj                                                 |   2
9 files changed, 210 insertions(+), 125 deletions(-)
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
index 040b46e..dabf7bc 100644
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -11,7 +11,7 @@ import warnings
 from . import connection
 from . import process
 from .context import reduction
-from . import semaphore_tracker
+from . import resource_tracker
 from . import spawn
 from . import util
 
@@ -69,7 +69,7 @@ class ForkServer(object):
             parent_r, child_w = os.pipe()
             child_r, parent_w = os.pipe()
             allfds = [child_r, child_w, self._forkserver_alive_fd,
-                      semaphore_tracker.getfd()]
+                      resource_tracker.getfd()]
             allfds += fds
             try:
                 reduction.sendfds(client, allfds)
@@ -90,7 +90,7 @@ class ForkServer(object):
         ensure_running() will do nothing.
         '''
         with self._lock:
-            semaphore_tracker.ensure_running()
+            resource_tracker.ensure_running()
             if self._forkserver_pid is not None:
                 # forkserver was launched before, is it still running?
                 pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
@@ -290,7 +290,7 @@ def _serve_one(child_r, fds, unused_fds, handlers):
         os.close(fd)
 
     (_forkserver._forkserver_alive_fd,
-     semaphore_tracker._semaphore_tracker._fd,
+     resource_tracker._resource_tracker._fd,
      *_forkserver._inherited_fds) = fds
 
     # Run process object received over pipe
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
index 3815106..59f8e45 100644
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -36,8 +36,8 @@ class Popen(popen_fork.Popen):
         return fd
 
     def _launch(self, process_obj):
-        from . import semaphore_tracker
-        tracker_fd = semaphore_tracker.getfd()
+        from . import resource_tracker
+        tracker_fd = resource_tracker.getfd()
         self._fds.append(tracker_fd)
         prep_data = spawn.get_preparation_data(process_obj._name)
         fp = io.BytesIO()
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/resource_tracker.py
index 3c2c3ad..e67e0b2 100644
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ b/Lib/multiprocessing/resource_tracker.py
@@ -1,15 +1,19 @@
+###############################################################################
+# Server process to keep track of unlinked resources (like shared memory
+# segments, semaphores etc.) and clean them.
 #
 # On Unix we run a server process which keeps track of unlinked
-# semaphores. The server ignores SIGINT and SIGTERM and reads from a
+# resources. The server ignores SIGINT and SIGTERM and reads from a
 # pipe. Every other process of the program has a copy of the writable
 # end of the pipe, so we get EOF when all other processes have exited.
-# Then the server process unlinks any remaining semaphore names.
-#
-# This is important because the system only supports a limited number
-# of named semaphores, and they will not be automatically removed till
-# the next reboot. Without this semaphore tracker process, "killall
-# python" would probably leave unlinked semaphores.
+# Then the server process unlinks any remaining resource names.
 #
+# This is important because there may be system limits for such resources: for
+# instance, the system only supports a limited number of named semaphores, and
+# shared-memory segments live in the RAM. If a python process leaks such a
+# resource, this resource will not be removed till the next reboot. Without
+# this resource tracker process, "killall python" would probably leave unlinked
+# resources.
 
 import os
 import signal
@@ -17,6 +21,7 @@ import sys
 import threading
 import warnings
 import _multiprocessing
+import _posixshmem
 
 from . import spawn
 from . import util
@@ -26,8 +31,14 @@ __all__ = ['ensure_running', 'register', 'unregister']
 _HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
 _IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
 
+_CLEANUP_FUNCS = {
+    'noop': lambda: None,
+    'semaphore': _multiprocessing.sem_unlink,
+    'shared_memory': _posixshmem.shm_unlink
+}
+
 
-class SemaphoreTracker(object):
+class ResourceTracker(object):
 
     def __init__(self):
         self._lock = threading.Lock()
@@ -39,13 +50,13 @@
         return self._fd
 
     def ensure_running(self):
-        '''Make sure that semaphore tracker process is running.
+        '''Make sure that resource tracker process is running.
 
         This can be run from any process. Usually a child process will use
-        the semaphore created by its parent.'''
+        the resource created by its parent.'''
         with self._lock:
             if self._fd is not None:
-                # semaphore tracker was launched before, is it still running?
+                # resource tracker was launched before, is it still running?
                 if self._check_alive():
                     # => still alive
                     return
@@ -55,24 +66,24 @@
             # Clean-up to avoid dangling processes.
             try:
                 # _pid can be None if this process is a child from another
-                # python process, which has started the semaphore_tracker.
+                # python process, which has started the resource_tracker.
                 if self._pid is not None:
                     os.waitpid(self._pid, 0)
             except ChildProcessError:
-                # The semaphore_tracker has already been terminated.
+                # The resource_tracker has already been terminated.
                 pass
             self._fd = None
             self._pid = None
 
-            warnings.warn('semaphore_tracker: process died unexpectedly, '
-                          'relaunching. Some semaphores might leak.')
+            warnings.warn('resource_tracker: process died unexpectedly, '
+                          'relaunching. Some resources might leak.')
 
         fds_to_pass = []
         try:
             fds_to_pass.append(sys.stderr.fileno())
         except Exception:
             pass
-        cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
+        cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
         r, w = os.pipe()
         try:
             fds_to_pass.append(r)
@@ -107,23 +118,23 @@
         try:
             # We cannot use send here as it calls ensure_running, creating
             # a cycle.
-            os.write(self._fd, b'PROBE:0\n')
+            os.write(self._fd, b'PROBE:0:noop\n')
         except OSError:
             return False
         else:
             return True
 
-    def register(self, name):
-        '''Register name of semaphore with semaphore tracker.'''
-        self._send('REGISTER', name)
+    def register(self, name, rtype):
+        '''Register name of resource with resource tracker.'''
+        self._send('REGISTER', name, rtype)
 
-    def unregister(self, name):
-        '''Unregister name of semaphore with semaphore tracker.'''
-        self._send('UNREGISTER', name)
+    def unregister(self, name, rtype):
+        '''Unregister name of resource with resource tracker.'''
+        self._send('UNREGISTER', name, rtype)
 
-    def _send(self, cmd, name):
+    def _send(self, cmd, name, rtype):
         self.ensure_running()
-        msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
+        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
         if len(name) > 512:
             # posix guarantees that writes to a pipe of less than PIPE_BUF
             # bytes are atomic, and that PIPE_BUF >= 512
@@ -133,14 +144,14 @@
             nbytes, len(msg))
 
 
-_semaphore_tracker = SemaphoreTracker()
-ensure_running = _semaphore_tracker.ensure_running
-register = _semaphore_tracker.register
-unregister = _semaphore_tracker.unregister
-getfd = _semaphore_tracker.getfd
+_resource_tracker = ResourceTracker()
+ensure_running = _resource_tracker.ensure_running
+register = _resource_tracker.register
+unregister = _resource_tracker.unregister
+getfd = _resource_tracker.getfd
 
 
 def main(fd):
-    '''Run semaphore tracker.'''
+    '''Run resource tracker.'''
     # protect the process from ^C and "killall python" etc
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     signal.signal(signal.SIGTERM, signal.SIG_IGN)
@@ -153,18 +164,24 @@
         except Exception:
             pass
 
-    cache = set()
+    cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
     try:
-        # keep track of registered/unregistered semaphores
+        # keep track of registered/unregistered resources
         with open(fd, 'rb') as f:
             for line in f:
                 try:
-                    cmd, name = line.strip().split(b':')
-                    if cmd == b'REGISTER':
-                        cache.add(name)
-                    elif cmd == b'UNREGISTER':
-                        cache.remove(name)
-                    elif cmd == b'PROBE':
+                    cmd, name, rtype = line.strip().decode('ascii').split(':')
+                    cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
+                    if cleanup_func is None:
+                        raise ValueError(
+                            f'Cannot register {name} for automatic cleanup: '
+                            f'unknown resource type {rtype}')
+
+                    if cmd == 'REGISTER':
+                        cache[rtype].add(name)
+                    elif cmd == 'UNREGISTER':
+                        cache[rtype].remove(name)
+                    elif cmd == 'PROBE':
                         pass
                     else:
                         raise RuntimeError('unrecognized command %r' % cmd)
@@ -174,23 +191,23 @@
                     except:
                         pass
     finally:
-        # all processes have terminated; cleanup any remaining semaphores
-        if cache:
-            try:
-                warnings.warn('semaphore_tracker: There appear to be %d '
-                              'leaked semaphores to clean up at shutdown' %
-                              len(cache))
-            except Exception:
-                pass
-        for name in cache:
-            # For some reason the process which created and registered this
-            # semaphore has failed to unregister it. Presumably it has died.
-            # We therefore unlink it.
-            try:
-                name = name.decode('ascii')
+        # all processes have terminated; cleanup any remaining resources
+        for rtype, rtype_cache in cache.items():
+            if rtype_cache:
                 try:
-                    _multiprocessing.sem_unlink(name)
-                except Exception as e:
-                    warnings.warn('semaphore_tracker: %r: %s' % (name, e))
-            finally:
-                pass
+                    warnings.warn('resource_tracker: There appear to be %d '
+                                  'leaked %s objects to clean up at shutdown' %
+                                  (len(rtype_cache), rtype))
+                except Exception:
+                    pass
+            for name in rtype_cache:
+                # For some reason the process which created and registered this
+                # resource has failed to unregister it. Presumably it has
+                # died. We therefore unlink it.
+                try:
+                    try:
+                        _CLEANUP_FUNCS[rtype](name)
+                    except Exception as e:
+                        warnings.warn('resource_tracker: %r: %s' % (name, e))
+                finally:
+                    pass
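Commentary, not part of the patch: after this change every message sent to the tracker carries a resource type in addition to the name, and the tracker dispatches cleanup through _CLEANUP_FUNCS. A minimal sketch of the new wire format, mirroring ResourceTracker._send() above; the resource names used here are made up for illustration.

# Sketch of the "CMD:name:rtype" line format used by ResourceTracker._send()
# after this patch (the names below are illustrative, not real resources).
def tracker_message(cmd, name, rtype):
    # posix guarantees that writes of fewer than PIPE_BUF (>= 512) bytes
    # to a pipe are atomic, hence the same length check as in _send()
    msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
    if len(name) > 512:
        raise ValueError('name too long')
    return msg

assert tracker_message('REGISTER', '/psm_demo', 'shared_memory') == \
    b'REGISTER:/psm_demo:shared_memory\n'
assert tracker_message('UNREGISTER', '/mp_demo_sem', 'semaphore') == \
    b'UNREGISTER:/mp_demo_sem:semaphore\n'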
diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py
index ebc8885..184e367 100644
--- a/Lib/multiprocessing/shared_memory.py
+++ b/Lib/multiprocessing/shared_memory.py
@@ -113,6 +113,9 @@ class SharedMemory:
                     self.unlink()
                     raise
 
+            from .resource_tracker import register
+            register(self._name, "shared_memory")
+
         else:
 
             # Windows Named Shared Memory
@@ -231,7 +234,9 @@ class SharedMemory:
         called once (and only once) across all processes which have access
         to the shared memory block."""
         if _USE_POSIX and self._name:
+            from .resource_tracker import unregister
             _posixshmem.shm_unlink(self._name)
+            unregister(self._name, "shared_memory")
 
 
 _encoding = "utf8"
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
index 6759351..f66b5aa 100644
--- a/Lib/multiprocessing/spawn.py
+++ b/Lib/multiprocessing/spawn.py
@@ -111,8 +111,8 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
         _winapi.CloseHandle(source_process)
         fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
     else:
-        from . import semaphore_tracker
-        semaphore_tracker._semaphore_tracker._fd = tracker_fd
+        from . import resource_tracker
+        resource_tracker._resource_tracker._fd = tracker_fd
         fd = pipe_handle
     exitcode = _main(fd)
     sys.exit(exitcode)
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 5137c49..4fcbefc 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -76,16 +76,16 @@ class SemLock(object):
             # We only get here if we are on Unix with forking
            # disabled. When the object is garbage collected or the
             # process shuts down we unlink the semaphore name
-            from .semaphore_tracker import register
-            register(self._semlock.name)
+            from .resource_tracker import register
+            register(self._semlock.name, "semaphore")
             util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
                           exitpriority=0)
 
     @staticmethod
     def _cleanup(name):
-        from .semaphore_tracker import unregister
+        from .resource_tracker import unregister
         sem_unlink(name)
-        unregister(name)
+        unregister(name, "semaphore")
 
     def _make_methods(self):
         self.acquire = self._semlock.acquire
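The callers above all follow one pattern: register the name with the tracker when the named resource is created, and unregister it when it is unlinked deliberately, so that only leaked names are cleaned up at shutdown. A hedged usage sketch of that pattern with the shared_memory module (the segment size and payload are arbitrary, and a POSIX platform is assumed):

# Usage sketch, assuming the 3.8-era shared_memory API as patched above.
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=64)  # registered as "shared_memory"
try:
    shm.buf[:5] = b'hello'   # use the mapped memory
finally:
    shm.close()              # drop this process's mapping
    shm.unlink()             # shm_unlink + unregister: nothing left for the tracker to reap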
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index d97e423..a50293c 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -88,6 +88,13 @@ def join_process(process):
     support.join_thread(process, timeout=TIMEOUT)
 
 
+if os.name == "posix":
+    from multiprocessing import resource_tracker
+
+    def _resource_unlink(name, rtype):
+        resource_tracker._CLEANUP_FUNCS[rtype](name)
+
+
 #
 # Constants
 #
@@ -3896,6 +3903,32 @@ class _TestSharedMemory(BaseTestCase):
         deserialized_sl.shm.close()
         sl.shm.close()
 
+    def test_shared_memory_cleaned_after_process_termination(self):
+        import subprocess
+        from multiprocessing import shared_memory
+        cmd = '''if 1:
+            import os, time, sys
+            from multiprocessing import shared_memory
+
+            # Create a shared_memory segment, and send the segment name
+            sm = shared_memory.SharedMemory(create=True, size=10)
+            sys.stdout.write(sm._name + '\\n')
+            sys.stdout.flush()
+            time.sleep(100)
+        '''
+        p = subprocess.Popen([sys.executable, '-E', '-c', cmd],
+                             stdout=subprocess.PIPE)
+        name = p.stdout.readline().strip().decode()
+
+        # killing abruptly processes holding reference to a shared memory
+        # segment should not leak the given memory segment.
+        p.terminate()
+        p.wait()
+        time.sleep(1.0)  # wait for the OS to collect the segment
+
+        with self.assertRaises(FileNotFoundError):
+            smm = shared_memory.SharedMemory(name, create=False)
+
 #
 #
 #
@@ -4827,57 +4860,86 @@ class TestStartMethod(unittest.TestCase):
 
 @unittest.skipIf(sys.platform == "win32",
                  "test semantics don't make sense on Windows")
-class TestSemaphoreTracker(unittest.TestCase):
+class TestResourceTracker(unittest.TestCase):
 
-    def test_semaphore_tracker(self):
+    def test_resource_tracker(self):
         #
         # Check that killing process does not leak named semaphores
         #
         import subprocess
         cmd = '''if 1:
-            import multiprocessing as mp, time, os
+            import time, os, tempfile
+            import multiprocessing as mp
+            from multiprocessing import resource_tracker
+            from multiprocessing.shared_memory import SharedMemory
+
             mp.set_start_method("spawn")
-            lock1 = mp.Lock()
-            lock2 = mp.Lock()
-            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
-            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
+            rand = tempfile._RandomNameSequence()
+
+
+            def create_and_register_resource(rtype):
+                if rtype == "semaphore":
+                    lock = mp.Lock()
+                    return lock, lock._semlock.name
+                elif rtype == "shared_memory":
+                    sm = SharedMemory(create=True, size=10)
+                    return sm, sm._name
+                else:
+                    raise ValueError(
+                        "Resource type {{}} not understood".format(rtype))
+
+
+            resource1, rname1 = create_and_register_resource("{rtype}")
+            resource2, rname2 = create_and_register_resource("{rtype}")
+
+            os.write({w}, rname1.encode("ascii") + b"\\n")
+            os.write({w}, rname2.encode("ascii") + b"\\n")
+
             time.sleep(10)
         '''
-        r, w = os.pipe()
-        p = subprocess.Popen([sys.executable,
-                             '-E', '-c', cmd % (w, w)],
-                             pass_fds=[w],
-                             stderr=subprocess.PIPE)
-        os.close(w)
-        with open(r, 'rb', closefd=True) as f:
-            name1 = f.readline().rstrip().decode('ascii')
-            name2 = f.readline().rstrip().decode('ascii')
-        _multiprocessing.sem_unlink(name1)
-        p.terminate()
-        p.wait()
-        time.sleep(2.0)
-        with self.assertRaises(OSError) as ctx:
-            _multiprocessing.sem_unlink(name2)
-        # docs say it should be ENOENT, but OSX seems to give EINVAL
-        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
-        err = p.stderr.read().decode('utf-8')
-        p.stderr.close()
-        expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
-        self.assertRegex(err, expected)
-        self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
-
-    def check_semaphore_tracker_death(self, signum, should_die):
+        for rtype in resource_tracker._CLEANUP_FUNCS:
+            with self.subTest(rtype=rtype):
+                if rtype == "noop":
+                    # Artefact resource type used by the resource_tracker
+                    continue
+                r, w = os.pipe()
+                p = subprocess.Popen([sys.executable,
+                                     '-E', '-c', cmd.format(w=w, rtype=rtype)],
+                                     pass_fds=[w],
+                                     stderr=subprocess.PIPE)
+                os.close(w)
+                with open(r, 'rb', closefd=True) as f:
+                    name1 = f.readline().rstrip().decode('ascii')
+                    name2 = f.readline().rstrip().decode('ascii')
+                _resource_unlink(name1, rtype)
+                p.terminate()
+                p.wait()
+                time.sleep(2.0)
+                with self.assertRaises(OSError) as ctx:
+                    _resource_unlink(name2, rtype)
+                # docs say it should be ENOENT, but OSX seems to give EINVAL
+                self.assertIn(
+                    ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
+                err = p.stderr.read().decode('utf-8')
+                p.stderr.close()
+                expected = ('resource_tracker: There appear to be 2 leaked {} '
+                            'objects'.format(
+                    rtype))
+                self.assertRegex(err, expected)
+                self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)
+
+    def check_resource_tracker_death(self, signum, should_die):
         # bpo-31310: if the semaphore tracker process has died, it should
         # be restarted implicitly.
-        from multiprocessing.semaphore_tracker import _semaphore_tracker
-        pid = _semaphore_tracker._pid
+        from multiprocessing.resource_tracker import _resource_tracker
+        pid = _resource_tracker._pid
         if pid is not None:
             os.kill(pid, signal.SIGKILL)
             os.waitpid(pid, 0)
         with warnings.catch_warnings():
             warnings.simplefilter("ignore")
-            _semaphore_tracker.ensure_running()
-            pid = _semaphore_tracker._pid
+            _resource_tracker.ensure_running()
+            pid = _resource_tracker._pid
         os.kill(pid, signum)
         time.sleep(1.0)  # give it time to die
 
@@ -4898,50 +4960,50 @@ class TestSemaphoreTracker(unittest.TestCase):
                 self.assertEqual(len(all_warn), 1)
                 the_warn = all_warn[0]
                 self.assertTrue(issubclass(the_warn.category, UserWarning))
-                self.assertTrue("semaphore_tracker: process died"
+                self.assertTrue("resource_tracker: process died"
                                 in str(the_warn.message))
             else:
                 self.assertEqual(len(all_warn), 0)
 
-    def test_semaphore_tracker_sigint(self):
+    def test_resource_tracker_sigint(self):
         # Catchable signal (ignored by semaphore tracker)
-        self.check_semaphore_tracker_death(signal.SIGINT, False)
+        self.check_resource_tracker_death(signal.SIGINT, False)
 
-    def test_semaphore_tracker_sigterm(self):
+    def test_resource_tracker_sigterm(self):
         # Catchable signal (ignored by semaphore tracker)
-        self.check_semaphore_tracker_death(signal.SIGTERM, False)
+        self.check_resource_tracker_death(signal.SIGTERM, False)
 
-    def test_semaphore_tracker_sigkill(self):
+    def test_resource_tracker_sigkill(self):
         # Uncatchable signal.
-        self.check_semaphore_tracker_death(signal.SIGKILL, True)
+        self.check_resource_tracker_death(signal.SIGKILL, True)
 
     @staticmethod
-    def _is_semaphore_tracker_reused(conn, pid):
-        from multiprocessing.semaphore_tracker import _semaphore_tracker
-        _semaphore_tracker.ensure_running()
+    def _is_resource_tracker_reused(conn, pid):
+        from multiprocessing.resource_tracker import _resource_tracker
+        _resource_tracker.ensure_running()
         # The pid should be None in the child process, expect for the fork
         # context. It should not be a new value.
-        reused = _semaphore_tracker._pid in (None, pid)
-        reused &= _semaphore_tracker._check_alive()
+        reused = _resource_tracker._pid in (None, pid)
+        reused &= _resource_tracker._check_alive()
         conn.send(reused)
 
-    def test_semaphore_tracker_reused(self):
-        from multiprocessing.semaphore_tracker import _semaphore_tracker
-        _semaphore_tracker.ensure_running()
-        pid = _semaphore_tracker._pid
+    def test_resource_tracker_reused(self):
+        from multiprocessing.resource_tracker import _resource_tracker
+        _resource_tracker.ensure_running()
+        pid = _resource_tracker._pid
         r, w = multiprocessing.Pipe(duplex=False)
-        p = multiprocessing.Process(target=self._is_semaphore_tracker_reused,
+        p = multiprocessing.Process(target=self._is_resource_tracker_reused,
                                     args=(w, pid))
 
         p.start()
-        is_semaphore_tracker_reused = r.recv()
+        is_resource_tracker_reused = r.recv()
 
         # Clean up
         p.join()
         w.close()
         r.close()
 
-        self.assertTrue(is_semaphore_tracker_reused)
+        self.assertTrue(is_resource_tracker_reused)
 
 
 class TestSimpleQueue(unittest.TestCase):
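For readers who want to try the behaviour checked by the new test_shared_memory_cleaned_after_process_termination outside the test suite, a rough standalone script is sketched below. It is POSIX-only, the sleep durations are arbitrary, and the child's segment name is read from its stdout, as in the test above.

# Standalone sketch of the scenario covered by the new shared-memory test.
import subprocess, sys, time
from multiprocessing import shared_memory

child_code = '''if 1:
    import sys, time
    from multiprocessing import shared_memory
    sm = shared_memory.SharedMemory(create=True, size=10)
    sys.stdout.write(sm._name + "\\n")
    sys.stdout.flush()
    time.sleep(100)
'''
p = subprocess.Popen([sys.executable, '-c', child_code],
                     stdout=subprocess.PIPE)
name = p.stdout.readline().strip().decode()

p.terminate()    # kill the child while it still holds the segment
p.wait()
time.sleep(1.0)  # give its resource tracker time to unlink the leaked name

try:
    leaked = shared_memory.SharedMemory(name, create=False)
except FileNotFoundError:
    print("segment was unlinked by the resource tracker")
else:
    print("segment still exists; cleaning it up ourselves")
    leaked.close()
    leaked.unlink()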
diff --git a/Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst b/Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst
new file mode 100644
index 0000000..5eaf0a0
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst
@@ -0,0 +1 @@
+The multiprocessing.resource_tracker replaces the multiprocessing.semaphore_tracker module. Other than semaphores, resource_tracker also tracks shared_memory segments.
\ No newline at end of file
diff --git a/PCbuild/lib.pyproj b/PCbuild/lib.pyproj
index ffb95c6..7ed71bd 100644
--- a/PCbuild/lib.pyproj
+++ b/PCbuild/lib.pyproj
@@ -678,7 +678,7 @@
     <Compile Include="multiprocessing\queues.py" />
     <Compile Include="multiprocessing\reduction.py" />
     <Compile Include="multiprocessing\resource_sharer.py" />
-    <Compile Include="multiprocessing\semaphore_tracker.py" />
+    <Compile Include="multiprocessing\resource_tracker.py" />
     <Compile Include="multiprocessing\sharedctypes.py" />
     <Compile Include="multiprocessing\spawn.py" />
     <Compile Include="multiprocessing\synchronize.py" />
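Finally, a toy walk-through (not from the patch; the cleanup callables are stand-ins so it runs anywhere) of the bookkeeping that main() now performs: one set of names per resource type, and anything still registered when the pipe reaches EOF is treated as leaked and passed to its type's cleanup function.

# Stand-in cleanup functions; the real tracker uses sem_unlink/shm_unlink.
CLEANUP_FUNCS = {
    'noop': lambda name: None,
    'semaphore': lambda name: print('sem_unlink(%r)' % name),
    'shared_memory': lambda name: print('shm_unlink(%r)' % name),
}

cache = {rtype: set() for rtype in CLEANUP_FUNCS}

messages = [b'REGISTER:/psm_demo:shared_memory\n',
            b'REGISTER:/mp_demo_sem:semaphore\n',
            b'UNREGISTER:/mp_demo_sem:semaphore\n',
            b'PROBE:0:noop\n']
for line in messages:
    cmd, name, rtype = line.strip().decode('ascii').split(':')
    if cmd == 'REGISTER':
        cache[rtype].add(name)
    elif cmd == 'UNREGISTER':
        cache[rtype].remove(name)

# At pipe EOF the tracker unlinks whatever is still cached:
for rtype, rtype_cache in cache.items():
    for name in rtype_cache:
        CLEANUP_FUNCS[rtype](name)   # here only /psm_demo is left over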