diff options
Diffstat (limited to 'src/scons/JobTests.py')
-rw-r--r-- | src/scons/JobTests.py | 180 |
1 files changed, 180 insertions, 0 deletions
diff --git a/src/scons/JobTests.py b/src/scons/JobTests.py new file mode 100644 index 0000000..2bd2bf0 --- /dev/null +++ b/src/scons/JobTests.py @@ -0,0 +1,180 @@ +__revision__ = "JobTests.py __REVISION__ __DATE__ __DEVELOPER__" + +import unittest +import random +import math +import scons.Job +import sys + +# a large number +num_sines = 10000 + +# how many parallel jobs to perform for the test +num_jobs = 11 + +# how many tasks to perform for the test +num_tasks = num_jobs*5 + +class DummyLock: + "fake lock class to use if threads are not supported" + def acquire(self): + pass + + def release(self): + pass + +class NoThreadsException: + "raised by the ParallelTestCase if threads are not supported" + + def __str__(self): + return "the interpreter doesn't support threads" + +class Task: + """A dummy task class for testing purposes.""" + + def __init__(self, i, taskmaster): + self.i = i + self.taskmaster = taskmaster + self.was_executed = 0 + + def execute(self): + self.taskmaster.guard.acquire() + self.taskmaster.begin_list.append(self.i) + self.taskmaster.guard.release() + + # do something that will take some random amount of time: + for i in range(random.randrange(0, num_sines, 1)): + x = math.sin(i) + + self.was_executed = 1 + + self.taskmaster.guard.acquire() + self.taskmaster.end_list.append(self.i) + self.taskmaster.guard.release() + +class Taskmaster: + """A dummy taskmaster class for testing the job classes.""" + + def __init__(self, n, test_case): + """n is the number of dummy tasks to perform.""" + + self.test_case = test_case + self.num_tasks = n + self.num_iterated = 0 + self.num_executed = 0 + # 'guard' guards 'task_begin_list' and 'task_end_list' + try: + import threading + self.guard = threading.Lock() + except: + self.guard = DummyLock() + + # keep track of the order tasks are begun in + self.begin_list = [] + + # keep track of the order tasks are completed in + self.end_list = [] + + + def next_task(self): + if self.all_tasks_are_iterated(): + return None + else: + self.num_iterated = self.num_iterated + 1 + return Task(self.num_iterated, self) + + def all_tasks_are_executed(self): + return self.num_executed == self.num_tasks + + def all_tasks_are_iterated(self): + return self.num_iterated == self.num_tasks + + def executed(self, task): + self.num_executed = self.num_executed + 1 + + self.test_case.failUnless(task.was_executed, + "the task wasn't really executed") + self.test_case.failUnless(task.__class__ is Task, + "the task wasn't really a Task instance") + + + def is_blocked(self): + # simulate blocking tasks + return self.num_iterated - self.num_executed >= max(num_jobs/2, 2) + + def tasks_where_serial(self): + "analyze the task order to see if they were serial" + serial = 1 # assume the tasks where serial + for i in range(num_tasks): + serial = serial and (self.begin_list[i] + == self.end_list[i] + == (i + 1)) + return serial + +class ParallelTestCase(unittest.TestCase): + def runTest(self): + "test parallel jobs" + + try: + import threading + except: + raise NoThreadsException() + + taskmaster = Taskmaster(num_tasks, self) + jobs = [] + for i in range(num_jobs): + jobs.append(scons.Job.Parallel(taskmaster)) + + for job in jobs: + job.start() + + for job in jobs: + job.wait() + + self.failUnless(not taskmaster.tasks_where_serial(), + "the tasks where not executed in parallel") + self.failUnless(taskmaster.all_tasks_are_executed(), + "all the tests where not executed") + self.failUnless(taskmaster.all_tasks_are_iterated(), + "all the tests where not iterated over") + +class SerialTestCase(unittest.TestCase): + def runTest(self): + "test a serial job" + + taskmaster = Taskmaster(num_tasks, self) + job = scons.Job.Serial(taskmaster) + job.start() + self.failUnless(taskmaster.tasks_where_serial(), + "the tasks where not executed in series") + self.failUnless(taskmaster.all_tasks_are_executed(), + "all the tests where not executed") + self.failUnless(taskmaster.all_tasks_are_iterated(), + "all the tests where not iterated over") + +def suite(): + suite = unittest.TestSuite() + suite.addTest(ParallelTestCase()) + suite.addTest(SerialTestCase()) + return suite + +if __name__ == "__main__": + runner = unittest.TextTestRunner() + result = runner.run(suite()) + if (len(result.failures) == 0 + and len(result.errors) == 1 + and type(result.errors[0][0]) == SerialTestCase + and type(result.errors[0][1][0]) == NoThreadsException): + sys.exit(2) + elif not result.wasSuccessful(): + sys.exit(1) + + + + + + + + + + |