summaryrefslogtreecommitdiffstats
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorKumar Aditya <kumaraditya@python.org>2025-01-06 12:35:11 (GMT)
committerGitHub <noreply@github.com>2025-01-06 12:35:11 (GMT)
commit7e8c571604cd18e65cefd26bfc48082840264549 (patch)
tree7fddef368e0289da0a6acfeb30b7635005e80689 /Lib/asyncio
parent657d7b77e5c69967e9c0000b986fa4872d13958c (diff)
downloadcpython-7e8c571604cd18e65cefd26bfc48082840264549.zip
cpython-7e8c571604cd18e65cefd26bfc48082840264549.tar.gz
cpython-7e8c571604cd18e65cefd26bfc48082840264549.tar.bz2
gh-128340: add thread safe handle for `loop.call_soon_threadsafe` (#128369)
Adds `_ThreadSafeHandle` to be used for callbacks scheduled with `loop.call_soon_threadsafe`.
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/base_events.py5
-rw-r--r--Lib/asyncio/events.py28
2 files changed, 32 insertions, 1 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 5dbe4b2..9e6f6e3 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -873,7 +873,10 @@ class BaseEventLoop(events.AbstractEventLoop):
self._check_closed()
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
- handle = self._call_soon(callback, args, context)
+ handle = events._ThreadSafeHandle(callback, args, self, context)
+ self._ready.append(handle)
+ if handle._source_traceback:
+ del handle._source_traceback[-1]
if handle._source_traceback:
del handle._source_traceback[-1]
self._write_to_self()
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 6e291d2..2ee9870 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -113,6 +113,34 @@ class Handle:
self._loop.call_exception_handler(context)
self = None # Needed to break cycles when an exception occurs.
+# _ThreadSafeHandle is used for callbacks scheduled with call_soon_threadsafe
+# and is thread safe unlike Handle which is not thread safe.
+class _ThreadSafeHandle(Handle):
+
+ __slots__ = ('_lock',)
+
+ def __init__(self, callback, args, loop, context=None):
+ super().__init__(callback, args, loop, context)
+ self._lock = threading.RLock()
+
+ def cancel(self):
+ with self._lock:
+ return super().cancel()
+
+ def cancelled(self):
+ with self._lock:
+ return super().cancelled()
+
+ def _run(self):
+ # The event loop checks for cancellation without holding the lock
+ # It is possible that the handle is cancelled after the check
+ # but before the callback is called so check it again after acquiring
+ # the lock and return without calling the callback if it is cancelled.
+ with self._lock:
+ if self._cancelled:
+ return
+ return super()._run()
+
class TimerHandle(Handle):
"""Object returned by timed callback registration methods."""