diff options
Diffstat (limited to 'Demo')
-rw-r--r-- | Demo/threads/bug.py | 24 | ||||
-rw-r--r-- | Demo/threads/find.py | 192 | ||||
-rw-r--r-- | Demo/threads/telnet.py | 158 |
3 files changed, 187 insertions, 187 deletions
diff --git a/Demo/threads/bug.py b/Demo/threads/bug.py index 5860536..6c5edac 100644 --- a/Demo/threads/bug.py +++ b/Demo/threads/bug.py @@ -1,12 +1,12 @@ # The following self-contained little program usually freezes with most # threads reporting -# +# # Unhandled exception in thread: # Traceback (innermost last): # File "importbug.py", line 6 # x = whrandom.randint(1,3) # AttributeError: randint -# +# # Here's the program; it doesn't use anything from the attached module: import thread @@ -33,37 +33,37 @@ print 'done' # Sticking an acquire/release pair around the 'import' statement makes the # problem go away. -# +# # I believe that what happens is: -# +# # 1) The first thread to hit the import atomically reaches, and executes # most of, get_module. In particular, it finds Lib/whrandom.pyc, # installs its name in sys.modules, and executes -# +# # v = eval_code(co, d, d, d, (object *)NULL); -# +# # to initialize the module. -# +# # 2) eval_code "ticker"-slices the 1st thread out, and gives another thread # a chance. When this 2nd thread hits the same 'import', import_module # finds 'whrandom' in sys.modules, so just proceeds. -# +# # 3) But the 1st thread is still "in the middle" of executing whrandom.pyc. # So the 2nd thread has a good chance of trying to look up 'randint' # before the 1st thread has placed it in whrandom's dict. -# +# # 4) The more threads there are, the more likely that at least one of them # will do this before the 1st thread finishes the import work. -# +# # If that's right, a perhaps not-too-bad workaround would be to introduce a # static "you can't interrupt this thread" flag in ceval.c, check it before # giving up interpreter_lock, and have IMPORT_NAME set it & restore (plain # clearing would not work) it around its call to import_module. To its # credit, there's something wonderfully perverse about fixing a race via an # unprotected static <grin>. -# +# # as-with-most-other-things-(pseudo-)parallel-programming's-more-fun- # in-python-too!-ly y'rs - tim -# +# # Tim Peters tim@ksr.com # not speaking for Kendall Square Research Corp diff --git a/Demo/threads/find.py b/Demo/threads/find.py index f858ccc..11fb65a 100644 --- a/Demo/threads/find.py +++ b/Demo/threads/find.py @@ -32,121 +32,121 @@ import thread class WorkQ: - # Invariants: - - # - busy and work are only modified when mutex is locked - # - len(work) is the number of jobs ready to be taken - # - busy is the number of jobs being done - # - todo is locked iff there is no work and somebody is busy - - def __init__(self): - self.mutex = thread.allocate() - self.todo = thread.allocate() - self.todo.acquire() - self.work = [] - self.busy = 0 - - def addwork(self, func, args): - job = (func, args) - self.mutex.acquire() - self.work.append(job) - self.mutex.release() - if len(self.work) == 1: - self.todo.release() - - def _getwork(self): - self.todo.acquire() - self.mutex.acquire() - if self.busy == 0 and len(self.work) == 0: - self.mutex.release() - self.todo.release() - return None - job = self.work[0] - del self.work[0] - self.busy = self.busy + 1 - self.mutex.release() - if len(self.work) > 0: - self.todo.release() - return job - - def _donework(self): - self.mutex.acquire() - self.busy = self.busy - 1 - if self.busy == 0 and len(self.work) == 0: - self.todo.release() - self.mutex.release() - - def _worker(self): - time.sleep(0.00001) # Let other threads run - while 1: - job = self._getwork() - if not job: - break - func, args = job - apply(func, args) - self._donework() - - def run(self, nworkers): - if not self.work: - return # Nothing to do - for i in range(nworkers-1): - thread.start_new(self._worker, ()) - self._worker() - self.todo.acquire() + # Invariants: + + # - busy and work are only modified when mutex is locked + # - len(work) is the number of jobs ready to be taken + # - busy is the number of jobs being done + # - todo is locked iff there is no work and somebody is busy + + def __init__(self): + self.mutex = thread.allocate() + self.todo = thread.allocate() + self.todo.acquire() + self.work = [] + self.busy = 0 + + def addwork(self, func, args): + job = (func, args) + self.mutex.acquire() + self.work.append(job) + self.mutex.release() + if len(self.work) == 1: + self.todo.release() + + def _getwork(self): + self.todo.acquire() + self.mutex.acquire() + if self.busy == 0 and len(self.work) == 0: + self.mutex.release() + self.todo.release() + return None + job = self.work[0] + del self.work[0] + self.busy = self.busy + 1 + self.mutex.release() + if len(self.work) > 0: + self.todo.release() + return job + + def _donework(self): + self.mutex.acquire() + self.busy = self.busy - 1 + if self.busy == 0 and len(self.work) == 0: + self.todo.release() + self.mutex.release() + + def _worker(self): + time.sleep(0.00001) # Let other threads run + while 1: + job = self._getwork() + if not job: + break + func, args = job + apply(func, args) + self._donework() + + def run(self, nworkers): + if not self.work: + return # Nothing to do + for i in range(nworkers-1): + thread.start_new(self._worker, ()) + self._worker() + self.todo.acquire() # Main program def main(): - sys.argv.append("/tmp") - nworkers = 4 - opts, args = getopt.getopt(sys.argv[1:], '-w:') - for opt, arg in opts: - if opt == '-w': - nworkers = string.atoi(arg) - if not args: - args = [os.curdir] + sys.argv.append("/tmp") + nworkers = 4 + opts, args = getopt.getopt(sys.argv[1:], '-w:') + for opt, arg in opts: + if opt == '-w': + nworkers = string.atoi(arg) + if not args: + args = [os.curdir] - wq = WorkQ() - for dir in args: - wq.addwork(find, (dir, selector, wq)) + wq = WorkQ() + for dir in args: + wq.addwork(find, (dir, selector, wq)) - t1 = time.time() - wq.run(nworkers) - t2 = time.time() + t1 = time.time() + wq.run(nworkers) + t2 = time.time() - sys.stderr.write('Total time ' + `t2-t1` + ' sec.\n') + sys.stderr.write('Total time ' + `t2-t1` + ' sec.\n') # The predicate -- defines what files we look for. # Feel free to change this to suit your purpose def selector(dir, name, fullname, stat): - # Look for group or world writable files - return (stat[ST_MODE] & 0022) != 0 + # Look for group or world writable files + return (stat[ST_MODE] & 0022) != 0 # The find procedure -- calls wq.addwork() for subdirectories def find(dir, pred, wq): - try: - names = os.listdir(dir) - except os.error, msg: - print `dir`, ':', msg - return - for name in names: - if name not in (os.curdir, os.pardir): - fullname = os.path.join(dir, name) - try: - stat = os.lstat(fullname) - except os.error, msg: - print `fullname`, ':', msg - continue - if pred(dir, name, fullname, stat): - print fullname - if S_ISDIR(stat[ST_MODE]): - if not os.path.ismount(fullname): - wq.addwork(find, (fullname, pred, wq)) + try: + names = os.listdir(dir) + except os.error, msg: + print `dir`, ':', msg + return + for name in names: + if name not in (os.curdir, os.pardir): + fullname = os.path.join(dir, name) + try: + stat = os.lstat(fullname) + except os.error, msg: + print `fullname`, ':', msg + continue + if pred(dir, name, fullname, stat): + print fullname + if S_ISDIR(stat[ST_MODE]): + if not os.path.ismount(fullname): + wq.addwork(find, (fullname, pred, wq)) # Call the main program diff --git a/Demo/threads/telnet.py b/Demo/threads/telnet.py index eacb06d..3c70cb0 100644 --- a/Demo/threads/telnet.py +++ b/Demo/threads/telnet.py @@ -21,94 +21,94 @@ BUFSIZE = 8*1024 # Telnet protocol characters -IAC = chr(255) # Interpret as command +IAC = chr(255) # Interpret as command DONT = chr(254) DO = chr(253) WONT = chr(252) WILL = chr(251) def main(): - if len(sys.argv) < 2: - sys.stderr.write('usage: telnet hostname [port]\n') - sys.exit(2) - host = sys.argv[1] - try: - hostaddr = gethostbyname(host) - except error: - sys.stderr.write(sys.argv[1] + ': bad host name\n') - sys.exit(2) - # - if len(sys.argv) > 2: - servname = sys.argv[2] - else: - servname = 'telnet' - # - if '0' <= servname[:1] <= '9': - port = eval(servname) - else: - try: - port = getservbyname(servname, 'tcp') - except error: - sys.stderr.write(servname + ': bad tcp service name\n') - sys.exit(2) - # - s = socket(AF_INET, SOCK_STREAM) - # - try: - s.connect((host, port)) - except error, msg: - sys.stderr.write('connect failed: ' + `msg` + '\n') - sys.exit(1) - # - thread.start_new(child, (s,)) - parent(s) + if len(sys.argv) < 2: + sys.stderr.write('usage: telnet hostname [port]\n') + sys.exit(2) + host = sys.argv[1] + try: + hostaddr = gethostbyname(host) + except error: + sys.stderr.write(sys.argv[1] + ': bad host name\n') + sys.exit(2) + # + if len(sys.argv) > 2: + servname = sys.argv[2] + else: + servname = 'telnet' + # + if '0' <= servname[:1] <= '9': + port = eval(servname) + else: + try: + port = getservbyname(servname, 'tcp') + except error: + sys.stderr.write(servname + ': bad tcp service name\n') + sys.exit(2) + # + s = socket(AF_INET, SOCK_STREAM) + # + try: + s.connect((host, port)) + except error, msg: + sys.stderr.write('connect failed: ' + `msg` + '\n') + sys.exit(1) + # + thread.start_new(child, (s,)) + parent(s) def parent(s): - # read socket, write stdout - iac = 0 # Interpret next char as command - opt = '' # Interpret next char as option - while 1: - data, dummy = s.recvfrom(BUFSIZE) - if not data: - # EOF -- exit - sys.stderr.write( '(Closed by remote host)\n') - sys.exit(1) - cleandata = '' - for c in data: - if opt: - print ord(c) -## print '(replying: ' + `opt+c` + ')' - s.send(opt + c) - opt = '' - elif iac: - iac = 0 - if c == IAC: - cleandata = cleandata + c - elif c in (DO, DONT): - if c == DO: print '(DO)', - else: print '(DONT)', - opt = IAC + WONT - elif c in (WILL, WONT): - if c == WILL: print '(WILL)', - else: print '(WONT)', - opt = IAC + DONT - else: - print '(command)', ord(c) - elif c == IAC: - iac = 1 - print '(IAC)', - else: - cleandata = cleandata + c - sys.stdout.write(cleandata) - sys.stdout.flush() -## print 'Out:', `cleandata` + # read socket, write stdout + iac = 0 # Interpret next char as command + opt = '' # Interpret next char as option + while 1: + data, dummy = s.recvfrom(BUFSIZE) + if not data: + # EOF -- exit + sys.stderr.write( '(Closed by remote host)\n') + sys.exit(1) + cleandata = '' + for c in data: + if opt: + print ord(c) +## print '(replying: ' + `opt+c` + ')' + s.send(opt + c) + opt = '' + elif iac: + iac = 0 + if c == IAC: + cleandata = cleandata + c + elif c in (DO, DONT): + if c == DO: print '(DO)', + else: print '(DONT)', + opt = IAC + WONT + elif c in (WILL, WONT): + if c == WILL: print '(WILL)', + else: print '(WONT)', + opt = IAC + DONT + else: + print '(command)', ord(c) + elif c == IAC: + iac = 1 + print '(IAC)', + else: + cleandata = cleandata + c + sys.stdout.write(cleandata) + sys.stdout.flush() +## print 'Out:', `cleandata` def child(s): - # read stdin, write socket - while 1: - line = sys.stdin.readline() -## print 'Got:', `line` - if not line: break - s.send(line) + # read stdin, write socket + while 1: + line = sys.stdin.readline() +## print 'Got:', `line` + if not line: break + s.send(line) main() |