diff options
Diffstat (limited to 'Lib/multiprocessing/semaphore_tracker.py')
-rw-r--r-- | Lib/multiprocessing/semaphore_tracker.py | 112 |
1 files changed, 61 insertions, 51 deletions
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index 6c9e4a5..ddb2b52 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -26,60 +26,70 @@ from . import current_process __all__ = ['ensure_running', 'register', 'unregister'] -_semaphore_tracker_fd = None -_lock = threading.Lock() +class SemaphoreTracker(object): + def __init__(self): + self._lock = threading.Lock() + self._fd = None -def ensure_running(): - '''Make sure that semaphore tracker process is running. + def getfd(self): + self.ensure_running() + return self._fd - This can be run from any process. Usually a child process will use - the semaphore created by its parent.''' - global _semaphore_tracker_fd - with _lock: - if _semaphore_tracker_fd is not None: - return - fds_to_pass = [] - try: - fds_to_pass.append(sys.stderr.fileno()) - except Exception: - pass - cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)' - r, w = os.pipe() - try: - fds_to_pass.append(r) - # process will out live us, so no need to wait on pid - exe = spawn.get_executable() - args = [exe] + util._args_from_interpreter_flags() - args += ['-c', cmd % r] - util.spawnv_passfds(exe, args, fds_to_pass) - except: - os.close(w) - raise - else: - _semaphore_tracker_fd = w - finally: - os.close(r) - - -def register(name): - '''Register name of semaphore with semaphore tracker.''' - _send('REGISTER', name) - - -def unregister(name): - '''Unregister name of semaphore with semaphore tracker.''' - _send('UNREGISTER', name) - - -def _send(cmd, name): - msg = '{0}:{1}\n'.format(cmd, name).encode('ascii') - if len(name) > 512: - # posix guarantees that writes to a pipe of less than PIPE_BUF - # bytes are atomic, and that PIPE_BUF >= 512 - raise ValueError('name too long') - nbytes = os.write(_semaphore_tracker_fd, msg) - assert nbytes == len(msg) + def ensure_running(self): + '''Make sure that semaphore tracker process is running. + + This can be run from any process. Usually a child process will use + the semaphore created by its parent.''' + with self._lock: + if self._fd is not None: + return + fds_to_pass = [] + try: + fds_to_pass.append(sys.stderr.fileno()) + except Exception: + pass + cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)' + r, w = os.pipe() + try: + fds_to_pass.append(r) + # process will out live us, so no need to wait on pid + exe = spawn.get_executable() + args = [exe] + util._args_from_interpreter_flags() + args += ['-c', cmd % r] + util.spawnv_passfds(exe, args, fds_to_pass) + except: + os.close(w) + raise + else: + self._fd = w + finally: + os.close(r) + + def register(self, name): + '''Register name of semaphore with semaphore tracker.''' + self._send('REGISTER', name) + + def unregister(self, name): + '''Unregister name of semaphore with semaphore tracker.''' + self._send('UNREGISTER', name) + + def _send(self, cmd, name): + self.ensure_running() + msg = '{0}:{1}\n'.format(cmd, name).encode('ascii') + if len(name) > 512: + # posix guarantees that writes to a pipe of less than PIPE_BUF + # bytes are atomic, and that PIPE_BUF >= 512 + raise ValueError('name too long') + nbytes = os.write(self._fd, msg) + assert nbytes == len(msg) + + +_semaphore_tracker = SemaphoreTracker() +ensure_running = _semaphore_tracker.ensure_running +register = _semaphore_tracker.register +unregister = _semaphore_tracker.unregister +getfd = _semaphore_tracker.getfd def main(fd): |