diff options
Diffstat (limited to 'Demo/threads')
-rw-r--r-- | Demo/threads/find.py | 153 |
1 files changed, 153 insertions, 0 deletions
diff --git a/Demo/threads/find.py b/Demo/threads/find.py new file mode 100644 index 0000000..e97bd99 --- /dev/null +++ b/Demo/threads/find.py @@ -0,0 +1,153 @@ +# A parallelized "find(1)" using the thread module (SGI only for now). + +# This demonstrates the use of a work queue and worker threads. +# It really does do more stats/sec when using multiple threads, +# although the improvement is only about 20-30 percent. + +# I'm too lazy to write a command line parser for the full find(1) +# command line syntax, so the predicate it searches for is wired-in, +# see function selector() below. (It currently searches for files with +# group or world write permission.) + +# Usage: parfind.py [-w nworkers] [directory] ... +# Default nworkers is 4, maximum appears to be 8 (on Irix 4.0.2) + + +import sys +import getopt +import string +import time +import os +from stat import * +import thread + + +# Work queue class. Usage: +# wq = WorkQ() +# wq.addwork(func, (arg1, arg2, ...)) # one or more calls +# wq.run(nworkers) +# The work is done when wq.run() completes. +# The function calls executed by the workers may add more work. +# Don't use keyboard interrupts! + +class WorkQ: + + # Invariants: + + # - busy and work are only modified when mutex is locked + # - len(work) is the number of jobs ready to be taken + # - busy is the number of jobs being done + # - todo is locked iff there is no work and somebody is busy + + def __init__(self): + self.mutex = thread.allocate() + self.todo = thread.allocate() + self.todo.acquire() + self.work = [] + self.busy = 0 + + def addwork(self, job): + if not job: + raise TypeError, 'cannot add null job' + self.mutex.acquire() + self.work.append(job) + self.mutex.release() + if len(self.work) == 1: + self.todo.release() + + def _getwork(self): + self.todo.acquire() + self.mutex.acquire() + if self.busy == 0 and len(self.work) == 0: + self.mutex.release() + self.todo.release() + return None + job = self.work[0] + del self.work[0] + self.busy = self.busy + 1 + self.mutex.release() + if len(self.work) > 0: + self.todo.release() + return job + + def _donework(self): + self.mutex.acquire() + self.busy = self.busy - 1 + if self.busy == 0 and len(self.work) == 0: + self.todo.release() + self.mutex.release() + + def _worker(self): + while 1: + job = self._getwork() + if not job: + break + func, args = job + apply(func, args) + self._donework() + + def run(self, nworkers): + if not self.work: + return # Nothing to do + for i in range(nworkers-1): + thread.start_new(self._worker, ()) + self._worker() + self.todo.acquire() + + +# Main program + +def main(): + nworkers = 4 + opts, args = getopt.getopt(sys.argv[1:], '-w:') + for opt, arg in opts: + if opt == '-w': + nworkers = string.atoi(arg) + if not args: + args = [os.curdir] + + wq = WorkQ() + for dir in args: + wq.addwork(find, (dir, selector, wq)) + + t1 = time.millitimer() + wq.run(nworkers) + t2 = time.millitimer() + + sys.stderr.write('Total time ' + `t2-t1` + ' msec.\n') + + +# The predicate -- defines what files we look for. +# Feel free to change this to suit your purpose + +def selector(dir, name, fullname, stat): + # Look for group or world writable files + return (stat[ST_MODE] & 0022) != 0 + + +# The find procedure -- calls wq.addwork() for subdirectories + +def find(dir, pred, wq): + try: + names = os.listdir(dir) + except os.error, msg: + print `dir`, ':', msg + return + for name in names: + if name not in (os.curdir, os.pardir): + fullname = os.path.join(dir, name) + try: + stat = os.lstat(fullname) + except os.error, msg: + print `fullname`, ':', msg + continue + if pred(dir, name, fullname, stat): + print fullname + if S_ISDIR(stat[ST_MODE]): + if not os.path.ismount(fullname): + wq.addwork(find, (fullname, pred, wq)) + + +# Call the main program + +main() |