summaryrefslogtreecommitdiffstats
path: root/Lib/multiprocessing/semaphore_tracker.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/multiprocessing/semaphore_tracker.py')
-rw-r--r--Lib/multiprocessing/semaphore_tracker.py112
1 files changed, 61 insertions, 51 deletions
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
index 6c9e4a5..ddb2b52 100644
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -26,60 +26,70 @@ from . import current_process
__all__ = ['ensure_running', 'register', 'unregister']
-_semaphore_tracker_fd = None
-_lock = threading.Lock()
+class SemaphoreTracker(object):
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._fd = None
-def ensure_running():
- '''Make sure that semaphore tracker process is running.
+ def getfd(self):
+ self.ensure_running()
+ return self._fd
- This can be run from any process. Usually a child process will use
- the semaphore created by its parent.'''
- global _semaphore_tracker_fd
- with _lock:
- if _semaphore_tracker_fd is not None:
- return
- fds_to_pass = []
- try:
- fds_to_pass.append(sys.stderr.fileno())
- except Exception:
- pass
- cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
- r, w = os.pipe()
- try:
- fds_to_pass.append(r)
- # process will out live us, so no need to wait on pid
- exe = spawn.get_executable()
- args = [exe] + util._args_from_interpreter_flags()
- args += ['-c', cmd % r]
- util.spawnv_passfds(exe, args, fds_to_pass)
- except:
- os.close(w)
- raise
- else:
- _semaphore_tracker_fd = w
- finally:
- os.close(r)
-
-
-def register(name):
- '''Register name of semaphore with semaphore tracker.'''
- _send('REGISTER', name)
-
-
-def unregister(name):
- '''Unregister name of semaphore with semaphore tracker.'''
- _send('UNREGISTER', name)
-
-
-def _send(cmd, name):
- msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
- if len(name) > 512:
- # posix guarantees that writes to a pipe of less than PIPE_BUF
- # bytes are atomic, and that PIPE_BUF >= 512
- raise ValueError('name too long')
- nbytes = os.write(_semaphore_tracker_fd, msg)
- assert nbytes == len(msg)
+ def ensure_running(self):
+ '''Make sure that semaphore tracker process is running.
+
+ This can be run from any process. Usually a child process will use
+ the semaphore created by its parent.'''
+ with self._lock:
+ if self._fd is not None:
+ return
+ fds_to_pass = []
+ try:
+ fds_to_pass.append(sys.stderr.fileno())
+ except Exception:
+ pass
+ cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
+ r, w = os.pipe()
+ try:
+ fds_to_pass.append(r)
+ # process will out live us, so no need to wait on pid
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags()
+ args += ['-c', cmd % r]
+ util.spawnv_passfds(exe, args, fds_to_pass)
+ except:
+ os.close(w)
+ raise
+ else:
+ self._fd = w
+ finally:
+ os.close(r)
+
+ def register(self, name):
+ '''Register name of semaphore with semaphore tracker.'''
+ self._send('REGISTER', name)
+
+ def unregister(self, name):
+ '''Unregister name of semaphore with semaphore tracker.'''
+ self._send('UNREGISTER', name)
+
+ def _send(self, cmd, name):
+ self.ensure_running()
+ msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
+ if len(name) > 512:
+ # posix guarantees that writes to a pipe of less than PIPE_BUF
+ # bytes are atomic, and that PIPE_BUF >= 512
+ raise ValueError('name too long')
+ nbytes = os.write(self._fd, msg)
+ assert nbytes == len(msg)
+
+
+_semaphore_tracker = SemaphoreTracker()
+ensure_running = _semaphore_tracker.ensure_running
+register = _semaphore_tracker.register
+unregister = _semaphore_tracker.unregister
+getfd = _semaphore_tracker.getfd
def main(fd):