summaryrefslogtreecommitdiffstats
path: root/Lib/test/_test_multiprocessing.py
diff options
context:
space:
mode:
authorPierre Glaser <pierreglaser@msn.com>2019-05-10 20:59:08 (GMT)
committerAntoine Pitrou <antoine@python.org>2019-05-10 20:59:08 (GMT)
commitf22cc69b012f52882d434a5c44a004bc3aa5c33c (patch)
tree03dc6cf6c5449ab30ac1243140b6b8f3b2359089 /Lib/test/_test_multiprocessing.py
parentd0d64ad1f5f1dc1630004091d7f8209546c1220a (diff)
downloadcpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.zip
cpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.tar.gz
cpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.tar.bz2
bpo-36867: Make semaphore_tracker track other system resources (GH-13222)
The multiprocessing.resource_tracker replaces the multiprocessing.semaphore_tracker module. Other than semaphores, resource_tracker also tracks shared_memory segments. Patch by Pierre Glaser.
Diffstat (limited to 'Lib/test/_test_multiprocessing.py')
-rw-r--r--Lib/test/_test_multiprocessing.py170
1 files changed, 116 insertions, 54 deletions
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index d97e423..a50293c 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -88,6 +88,13 @@ def join_process(process):
support.join_thread(process, timeout=TIMEOUT)
+if os.name == "posix":
+ from multiprocessing import resource_tracker
+
+ def _resource_unlink(name, rtype):
+ resource_tracker._CLEANUP_FUNCS[rtype](name)
+
+
#
# Constants
#
@@ -3896,6 +3903,32 @@ class _TestSharedMemory(BaseTestCase):
deserialized_sl.shm.close()
sl.shm.close()
+ def test_shared_memory_cleaned_after_process_termination(self):
+ import subprocess
+ from multiprocessing import shared_memory
+ cmd = '''if 1:
+ import os, time, sys
+ from multiprocessing import shared_memory
+
+ # Create a shared_memory segment, and send the segment name
+ sm = shared_memory.SharedMemory(create=True, size=10)
+ sys.stdout.write(sm._name + '\\n')
+ sys.stdout.flush()
+ time.sleep(100)
+ '''
+ p = subprocess.Popen([sys.executable, '-E', '-c', cmd],
+ stdout=subprocess.PIPE)
+ name = p.stdout.readline().strip().decode()
+
+ # killing abruptly processes holding reference to a shared memory
+ # segment should not leak the given memory segment.
+ p.terminate()
+ p.wait()
+ time.sleep(1.0) # wait for the OS to collect the segment
+
+ with self.assertRaises(FileNotFoundError):
+ smm = shared_memory.SharedMemory(name, create=False)
+
#
#
#
@@ -4827,57 +4860,86 @@ class TestStartMethod(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32",
"test semantics don't make sense on Windows")
-class TestSemaphoreTracker(unittest.TestCase):
+class TestResourceTracker(unittest.TestCase):
- def test_semaphore_tracker(self):
+ def test_resource_tracker(self):
#
# Check that killing process does not leak named semaphores
#
import subprocess
cmd = '''if 1:
- import multiprocessing as mp, time, os
+ import time, os, tempfile
+ import multiprocessing as mp
+ from multiprocessing import resource_tracker
+ from multiprocessing.shared_memory import SharedMemory
+
mp.set_start_method("spawn")
- lock1 = mp.Lock()
- lock2 = mp.Lock()
- os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
- os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
+ rand = tempfile._RandomNameSequence()
+
+
+ def create_and_register_resource(rtype):
+ if rtype == "semaphore":
+ lock = mp.Lock()
+ return lock, lock._semlock.name
+ elif rtype == "shared_memory":
+ sm = SharedMemory(create=True, size=10)
+ return sm, sm._name
+ else:
+ raise ValueError(
+ "Resource type {{}} not understood".format(rtype))
+
+
+ resource1, rname1 = create_and_register_resource("{rtype}")
+ resource2, rname2 = create_and_register_resource("{rtype}")
+
+ os.write({w}, rname1.encode("ascii") + b"\\n")
+ os.write({w}, rname2.encode("ascii") + b"\\n")
+
time.sleep(10)
'''
- r, w = os.pipe()
- p = subprocess.Popen([sys.executable,
- '-E', '-c', cmd % (w, w)],
- pass_fds=[w],
- stderr=subprocess.PIPE)
- os.close(w)
- with open(r, 'rb', closefd=True) as f:
- name1 = f.readline().rstrip().decode('ascii')
- name2 = f.readline().rstrip().decode('ascii')
- _multiprocessing.sem_unlink(name1)
- p.terminate()
- p.wait()
- time.sleep(2.0)
- with self.assertRaises(OSError) as ctx:
- _multiprocessing.sem_unlink(name2)
- # docs say it should be ENOENT, but OSX seems to give EINVAL
- self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
- err = p.stderr.read().decode('utf-8')
- p.stderr.close()
- expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
- self.assertRegex(err, expected)
- self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
-
- def check_semaphore_tracker_death(self, signum, should_die):
+ for rtype in resource_tracker._CLEANUP_FUNCS:
+ with self.subTest(rtype=rtype):
+ if rtype == "noop":
+ # Artefact resource type used by the resource_tracker
+ continue
+ r, w = os.pipe()
+ p = subprocess.Popen([sys.executable,
+ '-E', '-c', cmd.format(w=w, rtype=rtype)],
+ pass_fds=[w],
+ stderr=subprocess.PIPE)
+ os.close(w)
+ with open(r, 'rb', closefd=True) as f:
+ name1 = f.readline().rstrip().decode('ascii')
+ name2 = f.readline().rstrip().decode('ascii')
+ _resource_unlink(name1, rtype)
+ p.terminate()
+ p.wait()
+ time.sleep(2.0)
+ with self.assertRaises(OSError) as ctx:
+ _resource_unlink(name2, rtype)
+ # docs say it should be ENOENT, but OSX seems to give EINVAL
+ self.assertIn(
+ ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
+ err = p.stderr.read().decode('utf-8')
+ p.stderr.close()
+ expected = ('resource_tracker: There appear to be 2 leaked {} '
+ 'objects'.format(
+ rtype))
+ self.assertRegex(err, expected)
+ self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)
+
+ def check_resource_tracker_death(self, signum, should_die):
# bpo-31310: if the semaphore tracker process has died, it should
# be restarted implicitly.
- from multiprocessing.semaphore_tracker import _semaphore_tracker
- pid = _semaphore_tracker._pid
+ from multiprocessing.resource_tracker import _resource_tracker
+ pid = _resource_tracker._pid
if pid is not None:
os.kill(pid, signal.SIGKILL)
os.waitpid(pid, 0)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
- _semaphore_tracker.ensure_running()
- pid = _semaphore_tracker._pid
+ _resource_tracker.ensure_running()
+ pid = _resource_tracker._pid
os.kill(pid, signum)
time.sleep(1.0) # give it time to die
@@ -4898,50 +4960,50 @@ class TestSemaphoreTracker(unittest.TestCase):
self.assertEqual(len(all_warn), 1)
the_warn = all_warn[0]
self.assertTrue(issubclass(the_warn.category, UserWarning))
- self.assertTrue("semaphore_tracker: process died"
+ self.assertTrue("resource_tracker: process died"
in str(the_warn.message))
else:
self.assertEqual(len(all_warn), 0)
- def test_semaphore_tracker_sigint(self):
+ def test_resource_tracker_sigint(self):
# Catchable signal (ignored by semaphore tracker)
- self.check_semaphore_tracker_death(signal.SIGINT, False)
+ self.check_resource_tracker_death(signal.SIGINT, False)
- def test_semaphore_tracker_sigterm(self):
+ def test_resource_tracker_sigterm(self):
# Catchable signal (ignored by semaphore tracker)
- self.check_semaphore_tracker_death(signal.SIGTERM, False)
+ self.check_resource_tracker_death(signal.SIGTERM, False)
- def test_semaphore_tracker_sigkill(self):
+ def test_resource_tracker_sigkill(self):
# Uncatchable signal.
- self.check_semaphore_tracker_death(signal.SIGKILL, True)
+ self.check_resource_tracker_death(signal.SIGKILL, True)
@staticmethod
- def _is_semaphore_tracker_reused(conn, pid):
- from multiprocessing.semaphore_tracker import _semaphore_tracker
- _semaphore_tracker.ensure_running()
+ def _is_resource_tracker_reused(conn, pid):
+ from multiprocessing.resource_tracker import _resource_tracker
+ _resource_tracker.ensure_running()
# The pid should be None in the child process, expect for the fork
# context. It should not be a new value.
- reused = _semaphore_tracker._pid in (None, pid)
- reused &= _semaphore_tracker._check_alive()
+ reused = _resource_tracker._pid in (None, pid)
+ reused &= _resource_tracker._check_alive()
conn.send(reused)
- def test_semaphore_tracker_reused(self):
- from multiprocessing.semaphore_tracker import _semaphore_tracker
- _semaphore_tracker.ensure_running()
- pid = _semaphore_tracker._pid
+ def test_resource_tracker_reused(self):
+ from multiprocessing.resource_tracker import _resource_tracker
+ _resource_tracker.ensure_running()
+ pid = _resource_tracker._pid
r, w = multiprocessing.Pipe(duplex=False)
- p = multiprocessing.Process(target=self._is_semaphore_tracker_reused,
+ p = multiprocessing.Process(target=self._is_resource_tracker_reused,
args=(w, pid))
p.start()
- is_semaphore_tracker_reused = r.recv()
+ is_resource_tracker_reused = r.recv()
# Clean up
p.join()
w.close()
r.close()
- self.assertTrue(is_semaphore_tracker_reused)
+ self.assertTrue(is_resource_tracker_reused)
class TestSimpleQueue(unittest.TestCase):