From f22cc69b012f52882d434a5c44a004bc3aa5c33c Mon Sep 17 00:00:00 2001 From: Pierre Glaser Date: Fri, 10 May 2019 22:59:08 +0200 Subject: bpo-36867: Make semaphore_tracker track other system resources (GH-13222) The multiprocessing.resource_tracker replaces the multiprocessing.semaphore_tracker module. Other than semaphores, resource_tracker also tracks shared_memory segments. Patch by Pierre Glaser. --- Lib/multiprocessing/forkserver.py | 8 +- Lib/multiprocessing/popen_spawn_posix.py | 4 +- Lib/multiprocessing/resource_tracker.py | 213 +++++++++++++++++++++ Lib/multiprocessing/semaphore_tracker.py | 196 ------------------- Lib/multiprocessing/shared_memory.py | 5 + Lib/multiprocessing/spawn.py | 4 +- Lib/multiprocessing/synchronize.py | 8 +- Lib/test/_test_multiprocessing.py | 170 ++++++++++------ .../2019-05-09-18-12-55.bpo-36867.FuwVTi.rst | 1 + PCbuild/lib.pyproj | 2 +- 10 files changed, 348 insertions(+), 263 deletions(-) create mode 100644 Lib/multiprocessing/resource_tracker.py delete mode 100644 Lib/multiprocessing/semaphore_tracker.py create mode 100644 Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py index 040b46e..dabf7bc 100644 --- a/Lib/multiprocessing/forkserver.py +++ b/Lib/multiprocessing/forkserver.py @@ -11,7 +11,7 @@ import warnings from . import connection from . import process from .context import reduction -from . import semaphore_tracker +from . import resource_tracker from . import spawn from . import util @@ -69,7 +69,7 @@ class ForkServer(object): parent_r, child_w = os.pipe() child_r, parent_w = os.pipe() allfds = [child_r, child_w, self._forkserver_alive_fd, - semaphore_tracker.getfd()] + resource_tracker.getfd()] allfds += fds try: reduction.sendfds(client, allfds) @@ -90,7 +90,7 @@ class ForkServer(object): ensure_running() will do nothing. ''' with self._lock: - semaphore_tracker.ensure_running() + resource_tracker.ensure_running() if self._forkserver_pid is not None: # forkserver was launched before, is it still running? pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG) @@ -290,7 +290,7 @@ def _serve_one(child_r, fds, unused_fds, handlers): os.close(fd) (_forkserver._forkserver_alive_fd, - semaphore_tracker._semaphore_tracker._fd, + resource_tracker._resource_tracker._fd, *_forkserver._inherited_fds) = fds # Run process object received over pipe diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py index 3815106..59f8e45 100644 --- a/Lib/multiprocessing/popen_spawn_posix.py +++ b/Lib/multiprocessing/popen_spawn_posix.py @@ -36,8 +36,8 @@ class Popen(popen_fork.Popen): return fd def _launch(self, process_obj): - from . import semaphore_tracker - tracker_fd = semaphore_tracker.getfd() + from . import resource_tracker + tracker_fd = resource_tracker.getfd() self._fds.append(tracker_fd) prep_data = spawn.get_preparation_data(process_obj._name) fp = io.BytesIO() diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py new file mode 100644 index 0000000..e67e0b2 --- /dev/null +++ b/Lib/multiprocessing/resource_tracker.py @@ -0,0 +1,213 @@ +############################################################################### +# Server process to keep track of unlinked resources (like shared memory +# segments, semaphores etc.) and clean them. +# +# On Unix we run a server process which keeps track of unlinked +# resources. The server ignores SIGINT and SIGTERM and reads from a +# pipe. Every other process of the program has a copy of the writable +# end of the pipe, so we get EOF when all other processes have exited. +# Then the server process unlinks any remaining resource names. +# +# This is important because there may be system limits for such resources: for +# instance, the system only supports a limited number of named semaphores, and +# shared-memory segments live in the RAM. If a python process leaks such a +# resource, this resource will not be removed till the next reboot. Without +# this resource tracker process, "killall python" would probably leave unlinked +# resources. + +import os +import signal +import sys +import threading +import warnings +import _multiprocessing +import _posixshmem + +from . import spawn +from . import util + +__all__ = ['ensure_running', 'register', 'unregister'] + +_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask') +_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) + +_CLEANUP_FUNCS = { + 'noop': lambda: None, + 'semaphore': _multiprocessing.sem_unlink, + 'shared_memory': _posixshmem.shm_unlink +} + + +class ResourceTracker(object): + + def __init__(self): + self._lock = threading.Lock() + self._fd = None + self._pid = None + + def getfd(self): + self.ensure_running() + return self._fd + + def ensure_running(self): + '''Make sure that resource tracker process is running. + + This can be run from any process. Usually a child process will use + the resource created by its parent.''' + with self._lock: + if self._fd is not None: + # resource tracker was launched before, is it still running? + if self._check_alive(): + # => still alive + return + # => dead, launch it again + os.close(self._fd) + + # Clean-up to avoid dangling processes. + try: + # _pid can be None if this process is a child from another + # python process, which has started the resource_tracker. + if self._pid is not None: + os.waitpid(self._pid, 0) + except ChildProcessError: + # The resource_tracker has already been terminated. + pass + self._fd = None + self._pid = None + + warnings.warn('resource_tracker: process died unexpectedly, ' + 'relaunching. Some resources might leak.') + + fds_to_pass = [] + try: + fds_to_pass.append(sys.stderr.fileno()) + except Exception: + pass + cmd = 'from multiprocessing.resource_tracker import main;main(%d)' + r, w = os.pipe() + try: + fds_to_pass.append(r) + # process will out live us, so no need to wait on pid + exe = spawn.get_executable() + args = [exe] + util._args_from_interpreter_flags() + args += ['-c', cmd % r] + # bpo-33613: Register a signal mask that will block the signals. + # This signal mask will be inherited by the child that is going + # to be spawned and will protect the child from a race condition + # that can make the child die before it registers signal handlers + # for SIGINT and SIGTERM. The mask is unregistered after spawning + # the child. + try: + if _HAVE_SIGMASK: + signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) + pid = util.spawnv_passfds(exe, args, fds_to_pass) + finally: + if _HAVE_SIGMASK: + signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) + except: + os.close(w) + raise + else: + self._fd = w + self._pid = pid + finally: + os.close(r) + + def _check_alive(self): + '''Check that the pipe has not been closed by sending a probe.''' + try: + # We cannot use send here as it calls ensure_running, creating + # a cycle. + os.write(self._fd, b'PROBE:0:noop\n') + except OSError: + return False + else: + return True + + def register(self, name, rtype): + '''Register name of resource with resource tracker.''' + self._send('REGISTER', name, rtype) + + def unregister(self, name, rtype): + '''Unregister name of resource with resource tracker.''' + self._send('UNREGISTER', name, rtype) + + def _send(self, cmd, name, rtype): + self.ensure_running() + msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') + if len(name) > 512: + # posix guarantees that writes to a pipe of less than PIPE_BUF + # bytes are atomic, and that PIPE_BUF >= 512 + raise ValueError('name too long') + nbytes = os.write(self._fd, msg) + assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( + nbytes, len(msg)) + + +_resource_tracker = ResourceTracker() +ensure_running = _resource_tracker.ensure_running +register = _resource_tracker.register +unregister = _resource_tracker.unregister +getfd = _resource_tracker.getfd + +def main(fd): + '''Run resource tracker.''' + # protect the process from ^C and "killall python" etc + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + if _HAVE_SIGMASK: + signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) + + for f in (sys.stdin, sys.stdout): + try: + f.close() + except Exception: + pass + + cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()} + try: + # keep track of registered/unregistered resources + with open(fd, 'rb') as f: + for line in f: + try: + cmd, name, rtype = line.strip().decode('ascii').split(':') + cleanup_func = _CLEANUP_FUNCS.get(rtype, None) + if cleanup_func is None: + raise ValueError( + f'Cannot register {name} for automatic cleanup: ' + f'unknown resource type {rtype}') + + if cmd == 'REGISTER': + cache[rtype].add(name) + elif cmd == 'UNREGISTER': + cache[rtype].remove(name) + elif cmd == 'PROBE': + pass + else: + raise RuntimeError('unrecognized command %r' % cmd) + except Exception: + try: + sys.excepthook(*sys.exc_info()) + except: + pass + finally: + # all processes have terminated; cleanup any remaining resources + for rtype, rtype_cache in cache.items(): + if rtype_cache: + try: + warnings.warn('resource_tracker: There appear to be %d ' + 'leaked %s objects to clean up at shutdown' % + (len(rtype_cache), rtype)) + except Exception: + pass + for name in rtype_cache: + # For some reason the process which created and registered this + # resource has failed to unregister it. Presumably it has + # died. We therefore unlink it. + try: + try: + _CLEANUP_FUNCS[rtype](name) + except Exception as e: + warnings.warn('resource_tracker: %r: %s' % (name, e)) + finally: + pass diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py deleted file mode 100644 index 3c2c3ad..0000000 --- a/Lib/multiprocessing/semaphore_tracker.py +++ /dev/null @@ -1,196 +0,0 @@ -# -# On Unix we run a server process which keeps track of unlinked -# semaphores. The server ignores SIGINT and SIGTERM and reads from a -# pipe. Every other process of the program has a copy of the writable -# end of the pipe, so we get EOF when all other processes have exited. -# Then the server process unlinks any remaining semaphore names. -# -# This is important because the system only supports a limited number -# of named semaphores, and they will not be automatically removed till -# the next reboot. Without this semaphore tracker process, "killall -# python" would probably leave unlinked semaphores. -# - -import os -import signal -import sys -import threading -import warnings -import _multiprocessing - -from . import spawn -from . import util - -__all__ = ['ensure_running', 'register', 'unregister'] - -_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask') -_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) - - -class SemaphoreTracker(object): - - def __init__(self): - self._lock = threading.Lock() - self._fd = None - self._pid = None - - def getfd(self): - self.ensure_running() - return self._fd - - def ensure_running(self): - '''Make sure that semaphore tracker process is running. - - This can be run from any process. Usually a child process will use - the semaphore created by its parent.''' - with self._lock: - if self._fd is not None: - # semaphore tracker was launched before, is it still running? - if self._check_alive(): - # => still alive - return - # => dead, launch it again - os.close(self._fd) - - # Clean-up to avoid dangling processes. - try: - # _pid can be None if this process is a child from another - # python process, which has started the semaphore_tracker. - if self._pid is not None: - os.waitpid(self._pid, 0) - except ChildProcessError: - # The semaphore_tracker has already been terminated. - pass - self._fd = None - self._pid = None - - warnings.warn('semaphore_tracker: process died unexpectedly, ' - 'relaunching. Some semaphores might leak.') - - fds_to_pass = [] - try: - fds_to_pass.append(sys.stderr.fileno()) - except Exception: - pass - cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)' - r, w = os.pipe() - try: - fds_to_pass.append(r) - # process will out live us, so no need to wait on pid - exe = spawn.get_executable() - args = [exe] + util._args_from_interpreter_flags() - args += ['-c', cmd % r] - # bpo-33613: Register a signal mask that will block the signals. - # This signal mask will be inherited by the child that is going - # to be spawned and will protect the child from a race condition - # that can make the child die before it registers signal handlers - # for SIGINT and SIGTERM. The mask is unregistered after spawning - # the child. - try: - if _HAVE_SIGMASK: - signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) - pid = util.spawnv_passfds(exe, args, fds_to_pass) - finally: - if _HAVE_SIGMASK: - signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) - except: - os.close(w) - raise - else: - self._fd = w - self._pid = pid - finally: - os.close(r) - - def _check_alive(self): - '''Check that the pipe has not been closed by sending a probe.''' - try: - # We cannot use send here as it calls ensure_running, creating - # a cycle. - os.write(self._fd, b'PROBE:0\n') - except OSError: - return False - else: - return True - - def register(self, name): - '''Register name of semaphore with semaphore tracker.''' - self._send('REGISTER', name) - - def unregister(self, name): - '''Unregister name of semaphore with semaphore tracker.''' - self._send('UNREGISTER', name) - - def _send(self, cmd, name): - self.ensure_running() - msg = '{0}:{1}\n'.format(cmd, name).encode('ascii') - if len(name) > 512: - # posix guarantees that writes to a pipe of less than PIPE_BUF - # bytes are atomic, and that PIPE_BUF >= 512 - raise ValueError('name too long') - nbytes = os.write(self._fd, msg) - assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( - nbytes, len(msg)) - - -_semaphore_tracker = SemaphoreTracker() -ensure_running = _semaphore_tracker.ensure_running -register = _semaphore_tracker.register -unregister = _semaphore_tracker.unregister -getfd = _semaphore_tracker.getfd - -def main(fd): - '''Run semaphore tracker.''' - # protect the process from ^C and "killall python" etc - signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_IGN) - if _HAVE_SIGMASK: - signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) - - for f in (sys.stdin, sys.stdout): - try: - f.close() - except Exception: - pass - - cache = set() - try: - # keep track of registered/unregistered semaphores - with open(fd, 'rb') as f: - for line in f: - try: - cmd, name = line.strip().split(b':') - if cmd == b'REGISTER': - cache.add(name) - elif cmd == b'UNREGISTER': - cache.remove(name) - elif cmd == b'PROBE': - pass - else: - raise RuntimeError('unrecognized command %r' % cmd) - except Exception: - try: - sys.excepthook(*sys.exc_info()) - except: - pass - finally: - # all processes have terminated; cleanup any remaining semaphores - if cache: - try: - warnings.warn('semaphore_tracker: There appear to be %d ' - 'leaked semaphores to clean up at shutdown' % - len(cache)) - except Exception: - pass - for name in cache: - # For some reason the process which created and registered this - # semaphore has failed to unregister it. Presumably it has died. - # We therefore unlink it. - try: - name = name.decode('ascii') - try: - _multiprocessing.sem_unlink(name) - except Exception as e: - warnings.warn('semaphore_tracker: %r: %s' % (name, e)) - finally: - pass diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py index ebc8885..184e367 100644 --- a/Lib/multiprocessing/shared_memory.py +++ b/Lib/multiprocessing/shared_memory.py @@ -113,6 +113,9 @@ class SharedMemory: self.unlink() raise + from .resource_tracker import register + register(self._name, "shared_memory") + else: # Windows Named Shared Memory @@ -231,7 +234,9 @@ class SharedMemory: called once (and only once) across all processes which have access to the shared memory block.""" if _USE_POSIX and self._name: + from .resource_tracker import unregister _posixshmem.shm_unlink(self._name) + unregister(self._name, "shared_memory") _encoding = "utf8" diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py index 6759351..f66b5aa 100644 --- a/Lib/multiprocessing/spawn.py +++ b/Lib/multiprocessing/spawn.py @@ -111,8 +111,8 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None): _winapi.CloseHandle(source_process) fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY) else: - from . import semaphore_tracker - semaphore_tracker._semaphore_tracker._fd = tracker_fd + from . import resource_tracker + resource_tracker._resource_tracker._fd = tracker_fd fd = pipe_handle exitcode = _main(fd) sys.exit(exitcode) diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 5137c49..4fcbefc 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -76,16 +76,16 @@ class SemLock(object): # We only get here if we are on Unix with forking # disabled. When the object is garbage collected or the # process shuts down we unlink the semaphore name - from .semaphore_tracker import register - register(self._semlock.name) + from .resource_tracker import register + register(self._semlock.name, "semaphore") util.Finalize(self, SemLock._cleanup, (self._semlock.name,), exitpriority=0) @staticmethod def _cleanup(name): - from .semaphore_tracker import unregister + from .resource_tracker import unregister sem_unlink(name) - unregister(name) + unregister(name, "semaphore") def _make_methods(self): self.acquire = self._semlock.acquire diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d97e423..a50293c 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -88,6 +88,13 @@ def join_process(process): support.join_thread(process, timeout=TIMEOUT) +if os.name == "posix": + from multiprocessing import resource_tracker + + def _resource_unlink(name, rtype): + resource_tracker._CLEANUP_FUNCS[rtype](name) + + # # Constants # @@ -3896,6 +3903,32 @@ class _TestSharedMemory(BaseTestCase): deserialized_sl.shm.close() sl.shm.close() + def test_shared_memory_cleaned_after_process_termination(self): + import subprocess + from multiprocessing import shared_memory + cmd = '''if 1: + import os, time, sys + from multiprocessing import shared_memory + + # Create a shared_memory segment, and send the segment name + sm = shared_memory.SharedMemory(create=True, size=10) + sys.stdout.write(sm._name + '\\n') + sys.stdout.flush() + time.sleep(100) + ''' + p = subprocess.Popen([sys.executable, '-E', '-c', cmd], + stdout=subprocess.PIPE) + name = p.stdout.readline().strip().decode() + + # killing abruptly processes holding reference to a shared memory + # segment should not leak the given memory segment. + p.terminate() + p.wait() + time.sleep(1.0) # wait for the OS to collect the segment + + with self.assertRaises(FileNotFoundError): + smm = shared_memory.SharedMemory(name, create=False) + # # # @@ -4827,57 +4860,86 @@ class TestStartMethod(unittest.TestCase): @unittest.skipIf(sys.platform == "win32", "test semantics don't make sense on Windows") -class TestSemaphoreTracker(unittest.TestCase): +class TestResourceTracker(unittest.TestCase): - def test_semaphore_tracker(self): + def test_resource_tracker(self): # # Check that killing process does not leak named semaphores # import subprocess cmd = '''if 1: - import multiprocessing as mp, time, os + import time, os, tempfile + import multiprocessing as mp + from multiprocessing import resource_tracker + from multiprocessing.shared_memory import SharedMemory + mp.set_start_method("spawn") - lock1 = mp.Lock() - lock2 = mp.Lock() - os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n") - os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n") + rand = tempfile._RandomNameSequence() + + + def create_and_register_resource(rtype): + if rtype == "semaphore": + lock = mp.Lock() + return lock, lock._semlock.name + elif rtype == "shared_memory": + sm = SharedMemory(create=True, size=10) + return sm, sm._name + else: + raise ValueError( + "Resource type {{}} not understood".format(rtype)) + + + resource1, rname1 = create_and_register_resource("{rtype}") + resource2, rname2 = create_and_register_resource("{rtype}") + + os.write({w}, rname1.encode("ascii") + b"\\n") + os.write({w}, rname2.encode("ascii") + b"\\n") + time.sleep(10) ''' - r, w = os.pipe() - p = subprocess.Popen([sys.executable, - '-E', '-c', cmd % (w, w)], - pass_fds=[w], - stderr=subprocess.PIPE) - os.close(w) - with open(r, 'rb', closefd=True) as f: - name1 = f.readline().rstrip().decode('ascii') - name2 = f.readline().rstrip().decode('ascii') - _multiprocessing.sem_unlink(name1) - p.terminate() - p.wait() - time.sleep(2.0) - with self.assertRaises(OSError) as ctx: - _multiprocessing.sem_unlink(name2) - # docs say it should be ENOENT, but OSX seems to give EINVAL - self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) - err = p.stderr.read().decode('utf-8') - p.stderr.close() - expected = 'semaphore_tracker: There appear to be 2 leaked semaphores' - self.assertRegex(err, expected) - self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1) - - def check_semaphore_tracker_death(self, signum, should_die): + for rtype in resource_tracker._CLEANUP_FUNCS: + with self.subTest(rtype=rtype): + if rtype == "noop": + # Artefact resource type used by the resource_tracker + continue + r, w = os.pipe() + p = subprocess.Popen([sys.executable, + '-E', '-c', cmd.format(w=w, rtype=rtype)], + pass_fds=[w], + stderr=subprocess.PIPE) + os.close(w) + with open(r, 'rb', closefd=True) as f: + name1 = f.readline().rstrip().decode('ascii') + name2 = f.readline().rstrip().decode('ascii') + _resource_unlink(name1, rtype) + p.terminate() + p.wait() + time.sleep(2.0) + with self.assertRaises(OSError) as ctx: + _resource_unlink(name2, rtype) + # docs say it should be ENOENT, but OSX seems to give EINVAL + self.assertIn( + ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) + err = p.stderr.read().decode('utf-8') + p.stderr.close() + expected = ('resource_tracker: There appear to be 2 leaked {} ' + 'objects'.format( + rtype)) + self.assertRegex(err, expected) + self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) + + def check_resource_tracker_death(self, signum, should_die): # bpo-31310: if the semaphore tracker process has died, it should # be restarted implicitly. - from multiprocessing.semaphore_tracker import _semaphore_tracker - pid = _semaphore_tracker._pid + from multiprocessing.resource_tracker import _resource_tracker + pid = _resource_tracker._pid if pid is not None: os.kill(pid, signal.SIGKILL) os.waitpid(pid, 0) with warnings.catch_warnings(): warnings.simplefilter("ignore") - _semaphore_tracker.ensure_running() - pid = _semaphore_tracker._pid + _resource_tracker.ensure_running() + pid = _resource_tracker._pid os.kill(pid, signum) time.sleep(1.0) # give it time to die @@ -4898,50 +4960,50 @@ class TestSemaphoreTracker(unittest.TestCase): self.assertEqual(len(all_warn), 1) the_warn = all_warn[0] self.assertTrue(issubclass(the_warn.category, UserWarning)) - self.assertTrue("semaphore_tracker: process died" + self.assertTrue("resource_tracker: process died" in str(the_warn.message)) else: self.assertEqual(len(all_warn), 0) - def test_semaphore_tracker_sigint(self): + def test_resource_tracker_sigint(self): # Catchable signal (ignored by semaphore tracker) - self.check_semaphore_tracker_death(signal.SIGINT, False) + self.check_resource_tracker_death(signal.SIGINT, False) - def test_semaphore_tracker_sigterm(self): + def test_resource_tracker_sigterm(self): # Catchable signal (ignored by semaphore tracker) - self.check_semaphore_tracker_death(signal.SIGTERM, False) + self.check_resource_tracker_death(signal.SIGTERM, False) - def test_semaphore_tracker_sigkill(self): + def test_resource_tracker_sigkill(self): # Uncatchable signal. - self.check_semaphore_tracker_death(signal.SIGKILL, True) + self.check_resource_tracker_death(signal.SIGKILL, True) @staticmethod - def _is_semaphore_tracker_reused(conn, pid): - from multiprocessing.semaphore_tracker import _semaphore_tracker - _semaphore_tracker.ensure_running() + def _is_resource_tracker_reused(conn, pid): + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker.ensure_running() # The pid should be None in the child process, expect for the fork # context. It should not be a new value. - reused = _semaphore_tracker._pid in (None, pid) - reused &= _semaphore_tracker._check_alive() + reused = _resource_tracker._pid in (None, pid) + reused &= _resource_tracker._check_alive() conn.send(reused) - def test_semaphore_tracker_reused(self): - from multiprocessing.semaphore_tracker import _semaphore_tracker - _semaphore_tracker.ensure_running() - pid = _semaphore_tracker._pid + def test_resource_tracker_reused(self): + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker.ensure_running() + pid = _resource_tracker._pid r, w = multiprocessing.Pipe(duplex=False) - p = multiprocessing.Process(target=self._is_semaphore_tracker_reused, + p = multiprocessing.Process(target=self._is_resource_tracker_reused, args=(w, pid)) p.start() - is_semaphore_tracker_reused = r.recv() + is_resource_tracker_reused = r.recv() # Clean up p.join() w.close() r.close() - self.assertTrue(is_semaphore_tracker_reused) + self.assertTrue(is_resource_tracker_reused) class TestSimpleQueue(unittest.TestCase): diff --git a/Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst b/Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst new file mode 100644 index 0000000..5eaf0a0 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-05-09-18-12-55.bpo-36867.FuwVTi.rst @@ -0,0 +1 @@ +The multiprocessing.resource_tracker replaces the multiprocessing.semaphore_tracker module. Other than semaphores, resource_tracker also tracks shared_memory segments. \ No newline at end of file diff --git a/PCbuild/lib.pyproj b/PCbuild/lib.pyproj index ffb95c6..7ed71bd 100644 --- a/PCbuild/lib.pyproj +++ b/PCbuild/lib.pyproj @@ -678,7 +678,7 @@ - + -- cgit v0.12