diff options
author | Kumar Aditya <kumaraditya@python.org> | 2025-01-06 12:35:11 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-01-06 12:35:11 (GMT) |
commit | 7e8c571604cd18e65cefd26bfc48082840264549 (patch) | |
tree | 7fddef368e0289da0a6acfeb30b7635005e80689 /Lib/asyncio | |
parent | 657d7b77e5c69967e9c0000b986fa4872d13958c (diff) | |
download | cpython-7e8c571604cd18e65cefd26bfc48082840264549.zip cpython-7e8c571604cd18e65cefd26bfc48082840264549.tar.gz cpython-7e8c571604cd18e65cefd26bfc48082840264549.tar.bz2 |
gh-128340: add thread safe handle for `loop.call_soon_threadsafe` (#128369)
Adds `_ThreadSafeHandle` to be used for callbacks scheduled with `loop.call_soon_threadsafe`.
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/base_events.py | 5 | ||||
-rw-r--r-- | Lib/asyncio/events.py | 28 |
2 files changed, 32 insertions, 1 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 5dbe4b2..9e6f6e3 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -873,7 +873,10 @@ class BaseEventLoop(events.AbstractEventLoop): self._check_closed() if self._debug: self._check_callback(callback, 'call_soon_threadsafe') - handle = self._call_soon(callback, args, context) + handle = events._ThreadSafeHandle(callback, args, self, context) + self._ready.append(handle) + if handle._source_traceback: + del handle._source_traceback[-1] if handle._source_traceback: del handle._source_traceback[-1] self._write_to_self() diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 6e291d2..2ee9870 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -113,6 +113,34 @@ class Handle: self._loop.call_exception_handler(context) self = None # Needed to break cycles when an exception occurs. +# _ThreadSafeHandle is used for callbacks scheduled with call_soon_threadsafe +# and is thread safe unlike Handle which is not thread safe. +class _ThreadSafeHandle(Handle): + + __slots__ = ('_lock',) + + def __init__(self, callback, args, loop, context=None): + super().__init__(callback, args, loop, context) + self._lock = threading.RLock() + + def cancel(self): + with self._lock: + return super().cancel() + + def cancelled(self): + with self._lock: + return super().cancelled() + + def _run(self): + # The event loop checks for cancellation without holding the lock + # It is possible that the handle is cancelled after the check + # but before the callback is called so check it again after acquiring + # the lock and return without calling the callback if it is cancelled. + with self._lock: + if self._cancelled: + return + return super()._run() + class TimerHandle(Handle): """Object returned by timed callback registration methods.""" |