diff options
Diffstat (limited to 'Lib/test/test_sched.py')
-rw-r--r-- | Lib/test/test_sched.py | 135 |
1 files changed, 129 insertions, 6 deletions
diff --git a/Lib/test/test_sched.py b/Lib/test/test_sched.py index 91b8f0c..1fe6ad4 100644 --- a/Lib/test/test_sched.py +++ b/Lib/test/test_sched.py @@ -1,9 +1,44 @@ #!/usr/bin/env python +import queue import sched import time import unittest from test import support +try: + import threading +except ImportError: + threading = None + +TIMEOUT = 10 + + +class Timer: + def __init__(self): + self._cond = threading.Condition() + self._time = 0 + self._stop = 0 + + def time(self): + with self._cond: + return self._time + + # increase the time but not beyond the established limit + def sleep(self, t): + assert t >= 0 + with self._cond: + t += self._time + while self._stop < t: + self._time = self._stop + self._cond.wait() + self._time = t + + # advance time limit for user code + def advance(self, t): + assert t >= 0 + with self._cond: + self._stop += t + self._cond.notify_all() class TestCase(unittest.TestCase): @@ -26,6 +61,37 @@ class TestCase(unittest.TestCase): scheduler.run() self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) + @unittest.skipUnless(threading, 'Threading required for this test.') + def test_enter_concurrent(self): + q = queue.Queue() + fun = q.put + timer = Timer() + scheduler = sched.scheduler(timer.time, timer.sleep) + scheduler.enter(1, 1, fun, (1,)) + scheduler.enter(3, 1, fun, (3,)) + t = threading.Thread(target=scheduler.run) + t.start() + timer.advance(1) + self.assertEqual(q.get(timeout=TIMEOUT), 1) + self.assertTrue(q.empty()) + for x in [4, 5, 2]: + z = scheduler.enter(x - 1, 1, fun, (x,)) + timer.advance(2) + self.assertEqual(q.get(timeout=TIMEOUT), 2) + self.assertEqual(q.get(timeout=TIMEOUT), 3) + self.assertTrue(q.empty()) + timer.advance(1) + self.assertEqual(q.get(timeout=TIMEOUT), 4) + self.assertTrue(q.empty()) + timer.advance(1) + self.assertEqual(q.get(timeout=TIMEOUT), 5) + self.assertTrue(q.empty()) + timer.advance(1000) + t.join(timeout=TIMEOUT) + self.assertFalse(t.is_alive()) + self.assertTrue(q.empty()) + self.assertEqual(timer.time(), 5) + def test_priority(self): l = [] fun = lambda x: l.append(x) @@ -50,6 +116,39 @@ class TestCase(unittest.TestCase): scheduler.run() self.assertEqual(l, [0.02, 0.03, 0.04]) + @unittest.skipUnless(threading, 'Threading required for this test.') + def test_cancel_concurrent(self): + q = queue.Queue() + fun = q.put + timer = Timer() + scheduler = sched.scheduler(timer.time, timer.sleep) + now = timer.time() + event1 = scheduler.enterabs(now + 1, 1, fun, (1,)) + event2 = scheduler.enterabs(now + 2, 1, fun, (2,)) + event4 = scheduler.enterabs(now + 4, 1, fun, (4,)) + event5 = scheduler.enterabs(now + 5, 1, fun, (5,)) + event3 = scheduler.enterabs(now + 3, 1, fun, (3,)) + t = threading.Thread(target=scheduler.run) + t.start() + timer.advance(1) + self.assertEqual(q.get(timeout=TIMEOUT), 1) + self.assertTrue(q.empty()) + scheduler.cancel(event2) + scheduler.cancel(event5) + timer.advance(1) + self.assertTrue(q.empty()) + timer.advance(1) + self.assertEqual(q.get(timeout=TIMEOUT), 3) + self.assertTrue(q.empty()) + timer.advance(1) + self.assertEqual(q.get(timeout=TIMEOUT), 4) + self.assertTrue(q.empty()) + timer.advance(1000) + t.join(timeout=TIMEOUT) + self.assertFalse(t.is_alive()) + self.assertTrue(q.empty()) + self.assertEqual(timer.time(), 4) + def test_empty(self): l = [] fun = lambda x: l.append(x) @@ -63,15 +162,39 @@ class TestCase(unittest.TestCase): def test_queue(self): l = [] - events = [] fun = lambda x: l.append(x) scheduler = sched.scheduler(time.time, time.sleep) - self.assertEqual(scheduler._queue, []) - for x in [0.05, 0.04, 0.03, 0.02, 0.01]: - events.append(scheduler.enterabs(x, 1, fun, (x,))) - self.assertEqual(scheduler._queue.sort(), events.sort()) + now = time.time() + e5 = scheduler.enterabs(now + 0.05, 1, fun) + e1 = scheduler.enterabs(now + 0.01, 1, fun) + e2 = scheduler.enterabs(now + 0.02, 1, fun) + e4 = scheduler.enterabs(now + 0.04, 1, fun) + e3 = scheduler.enterabs(now + 0.03, 1, fun) + # queue property is supposed to return an order list of + # upcoming events + self.assertEqual(list(scheduler.queue), [e1, e2, e3, e4, e5]) + + def test_args_kwargs(self): + flag = [] + + def fun(*a, **b): + flag.append(None) + self.assertEqual(a, (1,2,3)) + self.assertEqual(b, {"foo":1}) + + scheduler = sched.scheduler(time.time, time.sleep) + z = scheduler.enterabs(0.01, 1, fun, argument=(1,2,3), kwargs={"foo":1}) scheduler.run() - self.assertEqual(scheduler._queue, []) + self.assertEqual(flag, [None]) + + def test_run_non_blocking(self): + l = [] + fun = lambda x: l.append(x) + scheduler = sched.scheduler(time.time, time.sleep) + for x in [10, 9, 8, 7, 6]: + scheduler.enter(x, 1, fun, (x,)) + scheduler.run(blocking=False) + self.assertEqual(l, []) def test_main(): |