summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2002-12-30 22:30:22 (GMT)
committerGuido van Rossum <guido@python.org>2002-12-30 22:30:22 (GMT)
commitad50ca91a99e9a16a583fb13799c79f23067ef30 (patch)
tree31e4e18a5980f1c50029b117b3c6231cb0e59458 /Lib
parentc91ed400e053dc9f11dd30c84e2bb611999dce50 (diff)
downloadcpython-ad50ca91a99e9a16a583fb13799c79f23067ef30.zip
cpython-ad50ca91a99e9a16a583fb13799c79f23067ef30.tar.gz
cpython-ad50ca91a99e9a16a583fb13799c79f23067ef30.tar.bz2
Brett Cannon's dummy_thread and dummy_threading modules (SF patch
622537), with some nitpicking editorial changes.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/dummy_thread.py116
-rw-r--r--Lib/dummy_threading.py63
-rw-r--r--Lib/test/test_dummy_thread.py167
-rw-r--r--Lib/test/test_dummy_threading.py70
4 files changed, 416 insertions, 0 deletions
diff --git a/Lib/dummy_thread.py b/Lib/dummy_thread.py
new file mode 100644
index 0000000..b0ba0ce
--- /dev/null
+++ b/Lib/dummy_thread.py
@@ -0,0 +1,116 @@
+"""Drop-in replacement for the thread module.
+
+Meant to be used as a brain-dead substitute so that threaded code does
+not need to be rewritten for when the thread module is not present.
+
+Suggested usage is::
+
+ try:
+ import thread
+ except ImportError:
+ import dummy_thread as thread
+
+"""
+__author__ = "Brett Cannon"
+__email__ = "brett@python.org"
+
+# Exports only things specified by thread documentation
+# (skipping obsolete synonyms allocate(), start_new(), exit_thread())
+__all__ = ['error', 'start_new_thread', 'exit', 'get_ident', 'allocate_lock',
+ 'LockType']
+
+import traceback as _traceback
+
+class error(Exception):
+ """Dummy implementation of thread.error."""
+
+ def __init__(self, *args):
+ self.args = args
+
+def start_new_thread(function, args, kwargs={}):
+ """Dummy implementation of thread.start_new_thread().
+
+ Compatibility is maintained by making sure that ``args`` is a
+ tuple and ``kwargs`` is a dictionary. If an exception is raised
+ and it is SystemExit (which can be done by thread.exit()) it is
+ caught and nothing is done; all other exceptions are printed out
+ by using traceback.print_exc().
+
+ """
+ if type(args) != type(tuple()):
+ raise TypeError("2nd arg must be a tuple")
+ if type(kwargs) != type(dict()):
+ raise TypeError("3rd arg must be a dict")
+ try:
+ function(*args, **kwargs)
+ except SystemExit:
+ pass
+ except:
+ _traceback.print_exc()
+
+def exit():
+ """Dummy implementation of thread.exit()."""
+ raise SystemExit
+
+def get_ident():
+ """Dummy implementation of thread.get_ident().
+
+ Since this module should only be used when threadmodule is not
+ available, it is safe to assume that the current process is the
+ only thread. Thus a constant can be safely returned.
+ """
+ return -1
+
+def allocate_lock():
+ """Dummy implementation of thread.allocate_lock()."""
+ return LockType()
+
+class LockType(object):
+ """Class implementing dummy implementation of thread.LockType.
+
+ Compatibility is maintained by maintaining self.locked_status
+ which is a boolean that stores the state of the lock. Pickling of
+ the lock, though, should not be done since if the thread module is
+ then used with an unpickled ``lock()`` from here problems could
+ occur from this class not having atomic methods.
+
+ """
+
+ def __init__(self):
+ self.locked_status = False
+
+ def acquire(self, waitflag=None):
+ """Dummy implementation of acquire().
+
+ For blocking calls, self.locked_status is automatically set to
+ True and returned appropriately based on value of
+ ``waitflag``. If it is non-blocking, then the value is
+ actually checked and not set if it is already acquired. This
+ is all done so that threading.Condition's assert statements
+ aren't triggered and throw a little fit.
+
+ """
+ if waitflag is None:
+ self.locked_status = True
+ return None
+ elif not waitflag:
+ if not self.locked_status:
+ self.locked_status = True
+ return True
+ else:
+ return False
+ else:
+ self.locked_status = True
+ return True
+
+ def release(self):
+ """Release the dummy lock."""
+ # XXX Perhaps shouldn't actually bother to test? Could lead
+ # to problems for complex, threaded code.
+ if not self.locked_status:
+ raise error
+ self.locked_status = False
+ return True
+
+ def locked(self):
+ return self.locked_status
diff --git a/Lib/dummy_threading.py b/Lib/dummy_threading.py
new file mode 100644
index 0000000..2e070aa
--- /dev/null
+++ b/Lib/dummy_threading.py
@@ -0,0 +1,63 @@
+"""Faux ``threading`` version using ``dummy_thread`` instead of ``thread``.
+
+The module ``_dummy_threading`` is added to ``sys.modules`` in order
+to not have ``threading`` considered imported. Had ``threading`` been
+directly imported it would have made all subsequent imports succeed
+regardless of whether ``thread`` was available which is not desired.
+
+:Author: Brett Cannon
+:Contact: brett@python.org
+
+XXX: Try to get rid of ``_dummy_threading``.
+
+"""
+from sys import modules as sys_modules
+
+import dummy_thread
+
+# Declaring now so as to not have to nest ``try``s to get proper clean-up.
+holding_thread = False
+holding_threading = False
+
+try:
+ # Could have checked if ``thread`` was not in sys.modules and gone
+ # a different route, but decided to mirror technique used with
+ # ``threading`` below.
+ if 'thread' in sys_modules:
+ held_thread = sys_modules['thread']
+ holding_thread = True
+ # Must have some module named ``thread`` that implements its API
+ # in order to initially import ``threading``.
+ sys_modules['thread'] = sys_modules['dummy_thread']
+
+ if 'threading' in sys_modules:
+ # If ``threading`` is already imported, might as well prevent
+ # trying to import it more than needed by saving it if it is
+ # already imported before deleting it.
+ held_threading = sys_modules['threading']
+ holding_threading = True
+ del sys_modules['threading']
+ import threading
+ # Need a copy of the code kept somewhere...
+ sys_modules['_dummy_threading'] = sys_modules['threading']
+ del sys_modules['threading']
+ from _dummy_threading import *
+ from _dummy_threading import __all__
+
+finally:
+ # Put back ``threading`` if we overwrote earlier
+ if holding_threading:
+ sys_modules['threading'] = held_threading
+ del held_threading
+ del holding_threading
+
+ # Put back ``thread`` if we overwrote, else del the entry we made
+ if holding_thread:
+ sys_modules['thread'] = held_thread
+ del held_thread
+ else:
+ del sys_modules['thread']
+ del holding_thread
+
+ del dummy_thread
+ del sys_modules
diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py
new file mode 100644
index 0000000..3be3931
--- /dev/null
+++ b/Lib/test/test_dummy_thread.py
@@ -0,0 +1,167 @@
+"""Generic thread tests.
+
+Meant to be used by dummy_thread and thread. To allow for different modules
+to be used, test_main() can be called with the module to use as the thread
+implementation as its sole argument.
+
+"""
+import dummy_thread as _thread
+import time
+import Queue
+import random
+import unittest
+from test import test_support
+
+
+class LockTests(unittest.TestCase):
+ """Test lock objects."""
+
+ def setUp(self):
+ # Create a lock
+ self.lock = _thread.allocate_lock()
+
+ def test_initlock(self):
+ #Make sure locks start locked
+ self.failUnless(not self.lock.locked(),
+ "Lock object is not initialized unlocked.")
+
+ def test_release(self):
+ # Test self.lock.release()
+ self.lock.acquire()
+ self.lock.release()
+ self.failUnless(not self.lock.locked(),
+ "Lock object did not release properly.")
+
+ def test_improper_release(self):
+ #Make sure release of an unlocked thread raises _thread.error
+ self.failUnlessRaises(_thread.error, self.lock.release)
+
+ def test_cond_acquire_success(self):
+ #Make sure the conditional acquiring of the lock works.
+ self.failUnless(self.lock.acquire(0),
+ "Conditional acquiring of the lock failed.")
+
+ def test_cond_acquire_fail(self):
+ #Test acquiring locked lock returns False
+ self.lock.acquire(0)
+ self.failUnless(not self.lock.acquire(0),
+ "Conditional acquiring of a locked lock incorrectly "
+ "succeeded.")
+
+ def test_uncond_acquire_success(self):
+ #Make sure unconditional acquiring of a lock works.
+ self.lock.acquire()
+ self.failUnless(self.lock.locked(),
+ "Uncondional locking failed.")
+
+ def test_uncond_acquire_return_val(self):
+ #Make sure that an unconditional locking returns True.
+ self.failUnless(self.lock.acquire(1) is True,
+ "Unconditional locking did not return True.")
+
+ def test_uncond_acquire_blocking(self):
+ #Make sure that unconditional acquiring of a locked lock blocks.
+ def delay_unlock(to_unlock, delay):
+ """Hold on to lock for a set amount of time before unlocking."""
+ time.sleep(delay)
+ to_unlock.release()
+
+ self.lock.acquire()
+ delay = 1 #In seconds
+ start_time = int(time.time())
+ _thread.start_new_thread(delay_unlock,(self.lock, delay))
+ if test_support.verbose:
+ print
+ print "*** Waiting for thread to release the lock "\
+ "(approx. %s sec.) ***" % delay
+ self.lock.acquire()
+ end_time = int(time.time())
+ if test_support.verbose:
+ print "done"
+ self.failUnless((end_time - start_time) >= delay,
+ "Blocking by unconditional acquiring failed.")
+
+class MiscTests(unittest.TestCase):
+ """Miscellaneous tests."""
+
+ def test_exit(self):
+ #Make sure _thread.exit() raises SystemExit
+ self.failUnlessRaises(SystemExit, _thread.exit)
+
+ def test_ident(self):
+ #Test sanity of _thread.get_ident()
+ self.failUnless(isinstance(_thread.get_ident(), int),
+ "_thread.get_ident() returned a non-integer")
+ self.failUnless(_thread.get_ident() != 0,
+ "_thread.get_ident() returned 0")
+
+ def test_LockType(self):
+ #Make sure _thread.LockType is the same type as _thread.allocate_locke()
+ self.failUnless(isinstance(_thread.allocate_lock(), _thread.LockType),
+ "_thread.LockType is not an instance of what is "
+ "returned by _thread.allocate_lock()")
+
+class ThreadTests(unittest.TestCase):
+ """Test thread creation."""
+
+ def test_arg_passing(self):
+ #Make sure that parameter passing works.
+ def arg_tester(queue, arg1=False, arg2=False):
+ """Use to test _thread.start_new_thread() passes args properly."""
+ queue.put((arg1, arg2))
+
+ testing_queue = Queue.Queue(1)
+ _thread.start_new_thread(arg_tester, (testing_queue, True, True))
+ result = testing_queue.get()
+ self.failUnless(result[0] and result[1],
+ "Argument passing for thread creation using tuple failed")
+ _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
+ 'arg1':True, 'arg2':True})
+ result = testing_queue.get()
+ self.failUnless(result[0] and result[1],
+ "Argument passing for thread creation using kwargs failed")
+ _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
+ result = testing_queue.get()
+ self.failUnless(result[0] and result[1],
+ "Argument passing for thread creation using both tuple"
+ " and kwargs failed")
+
+ def test_multi_creation(self):
+ #Make sure multiple threads can be created.
+ def queue_mark(queue, delay):
+ """Wait for ``delay`` seconds and then put something into ``queue``"""
+ time.sleep(delay)
+ queue.put(_thread.get_ident())
+
+ thread_count = 5
+ delay = 1.5
+ testing_queue = Queue.Queue(thread_count)
+ if test_support.verbose:
+ print
+ print "*** Testing multiple thread creation "\
+ "(will take approx. %s to %s sec.) ***" % (delay, thread_count)
+ for count in xrange(thread_count):
+ _thread.start_new_thread(queue_mark,
+ (testing_queue, round(random.random(), 1)))
+ time.sleep(delay)
+ if test_support.verbose:
+ print 'done'
+ self.failUnless(testing_queue.qsize() == thread_count,
+ "Not all %s threads executed properly after %s sec." %
+ (thread_count, delay))
+
+def test_main(imported_module=None):
+ global _thread
+ if imported_module:
+ _thread = imported_module
+ if test_support.verbose:
+ print
+ print "*** Using %s as _thread module ***" % _thread
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(LockTests))
+ suite.addTest(unittest.makeSuite(MiscTests))
+ suite.addTest(unittest.makeSuite(ThreadTests))
+ test_support.run_suite(suite)
+
+if __name__ == '__main__':
+ test_main()
diff --git a/Lib/test/test_dummy_threading.py b/Lib/test/test_dummy_threading.py
new file mode 100644
index 0000000..cc8c0e3
--- /dev/null
+++ b/Lib/test/test_dummy_threading.py
@@ -0,0 +1,70 @@
+# Very rudimentary test of threading module
+
+# Create a bunch of threads, let each do some work, wait until all are done
+
+from test.test_support import verbose
+import random
+import dummy_threading as _threading
+import time
+
+
+class TestThread(_threading.Thread):
+
+ def run(self):
+ global running
+ delay = random.random() * 2
+ if verbose:
+ print 'task', self.getName(), 'will run for', delay, 'sec'
+ sema.acquire()
+ mutex.acquire()
+ running = running + 1
+ if verbose:
+ print running, 'tasks are running'
+ mutex.release()
+ time.sleep(delay)
+ if verbose:
+ print 'task', self.getName(), 'done'
+ mutex.acquire()
+ running = running - 1
+ if verbose:
+ print self.getName(), 'is finished.', running, 'tasks are running'
+ mutex.release()
+ sema.release()
+
+def starttasks():
+ for i in range(numtasks):
+ t = TestThread(name="<thread %d>"%i)
+ threads.append(t)
+ t.start()
+
+
+def test_main():
+ # This takes about n/3 seconds to run (about n/3 clumps of tasks, times
+ # about 1 second per clump).
+ global numtasks
+ numtasks = 10
+
+ # no more than 3 of the 10 can run at once
+ global sema
+ sema = _threading.BoundedSemaphore(value=3)
+ global mutex
+ mutex = _threading.RLock()
+ global running
+ running = 0
+
+ global threads
+ threads = []
+
+ starttasks()
+
+ if verbose:
+ print 'waiting for all tasks to complete'
+ for t in threads:
+ t.join()
+ if verbose:
+ print 'all tasks done'
+
+
+
+if __name__ == '__main__':
+ test_main()