diff options
Diffstat (limited to 'Lib/multiprocessing/synchronize.py')
-rw-r--r-- | Lib/multiprocessing/synchronize.py | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py new file mode 100644 index 0000000..44b1171 --- /dev/null +++ b/Lib/multiprocessing/synchronize.py @@ -0,0 +1,294 @@ +# +# Module implementing synchronization primitives +# +# multiprocessing/synchronize.py +# +# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# + +__all__ = [ + 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' + ] + +import threading +import os +import sys + +from time import time as _time, sleep as _sleep + +import _multiprocessing +from multiprocessing.process import current_process +from multiprocessing.util import Finalize, register_after_fork, debug +from multiprocessing.forking import assert_spawning, Popen + +# +# Constants +# + +RECURSIVE_MUTEX, SEMAPHORE = list(range(2)) +SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX + +# +# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` +# + +class SemLock(object): + + def __init__(self, kind, value, maxvalue): + sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue) + debug('created semlock with handle %s' % sl.handle) + self._make_methods() + + if sys.platform != 'win32': + def _after_fork(obj): + obj._semlock._after_fork() + register_after_fork(self, _after_fork) + + def _make_methods(self): + self.acquire = self._semlock.acquire + self.release = self._semlock.release + self.__enter__ = self._semlock.__enter__ + self.__exit__ = self._semlock.__exit__ + + def __getstate__(self): + assert_spawning(self) + sl = self._semlock + return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue) + + def __setstate__(self, state): + self._semlock = _multiprocessing.SemLock._rebuild(*state) + debug('recreated blocker with handle %r' % state[0]) + self._make_methods() + +# +# Semaphore +# + +class Semaphore(SemLock): + + def __init__(self, value=1): + SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX) + + def get_value(self): + return self._semlock._get_value() + + def __repr__(self): + try: + value = self._semlock._get_value() + except Exception: + value = 'unknown' + return '<Semaphore(value=%s)>' % value + +# +# Bounded semaphore +# + +class BoundedSemaphore(Semaphore): + + def __init__(self, value=1): + SemLock.__init__(self, SEMAPHORE, value, value) + + def __repr__(self): + try: + value = self._semlock._get_value() + except Exception: + value = 'unknown' + return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \ + (value, self._semlock.maxvalue) + +# +# Non-recursive lock +# + +class Lock(SemLock): + + def __init__(self): + SemLock.__init__(self, SEMAPHORE, 1, 1) + + def __repr__(self): + try: + if self._semlock._is_mine(): + name = current_process().get_name() + if threading.currentThread().getName() != 'MainThread': + name += '|' + threading.currentThread().getName() + elif self._semlock._get_value() == 1: + name = 'None' + elif self._semlock._count() > 0: + name = 'SomeOtherThread' + else: + name = 'SomeOtherProcess' + except Exception: + name = 'unknown' + return '<Lock(owner=%s)>' % name + +# +# Recursive lock +# + +class RLock(SemLock): + + def __init__(self): + SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1) + + def __repr__(self): + try: + if self._semlock._is_mine(): + name = current_process().get_name() + if threading.currentThread().getName() != 'MainThread': + name += '|' + threading.currentThread().getName() + count = self._semlock._count() + elif self._semlock._get_value() == 1: + name, count = 'None', 0 + elif self._semlock._count() > 0: + name, count = 'SomeOtherThread', 'nonzero' + else: + name, count = 'SomeOtherProcess', 'nonzero' + except Exception: + name, count = 'unknown', 'unknown' + return '<RLock(%s, %s)>' % (name, count) + +# +# Condition variable +# + +class Condition(object): + + def __init__(self, lock=None): + self._lock = lock or RLock() + self._sleeping_count = Semaphore(0) + self._woken_count = Semaphore(0) + self._wait_semaphore = Semaphore(0) + self._make_methods() + + def __getstate__(self): + assert_spawning(self) + return (self._lock, self._sleeping_count, + self._woken_count, self._wait_semaphore) + + def __setstate__(self, state): + (self._lock, self._sleeping_count, + self._woken_count, self._wait_semaphore) = state + self._make_methods() + + def _make_methods(self): + self.acquire = self._lock.acquire + self.release = self._lock.release + self.__enter__ = self._lock.__enter__ + self.__exit__ = self._lock.__exit__ + + def __repr__(self): + try: + num_waiters = (self._sleeping_count._semlock._get_value() - + self._woken_count._semlock._get_value()) + except Exception: + num_waiters = 'unkown' + return '<Condition(%s, %s)>' % (self._lock, num_waiters) + + def wait(self, timeout=None): + assert self._lock._semlock._is_mine(), \ + 'must acquire() condition before using wait()' + + # indicate that this thread is going to sleep + self._sleeping_count.release() + + # release lock + count = self._lock._semlock._count() + for i in range(count): + self._lock.release() + + try: + # wait for notification or timeout + self._wait_semaphore.acquire(True, timeout) + finally: + # indicate that this thread has woken + self._woken_count.release() + + # reacquire lock + for i in range(count): + self._lock.acquire() + + def notify(self): + assert self._lock._semlock._is_mine(), 'lock is not owned' + assert not self._wait_semaphore.acquire(False) + + # to take account of timeouts since last notify() we subtract + # woken_count from sleeping_count and rezero woken_count + while self._woken_count.acquire(False): + res = self._sleeping_count.acquire(False) + assert res + + if self._sleeping_count.acquire(False): # try grabbing a sleeper + self._wait_semaphore.release() # wake up one sleeper + self._woken_count.acquire() # wait for the sleeper to wake + + # rezero _wait_semaphore in case a timeout just happened + self._wait_semaphore.acquire(False) + + def notify_all(self): + assert self._lock._semlock._is_mine(), 'lock is not owned' + assert not self._wait_semaphore.acquire(False) + + # to take account of timeouts since last notify*() we subtract + # woken_count from sleeping_count and rezero woken_count + while self._woken_count.acquire(False): + res = self._sleeping_count.acquire(False) + assert res + + sleepers = 0 + while self._sleeping_count.acquire(False): + self._wait_semaphore.release() # wake up one sleeper + sleepers += 1 + + if sleepers: + for i in range(sleepers): + self._woken_count.acquire() # wait for a sleeper to wake + + # rezero wait_semaphore in case some timeouts just happened + while self._wait_semaphore.acquire(False): + pass + +# +# Event +# + +class Event(object): + + def __init__(self): + self._cond = Condition(Lock()) + self._flag = Semaphore(0) + + def is_set(self): + self._cond.acquire() + try: + if self._flag.acquire(False): + self._flag.release() + return True + return False + finally: + self._cond.release() + + def set(self): + self._cond.acquire() + try: + self._flag.acquire(False) + self._flag.release() + self._cond.notify_all() + finally: + self._cond.release() + + def clear(self): + self._cond.acquire() + try: + self._flag.acquire(False) + finally: + self._cond.release() + + def wait(self, timeout=None): + self._cond.acquire() + try: + if self._flag.acquire(False): + self._flag.release() + else: + self._cond.wait(timeout) + finally: + self._cond.release() |