summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing
diff options
context:
space:
mode:
authorPierre Glaser <pierreglaser@msn.com>2019-05-10 20:59:08 (GMT)
committerAntoine Pitrou <antoine@python.org>2019-05-10 20:59:08 (GMT)
commitf22cc69b012f52882d434a5c44a004bc3aa5c33c (patch)
tree03dc6cf6c5449ab30ac1243140b6b8f3b2359089 /Lib/multiprocessing
parentd0d64ad1f5f1dc1630004091d7f8209546c1220a (diff)
downloadcpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.zip
cpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.tar.gz
cpython-f22cc69b012f52882d434a5c44a004bc3aa5c33c.tar.bz2
bpo-36867: Make semaphore_tracker track other system resources (GH-13222)
The multiprocessing.resource_tracker replaces the multiprocessing.semaphore_tracker module. Other than semaphores, resource_tracker also tracks shared_memory segments. Patch by Pierre Glaser.
Diffstat (limited to 'Lib/multiprocessing')
-rw-r--r--Lib/multiprocessing/forkserver.py8
-rw-r--r--Lib/multiprocessing/popen_spawn_posix.py4
-rw-r--r--Lib/multiprocessing/resource_tracker.py (renamed from Lib/multiprocessing/semaphore_tracker.py)133
-rw-r--r--Lib/multiprocessing/shared_memory.py5
-rw-r--r--Lib/multiprocessing/spawn.py4
-rw-r--r--Lib/multiprocessing/synchronize.py8
6 files changed, 92 insertions, 70 deletions
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
index 040b46e..dabf7bc 100644
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -11,7 +11,7 @@ import warnings
from . import connection
from . import process
from .context import reduction
-from . import semaphore_tracker
+from . import resource_tracker
from . import spawn
from . import util
@@ -69,7 +69,7 @@ class ForkServer(object):
parent_r, child_w = os.pipe()
child_r, parent_w = os.pipe()
allfds = [child_r, child_w, self._forkserver_alive_fd,
- semaphore_tracker.getfd()]
+ resource_tracker.getfd()]
allfds += fds
try:
reduction.sendfds(client, allfds)
@@ -90,7 +90,7 @@ class ForkServer(object):
ensure_running() will do nothing.
'''
with self._lock:
- semaphore_tracker.ensure_running()
+ resource_tracker.ensure_running()
if self._forkserver_pid is not None:
# forkserver was launched before, is it still running?
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
@@ -290,7 +290,7 @@ def _serve_one(child_r, fds, unused_fds, handlers):
os.close(fd)
(_forkserver._forkserver_alive_fd,
- semaphore_tracker._semaphore_tracker._fd,
+ resource_tracker._resource_tracker._fd,
*_forkserver._inherited_fds) = fds
# Run process object received over pipe
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
index 3815106..59f8e45 100644
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -36,8 +36,8 @@ class Popen(popen_fork.Popen):
return fd
def _launch(self, process_obj):
- from . import semaphore_tracker
- tracker_fd = semaphore_tracker.getfd()
+ from . import resource_tracker
+ tracker_fd = resource_tracker.getfd()
self._fds.append(tracker_fd)
prep_data = spawn.get_preparation_data(process_obj._name)
fp = io.BytesIO()
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/resource_tracker.py
index 3c2c3ad..e67e0b2 100644
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ b/Lib/multiprocessing/resource_tracker.py
@@ -1,15 +1,19 @@
+###############################################################################
+# Server process to keep track of unlinked resources (like shared memory
+# segments, semaphores etc.) and clean them.
#
# On Unix we run a server process which keeps track of unlinked
-# semaphores. The server ignores SIGINT and SIGTERM and reads from a
+# resources. The server ignores SIGINT and SIGTERM and reads from a
# pipe. Every other process of the program has a copy of the writable
# end of the pipe, so we get EOF when all other processes have exited.
-# Then the server process unlinks any remaining semaphore names.
-#
-# This is important because the system only supports a limited number
-# of named semaphores, and they will not be automatically removed till
-# the next reboot. Without this semaphore tracker process, "killall
-# python" would probably leave unlinked semaphores.
+# Then the server process unlinks any remaining resource names.
#
+# This is important because there may be system limits for such resources: for
+# instance, the system only supports a limited number of named semaphores, and
+# shared-memory segments live in the RAM. If a python process leaks such a
+# resource, this resource will not be removed till the next reboot. Without
+# this resource tracker process, "killall python" would probably leave unlinked
+# resources.
import os
import signal
@@ -17,6 +21,7 @@ import sys
import threading
import warnings
import _multiprocessing
+import _posixshmem
from . import spawn
from . import util
@@ -26,8 +31,14 @@ __all__ = ['ensure_running', 'register', 'unregister']
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
+_CLEANUP_FUNCS = {
+ 'noop': lambda: None,
+ 'semaphore': _multiprocessing.sem_unlink,
+ 'shared_memory': _posixshmem.shm_unlink
+}
+
-class SemaphoreTracker(object):
+class ResourceTracker(object):
def __init__(self):
self._lock = threading.Lock()
@@ -39,13 +50,13 @@ class SemaphoreTracker(object):
return self._fd
def ensure_running(self):
- '''Make sure that semaphore tracker process is running.
+ '''Make sure that resource tracker process is running.
This can be run from any process. Usually a child process will use
- the semaphore created by its parent.'''
+ the resource created by its parent.'''
with self._lock:
if self._fd is not None:
- # semaphore tracker was launched before, is it still running?
+ # resource tracker was launched before, is it still running?
if self._check_alive():
# => still alive
return
@@ -55,24 +66,24 @@ class SemaphoreTracker(object):
# Clean-up to avoid dangling processes.
try:
# _pid can be None if this process is a child from another
- # python process, which has started the semaphore_tracker.
+ # python process, which has started the resource_tracker.
if self._pid is not None:
os.waitpid(self._pid, 0)
except ChildProcessError:
- # The semaphore_tracker has already been terminated.
+ # The resource_tracker has already been terminated.
pass
self._fd = None
self._pid = None
- warnings.warn('semaphore_tracker: process died unexpectedly, '
- 'relaunching. Some semaphores might leak.')
+ warnings.warn('resource_tracker: process died unexpectedly, '
+ 'relaunching. Some resources might leak.')
fds_to_pass = []
try:
fds_to_pass.append(sys.stderr.fileno())
except Exception:
pass
- cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
+ cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
r, w = os.pipe()
try:
fds_to_pass.append(r)
@@ -107,23 +118,23 @@ class SemaphoreTracker(object):
try:
# We cannot use send here as it calls ensure_running, creating
# a cycle.
- os.write(self._fd, b'PROBE:0\n')
+ os.write(self._fd, b'PROBE:0:noop\n')
except OSError:
return False
else:
return True
- def register(self, name):
- '''Register name of semaphore with semaphore tracker.'''
- self._send('REGISTER', name)
+ def register(self, name, rtype):
+ '''Register name of resource with resource tracker.'''
+ self._send('REGISTER', name, rtype)
- def unregister(self, name):
- '''Unregister name of semaphore with semaphore tracker.'''
- self._send('UNREGISTER', name)
+ def unregister(self, name, rtype):
+ '''Unregister name of resource with resource tracker.'''
+ self._send('UNREGISTER', name, rtype)
- def _send(self, cmd, name):
+ def _send(self, cmd, name, rtype):
self.ensure_running()
- msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
+ msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
if len(name) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
@@ -133,14 +144,14 @@ class SemaphoreTracker(object):
nbytes, len(msg))
-_semaphore_tracker = SemaphoreTracker()
-ensure_running = _semaphore_tracker.ensure_running
-register = _semaphore_tracker.register
-unregister = _semaphore_tracker.unregister
-getfd = _semaphore_tracker.getfd
+_resource_tracker = ResourceTracker()
+ensure_running = _resource_tracker.ensure_running
+register = _resource_tracker.register
+unregister = _resource_tracker.unregister
+getfd = _resource_tracker.getfd
def main(fd):
- '''Run semaphore tracker.'''
+ '''Run resource tracker.'''
# protect the process from ^C and "killall python" etc
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
@@ -153,18 +164,24 @@ def main(fd):
except Exception:
pass
- cache = set()
+ cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
try:
- # keep track of registered/unregistered semaphores
+ # keep track of registered/unregistered resources
with open(fd, 'rb') as f:
for line in f:
try:
- cmd, name = line.strip().split(b':')
- if cmd == b'REGISTER':
- cache.add(name)
- elif cmd == b'UNREGISTER':
- cache.remove(name)
- elif cmd == b'PROBE':
+ cmd, name, rtype = line.strip().decode('ascii').split(':')
+ cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
+ if cleanup_func is None:
+ raise ValueError(
+ f'Cannot register {name} for automatic cleanup: '
+ f'unknown resource type {rtype}')
+
+ if cmd == 'REGISTER':
+ cache[rtype].add(name)
+ elif cmd == 'UNREGISTER':
+ cache[rtype].remove(name)
+ elif cmd == 'PROBE':
pass
else:
raise RuntimeError('unrecognized command %r' % cmd)
@@ -174,23 +191,23 @@ def main(fd):
except:
pass
finally:
- # all processes have terminated; cleanup any remaining semaphores
- if cache:
- try:
- warnings.warn('semaphore_tracker: There appear to be %d '
- 'leaked semaphores to clean up at shutdown' %
- len(cache))
- except Exception:
- pass
- for name in cache:
- # For some reason the process which created and registered this
- # semaphore has failed to unregister it. Presumably it has died.
- # We therefore unlink it.
- try:
- name = name.decode('ascii')
+ # all processes have terminated; cleanup any remaining resources
+ for rtype, rtype_cache in cache.items():
+ if rtype_cache:
try:
- _multiprocessing.sem_unlink(name)
- except Exception as e:
- warnings.warn('semaphore_tracker: %r: %s' % (name, e))
- finally:
- pass
+ warnings.warn('resource_tracker: There appear to be %d '
+ 'leaked %s objects to clean up at shutdown' %
+ (len(rtype_cache), rtype))
+ except Exception:
+ pass
+ for name in rtype_cache:
+ # For some reason the process which created and registered this
+ # resource has failed to unregister it. Presumably it has
+ # died. We therefore unlink it.
+ try:
+ try:
+ _CLEANUP_FUNCS[rtype](name)
+ except Exception as e:
+ warnings.warn('resource_tracker: %r: %s' % (name, e))
+ finally:
+ pass
diff --git a/Lib/multiprocessing/shared_memory.py b/Lib/multiprocessing/shared_memory.py
index ebc8885..184e367 100644
--- a/Lib/multiprocessing/shared_memory.py
+++ b/Lib/multiprocessing/shared_memory.py
@@ -113,6 +113,9 @@ class SharedMemory:
self.unlink()
raise
+ from .resource_tracker import register
+ register(self._name, "shared_memory")
+
else:
# Windows Named Shared Memory
@@ -231,7 +234,9 @@ class SharedMemory:
called once (and only once) across all processes which have access
to the shared memory block."""
if _USE_POSIX and self._name:
+ from .resource_tracker import unregister
_posixshmem.shm_unlink(self._name)
+ unregister(self._name, "shared_memory")
_encoding = "utf8"
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
index 6759351..f66b5aa 100644
--- a/Lib/multiprocessing/spawn.py
+++ b/Lib/multiprocessing/spawn.py
@@ -111,8 +111,8 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
_winapi.CloseHandle(source_process)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
else:
- from . import semaphore_tracker
- semaphore_tracker._semaphore_tracker._fd = tracker_fd
+ from . import resource_tracker
+ resource_tracker._resource_tracker._fd = tracker_fd
fd = pipe_handle
exitcode = _main(fd)
sys.exit(exitcode)
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 5137c49..4fcbefc 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -76,16 +76,16 @@ class SemLock(object):
# We only get here if we are on Unix with forking
# disabled. When the object is garbage collected or the
# process shuts down we unlink the semaphore name
- from .semaphore_tracker import register
- register(self._semlock.name)
+ from .resource_tracker import register
+ register(self._semlock.name, "semaphore")
util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
exitpriority=0)
@staticmethod
def _cleanup(name):
- from .semaphore_tracker import unregister
+ from .resource_tracker import unregister
sem_unlink(name)
- unregister(name)
+ unregister(name, "semaphore")
def _make_methods(self):
self.acquire = self._semlock.acquire