diff options
Diffstat (limited to 'Demo/metaclasses/Synch.py')
-rw-r--r-- | Demo/metaclasses/Synch.py | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/Demo/metaclasses/Synch.py b/Demo/metaclasses/Synch.py new file mode 100644 index 0000000..1fb9160 --- /dev/null +++ b/Demo/metaclasses/Synch.py @@ -0,0 +1,256 @@ +"""Synchronization metaclass. + +This metaclass makes it possible to declare synchronized methods. + +""" + +import thread + +# First we need to define a reentrant lock. +# This is generally useful and should probably be in a standard Python +# library module. For now, we in-line it. + +class Lock: + + """Reentrant lock. + + This is a mutex-like object which can be acquired by the same + thread more than once. It keeps a reference count of the number + of times it has been acquired by the same thread. Each acquire() + call must be matched by a release() call and only the last + release() call actually releases the lock for acquisition by + another thread. + + The implementation uses two locks internally: + + __mutex is a short term lock used to protect the instance variables + __wait is the lock for which other threads wait + + A thread intending to acquire both locks should acquire __wait + first. + + The implementation uses two other instance variables, protected by + locking __mutex: + + __tid is the thread ID of the thread that currently has the lock + __count is the number of times the current thread has acquired it + + When the lock is released, __tid is None and __count is zero. + + """ + + def __init__(self): + """Constructor. Initialize all instance variables.""" + self.__mutex = thread.allocate_lock() + self.__wait = thread.allocate_lock() + self.__tid = None + self.__count = 0 + + def acquire(self, flag=1): + """Acquire the lock. + + If the optional flag argument is false, returns immediately + when it cannot acquire the __wait lock without blocking (it + may still block for a little while in order to acquire the + __mutex lock). + + The return value is only relevant when the flag argument is + false; it is 1 if the lock is acquired, 0 if not. + + """ + self.__mutex.acquire() + try: + if self.__tid == thread.get_ident(): + self.__count = self.__count + 1 + return 1 + finally: + self.__mutex.release() + locked = self.__wait.acquire(flag) + if not flag and not locked: + return 0 + try: + self.__mutex.acquire() + assert self.__tid == None + assert self.__count == 0 + self.__tid = thread.get_ident() + self.__count = 1 + return 1 + finally: + self.__mutex.release() + + def release(self): + """Release the lock. + + If this thread doesn't currently have the lock, an assertion + error is raised. + + Only allow another thread to acquire the lock when the count + reaches zero after decrementing it. + + """ + self.__mutex.acquire() + try: + assert self.__tid == thread.get_ident() + assert self.__count > 0 + self.__count = self.__count - 1 + if self.__count == 0: + self.__tid = None + self.__wait.release() + finally: + self.__mutex.release() + + +def _testLock(): + + done = [] + + def f2(lock, done=done): + lock.acquire() + print "f2 running in thread %d\n" % thread.get_ident(), + lock.release() + done.append(1) + + def f1(lock, f2=f2, done=done): + lock.acquire() + print "f1 running in thread %d\n" % thread.get_ident(), + try: + f2(lock) + finally: + lock.release() + done.append(1) + + lock = Lock() + lock.acquire() + f1(lock) # Adds 2 to done + lock.release() + + lock.acquire() + + thread.start_new_thread(f1, (lock,)) # Adds 2 + thread.start_new_thread(f1, (lock, f1)) # Adds 3 + thread.start_new_thread(f2, (lock,)) # Adds 1 + thread.start_new_thread(f2, (lock,)) # Adds 1 + + lock.release() + import time + while len(done) < 9: + print len(done) + time.sleep(0.001) + print len(done) + + +# Now, the Locking metaclass is a piece of cake. +# As an example feature, methods whose name begins with exactly one +# underscore are not synchronized. + +from Meta import MetaClass, MetaHelper, MetaMethodWrapper + +class LockingMethodWrapper(MetaMethodWrapper): + def __call__(self, *args, **kw): + if self.__name__[:1] == '_' and self.__name__[1:] != '_': + return apply(self.func, (self.inst,) + args, kw) + self.inst.__lock__.acquire() + try: + return apply(self.func, (self.inst,) + args, kw) + finally: + self.inst.__lock__.release() + +class LockingHelper(MetaHelper): + __methodwrapper__ = LockingMethodWrapper + def __helperinit__(self, formalclass): + MetaHelper.__helperinit__(self, formalclass) + self.__lock__ = Lock() + +class LockingMetaClass(MetaClass): + __helper__ = LockingHelper + +Locking = LockingMetaClass('Locking', (), {}) + +def _test(): + # For kicks, take away the Locking base class and see it die + class Buffer(Locking): + def __init__(self, initialsize): + assert initialsize > 0 + self.size = initialsize + self.buffer = [None]*self.size + self.first = self.last = 0 + def put(self, item): + # Do we need to grow the buffer? + if (self.last+1) % self.size != self.first: + # Insert the new item + self.buffer[self.last] = item + self.last = (self.last+1) % self.size + return + # Double the buffer size + # First normalize it so that first==0 and last==size-1 + print "buffer =", self.buffer + print "first = %d, last = %d, size = %d" % ( + self.first, self.last, self.size) + if self.first <= self.last: + temp = self.buffer[self.first:self.last] + else: + temp = self.buffer[self.first:] + self.buffer[:self.last] + print "temp =", temp + self.buffer = temp + [None]*(self.size+1) + self.first = 0 + self.last = self.size-1 + self.size = self.size*2 + print "Buffer size doubled to", self.size + print "new buffer =", self.buffer + print "first = %d, last = %d, size = %d" % ( + self.first, self.last, self.size) + self.put(item) # Recursive call to test the locking + def get(self): + # Is the buffer empty? + if self.first == self.last: + raise EOFError # Avoid defining a new exception + item = self.buffer[self.first] + self.first = (self.first+1) % self.size + return item + + def producer(buffer, wait, n=1000): + import time + i = 0 + while i < n: + print "put", i + buffer.put(i) + i = i+1 + print "Producer: done producing", n, "items" + wait.release() + + def consumer(buffer, wait, n=1000): + import time + i = 0 + tout = 0.001 + while i < n: + try: + x = buffer.get() + if x != i: + raise AssertionError, \ + "get() returned %s, expected %s" % (x, i) + print "got", i + i = i+1 + tout = 0.001 + except EOFError: + time.sleep(tout) + tout = tout*2 + print "Consumer: done consuming", n, "items" + wait.release() + + pwait = thread.allocate_lock() + pwait.acquire() + cwait = thread.allocate_lock() + cwait.acquire() + buffer = Buffer(1) + n = 1000 + thread.start_new_thread(consumer, (buffer, cwait, n)) + thread.start_new_thread(producer, (buffer, pwait, n)) + pwait.acquire() + print "Producer done" + cwait.acquire() + print "All done" + print "buffer size ==", len(buffer.buffer) + +if __name__ == '__main__': + _testLock() + _test() |