summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Doc/library/sched.rst30
-rw-r--r--Doc/whatsnew/3.3.rst4
-rw-r--r--Lib/sched.py62
-rw-r--r--Misc/NEWS3
4 files changed, 45 insertions, 54 deletions
diff --git a/Doc/library/sched.rst b/Doc/library/sched.rst
index a644ed2..455cc70 100644
--- a/Doc/library/sched.rst
+++ b/Doc/library/sched.rst
@@ -27,6 +27,9 @@ scheduler:
.. versionchanged:: 3.3
*timefunc* and *delayfunc* parameters are optional.
+ .. versionchanged:: 3.3
+ :class:`scheduler` class can be safely used in multi-threaded
+ environments.
Example::
@@ -47,33 +50,6 @@ Example::
From print_time 930343700.273
930343700.276
-In multi-threaded environments, the :class:`scheduler` class has limitations
-with respect to thread-safety, inability to insert a new task before
-the one currently pending in a running scheduler, and holding up the main
-thread until the event queue is empty. Instead, the preferred approach
-is to use the :class:`threading.Timer` class instead.
-
-Example::
-
- >>> import time
- >>> from threading import Timer
- >>> def print_time():
- ... print("From print_time", time.time())
- ...
- >>> def print_some_times():
- ... print(time.time())
- ... Timer(5, print_time, ()).start()
- ... Timer(10, print_time, ()).start()
- ... time.sleep(11) # sleep while time-delay events execute
- ... print(time.time())
- ...
- >>> print_some_times()
- 930343690.257
- From print_time 930343695.274
- From print_time 930343700.273
- 930343701.301
-
-
.. _scheduler-objects:
Scheduler Objects
diff --git a/Doc/whatsnew/3.3.rst b/Doc/whatsnew/3.3.rst
index ae275fc..91d3f90 100644
--- a/Doc/whatsnew/3.3.rst
+++ b/Doc/whatsnew/3.3.rst
@@ -662,6 +662,10 @@ should be used. For example, this will send a ``'HEAD'`` request::
sched
-----
+* :class:`~sched.scheduler` class can now be safely used in multi-threaded
+ environments. (Contributed by Josiah Carlson and Giampaolo RodolĂ  in
+ :issue:`8684`)
+
* *timefunc* and *delayfunct* parameters of :class:`~sched.scheduler` class
constructor are now optional and defaults to :func:`time.time` and
:func:`time.sleep` respectively. (Contributed by Chris Clark in
diff --git a/Lib/sched.py b/Lib/sched.py
index 6c01e69..5c4a7b6 100644
--- a/Lib/sched.py
+++ b/Lib/sched.py
@@ -30,6 +30,7 @@ has another way to reference private data (besides global variables).
import time
import heapq
+import threading
from collections import namedtuple
__all__ = ["scheduler"]
@@ -48,6 +49,7 @@ class scheduler:
"""Initialize a new instance, passing the time and delay
functions"""
self._queue = []
+ self._lock = threading.RLock()
self.timefunc = timefunc
self.delayfunc = delayfunc
@@ -58,9 +60,10 @@ class scheduler:
if necessary.
"""
- event = Event(time, priority, action, argument, kwargs)
- heapq.heappush(self._queue, event)
- return event # The ID
+ with self._lock:
+ event = Event(time, priority, action, argument, kwargs)
+ heapq.heappush(self._queue, event)
+ return event # The ID
def enter(self, delay, priority, action, argument=[], kwargs={}):
"""A variant that specifies the time as a relative time.
@@ -68,8 +71,9 @@ class scheduler:
This is actually the more commonly used interface.
"""
- time = self.timefunc() + delay
- return self.enterabs(time, priority, action, argument, kwargs)
+ with self._lock:
+ time = self.timefunc() + delay
+ return self.enterabs(time, priority, action, argument, kwargs)
def cancel(self, event):
"""Remove an event from the queue.
@@ -78,12 +82,14 @@ class scheduler:
If the event is not in the queue, this raises ValueError.
"""
- self._queue.remove(event)
- heapq.heapify(self._queue)
+ with self._lock:
+ self._queue.remove(event)
+ heapq.heapify(self._queue)
def empty(self):
"""Check whether the queue is empty."""
- return not self._queue
+ with self._lock:
+ return not self._queue
def run(self):
"""Execute events until the queue is empty.
@@ -108,24 +114,25 @@ class scheduler:
"""
# localize variable access to minimize overhead
# and to improve thread safety
- q = self._queue
- delayfunc = self.delayfunc
- timefunc = self.timefunc
- pop = heapq.heappop
- while q:
- time, priority, action, argument, kwargs = checked_event = q[0]
- now = timefunc()
- if now < time:
- delayfunc(time - now)
- else:
- event = pop(q)
- # Verify that the event was not removed or altered
- # by another thread after we last looked at q[0].
- if event is checked_event:
- action(*argument, **kwargs)
- delayfunc(0) # Let other threads run
+ with self._lock:
+ q = self._queue
+ delayfunc = self.delayfunc
+ timefunc = self.timefunc
+ pop = heapq.heappop
+ while q:
+ time, priority, action, argument, kwargs = checked_event = q[0]
+ now = timefunc()
+ if now < time:
+ delayfunc(time - now)
else:
- heapq.heappush(q, event)
+ event = pop(q)
+ # Verify that the event was not removed or altered
+ # by another thread after we last looked at q[0].
+ if event is checked_event:
+ action(*argument, **kwargs)
+ delayfunc(0) # Let other threads run
+ else:
+ heapq.heappush(q, event)
@property
def queue(self):
@@ -138,5 +145,6 @@ class scheduler:
# Use heapq to sort the queue rather than using 'sorted(self._queue)'.
# With heapq, two events scheduled at the same time will show in
# the actual order they would be retrieved.
- events = self._queue[:]
- return map(heapq.heappop, [events]*len(events))
+ with self._lock:
+ events = self._queue[:]
+ return map(heapq.heappop, [events]*len(events))
diff --git a/Misc/NEWS b/Misc/NEWS
index bf7399d..a7a3506 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -409,6 +409,9 @@ Core and Builtins
Library
-------
+- Issue #8684 sched.scheduler class can be safely used in multi-threaded
+ environments.
+
- Alias resource.error to OSError ala PEP 3151.
- Issue #5689: Add support for lzma compression to the tarfile module.