diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/asyncio/base_events.py | 38 | ||||
-rw-r--r-- | Lib/asyncio/proactor_events.py | 6 | ||||
-rw-r--r-- | Lib/asyncio/selector_events.py | 2 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_base_events.py | 76 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_proactor_events.py | 7 | ||||
-rw-r--r-- | Lib/test/test_asyncio/test_subprocess.py | 22 |
6 files changed, 89 insertions, 62 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index b1a5422..684c9ec 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -22,6 +22,7 @@ import logging import os import socket import subprocess +import threading import time import traceback import sys @@ -168,7 +169,9 @@ class BaseEventLoop(events.AbstractEventLoop): self._scheduled = [] self._default_executor = None self._internal_fds = 0 - self._running = False + # Identifier of the thread running the event loop, or None if the + # event loop is not running + self._owner = None self._clock_resolution = time.get_clock_info('monotonic').resolution self._exception_handler = None self._debug = (not sys.flags.ignore_environment @@ -246,9 +249,9 @@ class BaseEventLoop(events.AbstractEventLoop): def run_forever(self): """Run until stop() is called.""" self._check_closed() - if self._running: + if self.is_running(): raise RuntimeError('Event loop is running.') - self._running = True + self._owner = threading.get_ident() try: while True: try: @@ -256,7 +259,7 @@ class BaseEventLoop(events.AbstractEventLoop): except _StopError: break finally: - self._running = False + self._owner = None def run_until_complete(self, future): """Run until the Future is done. @@ -311,7 +314,7 @@ class BaseEventLoop(events.AbstractEventLoop): The event loop must not be running. """ - if self._running: + if self.is_running(): raise RuntimeError("Cannot close a running event loop") if self._closed: return @@ -331,7 +334,7 @@ class BaseEventLoop(events.AbstractEventLoop): def is_running(self): """Returns True if the event loop is running.""" - return self._running + return (self._owner is not None) def time(self): """Return the time according to the event loop's clock. @@ -373,7 +376,7 @@ class BaseEventLoop(events.AbstractEventLoop): raise TypeError("coroutines cannot be used with call_at()") self._check_closed() if self._debug: - self._assert_is_current_event_loop() + self._check_thread() timer = events.TimerHandle(when, callback, args, self) if timer._source_traceback: del timer._source_traceback[-1] @@ -391,17 +394,17 @@ class BaseEventLoop(events.AbstractEventLoop): Any positional arguments after the callback will be passed to the callback when it is called. """ - handle = self._call_soon(callback, args, check_loop=True) + if self._debug: + self._check_thread() + handle = self._call_soon(callback, args) if handle._source_traceback: del handle._source_traceback[-1] return handle - def _call_soon(self, callback, args, check_loop): + def _call_soon(self, callback, args): if (coroutines.iscoroutine(callback) or coroutines.iscoroutinefunction(callback)): raise TypeError("coroutines cannot be used with call_soon()") - if self._debug and check_loop: - self._assert_is_current_event_loop() self._check_closed() handle = events.Handle(callback, args, self) if handle._source_traceback: @@ -409,8 +412,8 @@ class BaseEventLoop(events.AbstractEventLoop): self._ready.append(handle) return handle - def _assert_is_current_event_loop(self): - """Asserts that this event loop is the current event loop. + def _check_thread(self): + """Check that the current thread is the thread running the event loop. Non-thread-safe methods of this class make this assumption and will likely behave incorrectly when the assumption is violated. @@ -418,18 +421,17 @@ class BaseEventLoop(events.AbstractEventLoop): Should only be called when (self._debug == True). The caller is responsible for checking this condition for performance reasons. """ - try: - current = events.get_event_loop() - except RuntimeError: + if self._owner is None: return - if current is not self: + thread_id = threading.get_ident() + if thread_id != self._owner: raise RuntimeError( "Non-thread-safe operation invoked on an event loop other " "than the current one") def call_soon_threadsafe(self, callback, *args): """Like call_soon(), but thread-safe.""" - handle = self._call_soon(callback, args, check_loop=False) + handle = self._call_soon(callback, args) if handle._source_traceback: del handle._source_traceback[-1] self._write_to_self() diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index e67cf65..44a8197 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -383,7 +383,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): sock, protocol, waiter, extra) def close(self): - if self._running: + if self.is_running(): raise RuntimeError("Cannot close a running event loop") if self.is_closed(): return @@ -432,9 +432,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): self._ssock.setblocking(False) self._csock.setblocking(False) self._internal_fds += 1 - # don't check the current loop because _make_self_pipe() is called - # from the event loop constructor - self._call_soon(self._loop_self_reading, (), check_loop=False) + self.call_soon(self._loop_self_reading) def _loop_self_reading(self, f=None): try: diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 7df8b86..a97709d 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -68,7 +68,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): address, waiter, extra) def close(self): - if self._running: + if self.is_running(): raise RuntimeError("Cannot close a running event loop") if self.is_closed(): return diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index e840915..6599e4e 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -5,6 +5,7 @@ import logging import math import socket import sys +import threading import time import unittest from unittest import mock @@ -148,28 +149,71 @@ class BaseEventLoopTests(test_utils.TestCase): # are really slow self.assertLessEqual(dt, 0.9, dt) - def test_assert_is_current_event_loop(self): + def check_thread(self, loop, debug): def cb(): pass - other_loop = base_events.BaseEventLoop() - other_loop._selector = mock.Mock() - asyncio.set_event_loop(other_loop) + loop.set_debug(debug) + if debug: + msg = ("Non-thread-safe operation invoked on an event loop other " + "than the current one") + with self.assertRaisesRegex(RuntimeError, msg): + loop.call_soon(cb) + with self.assertRaisesRegex(RuntimeError, msg): + loop.call_later(60, cb) + with self.assertRaisesRegex(RuntimeError, msg): + loop.call_at(loop.time() + 60, cb) + else: + loop.call_soon(cb) + loop.call_later(60, cb) + loop.call_at(loop.time() + 60, cb) + + def test_check_thread(self): + def check_in_thread(loop, event, debug, create_loop, fut): + # wait until the event loop is running + event.wait() + + try: + if create_loop: + loop2 = base_events.BaseEventLoop() + try: + asyncio.set_event_loop(loop2) + self.check_thread(loop, debug) + finally: + asyncio.set_event_loop(None) + loop2.close() + else: + self.check_thread(loop, debug) + except Exception as exc: + loop.call_soon_threadsafe(fut.set_exception, exc) + else: + loop.call_soon_threadsafe(fut.set_result, None) + + def test_thread(loop, debug, create_loop=False): + event = threading.Event() + fut = asyncio.Future(loop=loop) + loop.call_soon(event.set) + args = (loop, event, debug, create_loop, fut) + thread = threading.Thread(target=check_in_thread, args=args) + thread.start() + loop.run_until_complete(fut) + thread.join() - # raise RuntimeError if the event loop is different in debug mode - self.loop.set_debug(True) - with self.assertRaises(RuntimeError): - self.loop.call_soon(cb) - with self.assertRaises(RuntimeError): - self.loop.call_later(60, cb) - with self.assertRaises(RuntimeError): - self.loop.call_at(self.loop.time() + 60, cb) + self.loop._process_events = mock.Mock() + self.loop._write_to_self = mock.Mock() + + # raise RuntimeError if the thread has no event loop + test_thread(self.loop, True) # check disabled if debug mode is disabled - self.loop.set_debug(False) - self.loop.call_soon(cb) - self.loop.call_later(60, cb) - self.loop.call_at(self.loop.time() + 60, cb) + test_thread(self.loop, False) + + # raise RuntimeError if the event loop of the thread is not the called + # event loop + test_thread(self.loop, True, create_loop=True) + + # check disabled if debug mode is disabled + test_thread(self.loop, False, create_loop=True) def test_run_once_in_executor_handle(self): def cb(): diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index 9e9b41a..8258238 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -440,17 +440,16 @@ class BaseProactorEventLoopTests(test_utils.TestCase): self.loop = EventLoop(self.proactor) self.set_event_loop(self.loop, cleanup=False) - @mock.patch.object(BaseProactorEventLoop, '_call_soon') + @mock.patch.object(BaseProactorEventLoop, 'call_soon') @mock.patch.object(BaseProactorEventLoop, '_socketpair') - def test_ctor(self, socketpair, _call_soon): + def test_ctor(self, socketpair, call_soon): ssock, csock = socketpair.return_value = ( mock.Mock(), mock.Mock()) loop = BaseProactorEventLoop(self.proactor) self.assertIs(loop._ssock, ssock) self.assertIs(loop._csock, csock) self.assertEqual(loop._internal_fds, 1) - _call_soon.assert_called_with(loop._loop_self_reading, (), - check_loop=False) + call_soon.assert_called_with(loop._loop_self_reading) def test_close_self_pipe(self): self.loop._close_self_pipe() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 55c47b5..d82cbbf 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -233,19 +233,12 @@ if sys.platform != 'win32': def setUp(self): policy = asyncio.get_event_loop_policy() self.loop = policy.new_event_loop() - - # ensure that the event loop is passed explicitly in asyncio - policy.set_event_loop(None) + self.set_event_loop(self.loop) watcher = self.Watcher() watcher.attach_loop(self.loop) policy.set_child_watcher(watcher) - - def tearDown(self): - policy = asyncio.get_event_loop_policy() - policy.set_child_watcher(None) - self.loop.close() - super().tearDown() + self.addCleanup(policy.set_child_watcher, None) class SubprocessSafeWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): @@ -262,17 +255,8 @@ else: class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase): def setUp(self): - policy = asyncio.get_event_loop_policy() self.loop = asyncio.ProactorEventLoop() - - # ensure that the event loop is passed explicitly in asyncio - policy.set_event_loop(None) - - def tearDown(self): - policy = asyncio.get_event_loop_policy() - self.loop.close() - policy.set_event_loop(None) - super().tearDown() + self.set_event_loop(self.loop) if __name__ == '__main__': |