diff options
author | Giampaolo Rodola' <g.rodola@gmail.com> | 2011-12-14 12:34:26 (GMT) |
---|---|---|
committer | Giampaolo Rodola' <g.rodola@gmail.com> | 2011-12-14 12:34:26 (GMT) |
commit | 73520d57ebfb1272d009a070191e749caebf64ae (patch) | |
tree | 1a8febdb66d40004af7a2668bd5b7d1249602cf5 /Lib/sched.py | |
parent | a23d65ccfe3b8f618bf3dde4d0d27895e764367e (diff) | |
download | cpython-73520d57ebfb1272d009a070191e749caebf64ae.zip cpython-73520d57ebfb1272d009a070191e749caebf64ae.tar.gz cpython-73520d57ebfb1272d009a070191e749caebf64ae.tar.bz2 |
Fix #8684: make sched.scheduler class thread-safe
Diffstat (limited to 'Lib/sched.py')
-rw-r--r-- | Lib/sched.py | 62 |
1 files changed, 35 insertions, 27 deletions
diff --git a/Lib/sched.py b/Lib/sched.py index 6c01e69..5c4a7b6 100644 --- a/Lib/sched.py +++ b/Lib/sched.py @@ -30,6 +30,7 @@ has another way to reference private data (besides global variables). import time import heapq +import threading from collections import namedtuple __all__ = ["scheduler"] @@ -48,6 +49,7 @@ class scheduler: """Initialize a new instance, passing the time and delay functions""" self._queue = [] + self._lock = threading.RLock() self.timefunc = timefunc self.delayfunc = delayfunc @@ -58,9 +60,10 @@ class scheduler: if necessary. """ - event = Event(time, priority, action, argument, kwargs) - heapq.heappush(self._queue, event) - return event # The ID + with self._lock: + event = Event(time, priority, action, argument, kwargs) + heapq.heappush(self._queue, event) + return event # The ID def enter(self, delay, priority, action, argument=[], kwargs={}): """A variant that specifies the time as a relative time. @@ -68,8 +71,9 @@ class scheduler: This is actually the more commonly used interface. """ - time = self.timefunc() + delay - return self.enterabs(time, priority, action, argument, kwargs) + with self._lock: + time = self.timefunc() + delay + return self.enterabs(time, priority, action, argument, kwargs) def cancel(self, event): """Remove an event from the queue. @@ -78,12 +82,14 @@ class scheduler: If the event is not in the queue, this raises ValueError. """ - self._queue.remove(event) - heapq.heapify(self._queue) + with self._lock: + self._queue.remove(event) + heapq.heapify(self._queue) def empty(self): """Check whether the queue is empty.""" - return not self._queue + with self._lock: + return not self._queue def run(self): """Execute events until the queue is empty. @@ -108,24 +114,25 @@ class scheduler: """ # localize variable access to minimize overhead # and to improve thread safety - q = self._queue - delayfunc = self.delayfunc - timefunc = self.timefunc - pop = heapq.heappop - while q: - time, priority, action, argument, kwargs = checked_event = q[0] - now = timefunc() - if now < time: - delayfunc(time - now) - else: - event = pop(q) - # Verify that the event was not removed or altered - # by another thread after we last looked at q[0]. - if event is checked_event: - action(*argument, **kwargs) - delayfunc(0) # Let other threads run + with self._lock: + q = self._queue + delayfunc = self.delayfunc + timefunc = self.timefunc + pop = heapq.heappop + while q: + time, priority, action, argument, kwargs = checked_event = q[0] + now = timefunc() + if now < time: + delayfunc(time - now) else: - heapq.heappush(q, event) + event = pop(q) + # Verify that the event was not removed or altered + # by another thread after we last looked at q[0]. + if event is checked_event: + action(*argument, **kwargs) + delayfunc(0) # Let other threads run + else: + heapq.heappush(q, event) @property def queue(self): @@ -138,5 +145,6 @@ class scheduler: # Use heapq to sort the queue rather than using 'sorted(self._queue)'. # With heapq, two events scheduled at the same time will show in # the actual order they would be retrieved. - events = self._queue[:] - return map(heapq.heappop, [events]*len(events)) + with self._lock: + events = self._queue[:] + return map(heapq.heappop, [events]*len(events)) |