diff options
Diffstat (limited to 'Lib/multiprocessing/synchronize.py')
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 112 |
1 files changed, 78 insertions, 34 deletions
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 0faca78..9d8e282 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -11,20 +11,24 @@ __all__ = [ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' ] +import os import threading import sys - +import itertools +import tempfile import _multiprocessing -from multiprocessing.process import current_process -from multiprocessing.util import register_after_fork, debug -from multiprocessing.forking import assert_spawning, Popen + from time import time as _time +from . import context +from . import process +from . import util + # Try to import the mp.synchronize module cleanly, if it fails # raise ImportError for platforms lacking a working sem_open implementation. # See issue 3770 try: - from _multiprocessing import SemLock + from _multiprocessing import SemLock, sem_unlink except (ImportError): raise ImportError("This platform lacks a functioning sem_open" + " implementation, therefore, the required" + @@ -44,15 +48,46 @@ SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX class SemLock(object): - def __init__(self, kind, value, maxvalue): - sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue) - debug('created semlock with handle %s' % sl.handle) + _rand = tempfile._RandomNameSequence() + + def __init__(self, kind, value, maxvalue, *, ctx): + ctx = ctx or get_context() + ctx = ctx.get_context() + unlink_now = sys.platform == 'win32' or ctx._name == 'fork' + for i in range(100): + try: + sl = self._semlock = _multiprocessing.SemLock( + kind, value, maxvalue, self._make_name(), + unlink_now) + except FileExistsError: + pass + else: + break + else: + raise FileExistsError('cannot find name for semaphore') + + util.debug('created semlock with handle %s' % sl.handle) self._make_methods() if sys.platform != 'win32': def _after_fork(obj): obj._semlock._after_fork() - register_after_fork(self, _after_fork) + util.register_after_fork(self, _after_fork) + + if self._semlock.name is not None: + # We only get here if we are on Unix with forking + # disabled. When the object is garbage collected or the + # process shuts down we unlink the semaphore name + from .semaphore_tracker import register + register(self._semlock.name) + util.Finalize(self, SemLock._cleanup, (self._semlock.name,), + exitpriority=0) + + @staticmethod + def _cleanup(name): + from .semaphore_tracker import unregister + sem_unlink(name) + unregister(name) def _make_methods(self): self.acquire = self._semlock.acquire @@ -65,23 +100,32 @@ class SemLock(object): return self._semlock.__exit__(*args) def __getstate__(self): - assert_spawning(self) + context.assert_spawning(self) sl = self._semlock - return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue) + if sys.platform == 'win32': + h = context.get_spawning_popen().duplicate_for_child(sl.handle) + else: + h = sl.handle + return (h, sl.kind, sl.maxvalue, sl.name) def __setstate__(self, state): self._semlock = _multiprocessing.SemLock._rebuild(*state) - debug('recreated blocker with handle %r' % state[0]) + util.debug('recreated blocker with handle %r' % state[0]) self._make_methods() + @staticmethod + def _make_name(): + return '%s-%s' % (process.current_process()._config['semprefix'], + next(SemLock._rand)) + # # Semaphore # class Semaphore(SemLock): - def __init__(self, value=1): - SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX) + def __init__(self, value=1, *, ctx): + SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx) def get_value(self): return self._semlock._get_value() @@ -99,8 +143,8 @@ class Semaphore(SemLock): class BoundedSemaphore(Semaphore): - def __init__(self, value=1): - SemLock.__init__(self, SEMAPHORE, value, value) + def __init__(self, value=1, *, ctx): + SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx) def __repr__(self): try: @@ -116,13 +160,13 @@ class BoundedSemaphore(Semaphore): class Lock(SemLock): - def __init__(self): - SemLock.__init__(self, SEMAPHORE, 1, 1) + def __init__(self, *, ctx): + SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx) def __repr__(self): try: if self._semlock._is_mine(): - name = current_process().name + name = process.current_process().name if threading.current_thread().name != 'MainThread': name += '|' + threading.current_thread().name elif self._semlock._get_value() == 1: @@ -141,13 +185,13 @@ class Lock(SemLock): class RLock(SemLock): - def __init__(self): - SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1) + def __init__(self, *, ctx): + SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx) def __repr__(self): try: if self._semlock._is_mine(): - name = current_process().name + name = process.current_process().name if threading.current_thread().name != 'MainThread': name += '|' + threading.current_thread().name count = self._semlock._count() @@ -167,15 +211,15 @@ class RLock(SemLock): class Condition(object): - def __init__(self, lock=None): - self._lock = lock or RLock() - self._sleeping_count = Semaphore(0) - self._woken_count = Semaphore(0) - self._wait_semaphore = Semaphore(0) + def __init__(self, lock=None, *, ctx): + self._lock = lock or ctx.RLock() + self._sleeping_count = ctx.Semaphore(0) + self._woken_count = ctx.Semaphore(0) + self._wait_semaphore = ctx.Semaphore(0) self._make_methods() def __getstate__(self): - assert_spawning(self) + context.assert_spawning(self) return (self._lock, self._sleeping_count, self._woken_count, self._wait_semaphore) @@ -289,9 +333,9 @@ class Condition(object): class Event(object): - def __init__(self): - self._cond = Condition(Lock()) - self._flag = Semaphore(0) + def __init__(self, *, ctx): + self._cond = ctx.Condition(ctx.Lock()) + self._flag = ctx.Semaphore(0) def is_set(self): self._cond.acquire() @@ -340,11 +384,11 @@ class Event(object): class Barrier(threading.Barrier): - def __init__(self, parties, action=None, timeout=None): + def __init__(self, parties, action=None, timeout=None, *, ctx): import struct - from multiprocessing.heap import BufferWrapper + from .heap import BufferWrapper wrapper = BufferWrapper(struct.calcsize('i') * 2) - cond = Condition() + cond = ctx.Condition() self.__setstate__((parties, action, timeout, cond, wrapper)) self._state = 0 self._count = 0 |