diff options
author | Pierre Glaser <pierreglaser@msn.com> | 2019-05-10 20:59:08 (GMT) |
---|---|---|
committer | Antoine Pitrou <antoine@python.org> | 2019-05-10 20:59:08 (GMT) |
commit | f22cc69b012f52882d434a5c44a004bc3aa5c33c (patch) | |
tree | 03dc6cf6c5449ab30ac1243140b6b8f3b2359089 /Lib/test/_test_multiprocessing.py | |
parent | d0d64ad1f5f1dc1630004091d7f8209546c1220a (diff) | |
download | cpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.zip cpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.tar.gz cpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.tar.bz2 |
bpo-36867: Make semaphore_tracker track other system resources (GH-13222)
The multiprocessing.resource_tracker replaces the multiprocessing.semaphore_tracker module. Other than semaphores, resource_tracker also tracks shared_memory segments. Patch by Pierre Glaser.
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r-- | Lib/test/_test_multiprocessing.py | 170 |
1 files changed, 116 insertions, 54 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d97e423..a50293c 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -88,6 +88,13 @@ def join_process(process): support.join_thread(process, timeout=TIMEOUT) +if os.name == "posix": + from multiprocessing import resource_tracker + + def _resource_unlink(name, rtype): + resource_tracker._CLEANUP_FUNCS[rtype](name) + + # # Constants # @@ -3896,6 +3903,32 @@ class _TestSharedMemory(BaseTestCase): deserialized_sl.shm.close() sl.shm.close() + def test_shared_memory_cleaned_after_process_termination(self): + import subprocess + from multiprocessing import shared_memory + cmd = '''if 1: + import os, time, sys + from multiprocessing import shared_memory + + # Create a shared_memory segment, and send the segment name + sm = shared_memory.SharedMemory(create=True, size=10) + sys.stdout.write(sm._name + '\\n') + sys.stdout.flush() + time.sleep(100) + ''' + p = subprocess.Popen([sys.executable, '-E', '-c', cmd], + stdout=subprocess.PIPE) + name = p.stdout.readline().strip().decode() + + # killing abruptly processes holding reference to a shared memory + # segment should not leak the given memory segment. + p.terminate() + p.wait() + time.sleep(1.0) # wait for the OS to collect the segment + + with self.assertRaises(FileNotFoundError): + smm = shared_memory.SharedMemory(name, create=False) + # # # @@ -4827,57 +4860,86 @@ class TestStartMethod(unittest.TestCase): @unittest.skipIf(sys.platform == "win32", "test semantics don't make sense on Windows") -class TestSemaphoreTracker(unittest.TestCase): +class TestResourceTracker(unittest.TestCase): - def test_semaphore_tracker(self): + def test_resource_tracker(self): # # Check that killing process does not leak named semaphores # import subprocess cmd = '''if 1: - import multiprocessing as mp, time, os + import time, os, tempfile + import multiprocessing as mp + from multiprocessing import resource_tracker + from multiprocessing.shared_memory import SharedMemory + mp.set_start_method("spawn") - lock1 = mp.Lock() - lock2 = mp.Lock() - os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n") - os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n") + rand = tempfile._RandomNameSequence() + + + def create_and_register_resource(rtype): + if rtype == "semaphore": + lock = mp.Lock() + return lock, lock._semlock.name + elif rtype == "shared_memory": + sm = SharedMemory(create=True, size=10) + return sm, sm._name + else: + raise ValueError( + "Resource type {{}} not understood".format(rtype)) + + + resource1, rname1 = create_and_register_resource("{rtype}") + resource2, rname2 = create_and_register_resource("{rtype}") + + os.write({w}, rname1.encode("ascii") + b"\\n") + os.write({w}, rname2.encode("ascii") + b"\\n") + time.sleep(10) ''' - r, w = os.pipe() - p = subprocess.Popen([sys.executable, - '-E', '-c', cmd % (w, w)], - pass_fds=[w], - stderr=subprocess.PIPE) - os.close(w) - with open(r, 'rb', closefd=True) as f: - name1 = f.readline().rstrip().decode('ascii') - name2 = f.readline().rstrip().decode('ascii') - _multiprocessing.sem_unlink(name1) - p.terminate() - p.wait() - time.sleep(2.0) - with self.assertRaises(OSError) as ctx: - _multiprocessing.sem_unlink(name2) - # docs say it should be ENOENT, but OSX seems to give EINVAL - self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) - err = p.stderr.read().decode('utf-8') - p.stderr.close() - expected = 'semaphore_tracker: There appear to be 2 leaked semaphores' - self.assertRegex(err, expected) - self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1) - - def check_semaphore_tracker_death(self, signum, should_die): + for rtype in resource_tracker._CLEANUP_FUNCS: + with self.subTest(rtype=rtype): + if rtype == "noop": + # Artefact resource type used by the resource_tracker + continue + r, w = os.pipe() + p = subprocess.Popen([sys.executable, + '-E', '-c', cmd.format(w=w, rtype=rtype)], + pass_fds=[w], + stderr=subprocess.PIPE) + os.close(w) + with open(r, 'rb', closefd=True) as f: + name1 = f.readline().rstrip().decode('ascii') + name2 = f.readline().rstrip().decode('ascii') + _resource_unlink(name1, rtype) + p.terminate() + p.wait() + time.sleep(2.0) + with self.assertRaises(OSError) as ctx: + _resource_unlink(name2, rtype) + # docs say it should be ENOENT, but OSX seems to give EINVAL + self.assertIn( + ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) + err = p.stderr.read().decode('utf-8') + p.stderr.close() + expected = ('resource_tracker: There appear to be 2 leaked {} ' + 'objects'.format( + rtype)) + self.assertRegex(err, expected) + self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) + + def check_resource_tracker_death(self, signum, should_die): # bpo-31310: if the semaphore tracker process has died, it should # be restarted implicitly. - from multiprocessing.semaphore_tracker import _semaphore_tracker - pid = _semaphore_tracker._pid + from multiprocessing.resource_tracker import _resource_tracker + pid = _resource_tracker._pid if pid is not None: os.kill(pid, signal.SIGKILL) os.waitpid(pid, 0) with warnings.catch_warnings(): warnings.simplefilter("ignore") - _semaphore_tracker.ensure_running() - pid = _semaphore_tracker._pid + _resource_tracker.ensure_running() + pid = _resource_tracker._pid os.kill(pid, signum) time.sleep(1.0) # give it time to die @@ -4898,50 +4960,50 @@ class TestSemaphoreTracker(unittest.TestCase): self.assertEqual(len(all_warn), 1) the_warn = all_warn[0] self.assertTrue(issubclass(the_warn.category, UserWarning)) - self.assertTrue("semaphore_tracker: process died" + self.assertTrue("resource_tracker: process died" in str(the_warn.message)) else: self.assertEqual(len(all_warn), 0) - def test_semaphore_tracker_sigint(self): + def test_resource_tracker_sigint(self): # Catchable signal (ignored by semaphore tracker) - self.check_semaphore_tracker_death(signal.SIGINT, False) + self.check_resource_tracker_death(signal.SIGINT, False) - def test_semaphore_tracker_sigterm(self): + def test_resource_tracker_sigterm(self): # Catchable signal (ignored by semaphore tracker) - self.check_semaphore_tracker_death(signal.SIGTERM, False) + self.check_resource_tracker_death(signal.SIGTERM, False) - def test_semaphore_tracker_sigkill(self): + def test_resource_tracker_sigkill(self): # Uncatchable signal. - self.check_semaphore_tracker_death(signal.SIGKILL, True) + self.check_resource_tracker_death(signal.SIGKILL, True) @staticmethod - def _is_semaphore_tracker_reused(conn, pid): - from multiprocessing.semaphore_tracker import _semaphore_tracker - _semaphore_tracker.ensure_running() + def _is_resource_tracker_reused(conn, pid): + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker.ensure_running() # The pid should be None in the child process, expect for the fork # context. It should not be a new value. - reused = _semaphore_tracker._pid in (None, pid) - reused &= _semaphore_tracker._check_alive() + reused = _resource_tracker._pid in (None, pid) + reused &= _resource_tracker._check_alive() conn.send(reused) - def test_semaphore_tracker_reused(self): - from multiprocessing.semaphore_tracker import _semaphore_tracker - _semaphore_tracker.ensure_running() - pid = _semaphore_tracker._pid + def test_resource_tracker_reused(self): + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker.ensure_running() + pid = _resource_tracker._pid r, w = multiprocessing.Pipe(duplex=False) - p = multiprocessing.Process(target=self._is_semaphore_tracker_reused, + p = multiprocessing.Process(target=self._is_resource_tracker_reused, args=(w, pid)) p.start() - is_semaphore_tracker_reused = r.recv() + is_resource_tracker_reused = r.recv() # Clean up p.join() w.close() r.close() - self.assertTrue(is_semaphore_tracker_reused) + self.assertTrue(is_resource_tracker_reused) class TestSimpleQueue(unittest.TestCase): |