summaryrefslogtreecommitdiffstats
path: root/Demo/threads
diff options
context:
space:
mode:
Diffstat (limited to 'Demo/threads')
-rw-r--r--Demo/threads/sync.py121
1 files changed, 114 insertions, 7 deletions
diff --git a/Demo/threads/sync.py b/Demo/threads/sync.py
index 4e99979..6cdf3e8 100644
--- a/Demo/threads/sync.py
+++ b/Demo/threads/sync.py
@@ -1,16 +1,23 @@
# Defines classes that provide synchronization objects. Note that use of
# this module requires that your Python support threads.
#
-# condition() # a POSIX-like condition-variable object
-# barrier(n) # an n-thread barrier
-# event() # an event object
-# semaphore(n=1)# a semaphore object, with initial count n
+# condition(lock=None) # a POSIX-like condition-variable object
+# barrier(n) # an n-thread barrier
+# event() # an event object
+# semaphore(n=1) # a semaphore object, with initial count n
+# mrsw() # a multiple-reader single-writer lock
#
# CONDITIONS
#
# A condition object is created via
# import this_module
-# your_condition_object = this_module.condition()
+# your_condition_object = this_module.condition(lock=None)
+#
+# As explained below, a condition object has a lock associated with it,
+# used in the protocol to protect condition data. You can specify a
+# lock to use in the constructor, else the constructor will allocate
+# an anonymous lock for you. Specifying a lock explicitly can be useful
+# when more than one condition keys off the same set of shared data.
#
# Methods:
# .acquire()
@@ -209,13 +216,63 @@
# any) blocked by a .p(). It's an (detected) error for a .v() to
# increase the semaphore's count to a value larger than the initial
# count.
+#
+# MULTIPLE-READER SINGLE-WRITER LOCKS
+#
+# A mrsw lock is created via
+# import this_module
+# your_mrsw_lock = this_module.mrsw()
+#
+# This kind of lock is often useful with complex shared data structures.
+# The object lets any number of "readers" proceed, so long as no thread
+# wishes to "write". When a (one or more) thread declares its intention
+# to "write" (e.g., to update a shared structure), all current readers
+# are allowed to finish, and then a writer gets exclusive access; all
+# other readers & writers are blocked until the current writer completes.
+# Finally, if some thread is waiting to write and another is waiting to
+# read, the writer takes precedence.
+#
+# Methods:
+#
+# .read_in()
+# If no thread is writing or waiting to write, returns immediately.
+# Else blocks until no thread is writing or waiting to write. So
+# long as some thread has completed a .read_in but not a .read_out,
+# writers are blocked.
+#
+# .read_out()
+# Use sometime after a .read_in to declare that the thread is done
+# reading. When all threads complete reading, a writer can proceed.
+#
+# .write_in()
+# If no thread is writing (has completed a .write_in, but hasn't yet
+# done a .write_out) or reading (similarly), returns immediately.
+# Else blocks the calling thread, and threads waiting to read, until
+# the current writer completes writing or all the current readers
+# complete reading; if then more than one thread is waiting to
+# write, one of them is allowed to proceed, but which one is not
+# specified.
+#
+# .write_out()
+# Use sometime after a .write_in to declare that the thread is done
+# writing. Then if some other thread is waiting to write, it's
+# allowed to proceed. Else all threads (if any) waiting to read are
+# allowed to proceed.
import thread
class condition:
- def __init__(self):
+ def __init__(self, lock=None):
# the lock actually used by .acquire() and .release()
- self.mutex = thread.allocate_lock()
+ if lock is None:
+ self.mutex = thread.allocate_lock()
+ else:
+ if hasattr(lock, 'acquire') and \
+ hasattr(lock, 'release'):
+ self.mutex = lock
+ else:
+ raise TypeError, 'condition constructor requires ' \
+ 'a lock argument'
# lock used to block threads until a signal
self.checkout = thread.allocate_lock()
@@ -357,6 +414,56 @@ class semaphore:
self.nonzero.signal()
self.nonzero.release()
+class mrsw:
+ def __init__(self):
+ # critical-section lock & the data it protects
+ self.rwOK = thread.allocate_lock()
+ self.nr = 0 # number readers actively reading (not just waiting)
+ self.nw = 0 # number writers either waiting to write or writing
+ self.writing = 0 # 1 iff some thread is writing
+
+ # conditions
+ self.readOK = condition(self.rwOK) # OK to unblock readers
+ self.writeOK = condition(self.rwOK) # OK to unblock writers
+
+ def read_in(self):
+ self.rwOK.acquire()
+ while self.nw:
+ self.readOK.wait()
+ self.nr = self.nr + 1
+ self.rwOK.release()
+
+ def read_out(self):
+ self.rwOK.acquire()
+ if self.nr <= 0:
+ raise ValueError, \
+ '.read_out() invoked without an active reader'
+ self.nr = self.nr - 1
+ if self.nr == 0:
+ self.writeOK.signal()
+ self.rwOK.release()
+
+ def write_in(self):
+ self.rwOK.acquire()
+ self.nw = self.nw + 1
+ while self.writing or self.nr:
+ self.writeOK.wait()
+ self.writing = 1
+ self.rwOK.release()
+
+ def write_out(self):
+ self.rwOK.acquire()
+ if not self.writing:
+ raise ValueError, \
+ '.write_out() invoked without an active writer'
+ self.writing = 0
+ self.nw = self.nw - 1
+ if self.nw:
+ self.writeOK.signal()
+ else:
+ self.readOK.broadcast()
+ self.rwOK.release()
+
# The rest of the file is a test case, that runs a number of parallelized
# quicksorts in parallel. If it works, you'll get about 600 lines of
# tracing output, with a line like