summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio')
-rw-r--r--Lib/test/test_asyncio/test_events.py9
-rw-r--r--Lib/test/test_asyncio/test_streams.py46
-rw-r--r--Lib/test/test_asyncio/test_subprocess.py29
-rw-r--r--Lib/test/test_asyncio/test_unix_events.py636
4 files changed, 1 insertions, 719 deletions
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 88c85a3..06eb4d3 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -2214,7 +2214,7 @@ else:
super().setUp()
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
- watcher = asyncio.SafeChildWatcher()
+ watcher = asyncio.ThreadedChildWatcher()
watcher.attach_loop(self.loop)
asyncio.set_child_watcher(watcher)
@@ -2833,13 +2833,6 @@ class GetEventLoopTestsMixin:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
- if sys.platform != 'win32':
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- watcher = asyncio.SafeChildWatcher()
- watcher.attach_loop(self.loop)
- asyncio.set_child_watcher(watcher)
-
def tearDown(self):
try:
if sys.platform != 'win32':
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index ae943f3..d32b7ff 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -822,52 +822,6 @@ class StreamTests(test_utils.TestCase):
self.assertEqual(msg1, b"hello world 1!\n")
self.assertEqual(msg2, b"hello world 2!\n")
- @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
- @requires_subprocess()
- def test_read_all_from_pipe_reader(self):
- # See asyncio issue 168. This test is derived from the example
- # subprocess_attach_read_pipe.py, but we configure the
- # StreamReader's limit so that twice it is less than the size
- # of the data writer. Also we must explicitly attach a child
- # watcher to the event loop.
-
- code = """\
-import os, sys
-fd = int(sys.argv[1])
-os.write(fd, b'data')
-os.close(fd)
-"""
- rfd, wfd = os.pipe()
- args = [sys.executable, '-c', code, str(wfd)]
-
- pipe = open(rfd, 'rb', 0)
- reader = asyncio.StreamReader(loop=self.loop, limit=1)
- protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
- transport, _ = self.loop.run_until_complete(
- self.loop.connect_read_pipe(lambda: protocol, pipe))
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- watcher = asyncio.SafeChildWatcher()
- watcher.attach_loop(self.loop)
- try:
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- asyncio.set_child_watcher(watcher)
- create = asyncio.create_subprocess_exec(
- *args,
- pass_fds={wfd},
- )
- proc = self.loop.run_until_complete(create)
- self.loop.run_until_complete(proc.wait())
- finally:
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- asyncio.set_child_watcher(None)
-
- os.close(wfd)
- data = self.loop.run_until_complete(reader.read(-1))
- self.assertEqual(data, b'data')
-
def test_streamreader_constructor_without_loop(self):
with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
asyncio.StreamReader()
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
index cf1a198..27ae766 100644
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -631,15 +631,6 @@ class SubprocessMixin:
# the transport was not notified yet
self.assertFalse(killed)
- # Unlike SafeChildWatcher, FastChildWatcher does not pop the
- # callbacks if waitpid() is called elsewhere. Let's clear them
- # manually to avoid a warning when the watcher is detached.
- if (sys.platform != 'win32' and
- isinstance(self, SubprocessFastWatcherTests)):
- with warnings.catch_warnings():
- warnings.simplefilter('ignore', DeprecationWarning)
- asyncio.get_child_watcher()._callbacks.clear()
-
async def _test_popen_error(self, stdin):
if sys.platform == 'win32':
target = 'asyncio.windows_utils.Popen'
@@ -908,26 +899,6 @@ if sys.platform != 'win32':
def _get_watcher(self):
return unix_events.ThreadedChildWatcher()
- class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
- test_utils.TestCase):
-
- def _get_watcher(self):
- with self.assertWarns(DeprecationWarning):
- return unix_events.SafeChildWatcher()
-
- class MultiLoopChildWatcherTests(test_utils.TestCase):
-
- def test_warns(self):
- with self.assertWarns(DeprecationWarning):
- unix_events.MultiLoopChildWatcher()
-
- class SubprocessFastWatcherTests(SubprocessWatcherMixin,
- test_utils.TestCase):
-
- def _get_watcher(self):
- with self.assertWarns(DeprecationWarning):
- return unix_events.FastChildWatcher()
-
@unittest.skipUnless(
unix_events.can_use_pidfd(),
"operating system does not support pidfds",
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
index 9452213..42fb54a 100644
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -1138,578 +1138,6 @@ class AbstractChildWatcherTests(unittest.TestCase):
NotImplementedError, watcher.__exit__, f, f, f)
-class BaseChildWatcherTests(unittest.TestCase):
-
- def test_not_implemented(self):
- f = mock.Mock()
- watcher = unix_events.BaseChildWatcher()
- self.assertRaises(
- NotImplementedError, watcher._do_waitpid, f)
-
-
-class ChildWatcherTestsMixin:
-
- ignore_warnings = mock.patch.object(log.logger, "warning")
-
- def setUp(self):
- super().setUp()
- self.loop = self.new_test_loop()
- self.running = False
- self.zombies = {}
-
- with mock.patch.object(
- self.loop, "add_signal_handler") as self.m_add_signal_handler:
- self.watcher = self.create_watcher()
- self.watcher.attach_loop(self.loop)
-
- def waitpid(self, pid, flags):
- if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
- self.assertGreater(pid, 0)
- try:
- if pid < 0:
- return self.zombies.popitem()
- else:
- return pid, self.zombies.pop(pid)
- except KeyError:
- pass
- if self.running:
- return 0, 0
- else:
- raise ChildProcessError()
-
- def add_zombie(self, pid, status):
- self.zombies[pid] = status
-
- def waitstatus_to_exitcode(self, status):
- if status > 32768:
- return status - 32768
- elif 32700 < status < 32768:
- return status - 32768
- else:
- return status
-
- def test_create_watcher(self):
- self.m_add_signal_handler.assert_called_once_with(
- signal.SIGCHLD, self.watcher._sig_chld)
-
- def waitpid_mocks(func):
- def wrapped_func(self):
- def patch(target, wrapper):
- return mock.patch(target, wraps=wrapper,
- new_callable=mock.Mock)
-
- with patch('asyncio.unix_events.waitstatus_to_exitcode', self.waitstatus_to_exitcode), \
- patch('os.waitpid', self.waitpid) as m_waitpid:
- func(self, m_waitpid)
- return wrapped_func
-
- @waitpid_mocks
- def test_sigchld(self, m_waitpid):
- # register a child
- callback = mock.Mock()
-
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(42, callback, 9, 10, 14)
-
- self.assertFalse(callback.called)
-
- # child is running
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- # child terminates (returncode 12)
- self.running = False
- self.add_zombie(42, EXITCODE(12))
- self.watcher._sig_chld()
-
- callback.assert_called_once_with(42, 12, 9, 10, 14)
-
- callback.reset_mock()
-
- # ensure that the child is effectively reaped
- self.add_zombie(42, EXITCODE(13))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- # sigchld called again
- self.zombies.clear()
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- @waitpid_mocks
- def test_sigchld_two_children(self, m_waitpid):
- callback1 = mock.Mock()
- callback2 = mock.Mock()
-
- # register child 1
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(43, callback1, 7, 8)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # register child 2
- with self.watcher:
- self.watcher.add_child_handler(44, callback2, 147, 18)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # children are running
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # child 1 terminates (signal 3)
- self.add_zombie(43, SIGNAL(3))
- self.watcher._sig_chld()
-
- callback1.assert_called_once_with(43, -3, 7, 8)
- self.assertFalse(callback2.called)
-
- callback1.reset_mock()
-
- # child 2 still running
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # child 2 terminates (code 108)
- self.add_zombie(44, EXITCODE(108))
- self.running = False
- self.watcher._sig_chld()
-
- callback2.assert_called_once_with(44, 108, 147, 18)
- self.assertFalse(callback1.called)
-
- callback2.reset_mock()
-
- # ensure that the children are effectively reaped
- self.add_zombie(43, EXITCODE(14))
- self.add_zombie(44, EXITCODE(15))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # sigchld called again
- self.zombies.clear()
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- @waitpid_mocks
- def test_sigchld_two_children_terminating_together(self, m_waitpid):
- callback1 = mock.Mock()
- callback2 = mock.Mock()
-
- # register child 1
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(45, callback1, 17, 8)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # register child 2
- with self.watcher:
- self.watcher.add_child_handler(46, callback2, 1147, 18)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # children are running
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # child 1 terminates (code 78)
- # child 2 terminates (signal 5)
- self.add_zombie(45, EXITCODE(78))
- self.add_zombie(46, SIGNAL(5))
- self.running = False
- self.watcher._sig_chld()
-
- callback1.assert_called_once_with(45, 78, 17, 8)
- callback2.assert_called_once_with(46, -5, 1147, 18)
-
- callback1.reset_mock()
- callback2.reset_mock()
-
- # ensure that the children are effectively reaped
- self.add_zombie(45, EXITCODE(14))
- self.add_zombie(46, EXITCODE(15))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- @waitpid_mocks
- def test_sigchld_race_condition(self, m_waitpid):
- # register a child
- callback = mock.Mock()
-
- with self.watcher:
- # child terminates before being registered
- self.add_zombie(50, EXITCODE(4))
- self.watcher._sig_chld()
-
- self.watcher.add_child_handler(50, callback, 1, 12)
-
- callback.assert_called_once_with(50, 4, 1, 12)
- callback.reset_mock()
-
- # ensure that the child is effectively reaped
- self.add_zombie(50, SIGNAL(1))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- @waitpid_mocks
- def test_sigchld_replace_handler(self, m_waitpid):
- callback1 = mock.Mock()
- callback2 = mock.Mock()
-
- # register a child
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(51, callback1, 19)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # register the same child again
- with self.watcher:
- self.watcher.add_child_handler(51, callback2, 21)
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- # child terminates (signal 8)
- self.running = False
- self.add_zombie(51, SIGNAL(8))
- self.watcher._sig_chld()
-
- callback2.assert_called_once_with(51, -8, 21)
- self.assertFalse(callback1.called)
-
- callback2.reset_mock()
-
- # ensure that the child is effectively reaped
- self.add_zombie(51, EXITCODE(13))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
-
- @waitpid_mocks
- def test_sigchld_remove_handler(self, m_waitpid):
- callback = mock.Mock()
-
- # register a child
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(52, callback, 1984)
-
- self.assertFalse(callback.called)
-
- # unregister the child
- self.watcher.remove_child_handler(52)
-
- self.assertFalse(callback.called)
-
- # child terminates (code 99)
- self.running = False
- self.add_zombie(52, EXITCODE(99))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- @waitpid_mocks
- def test_sigchld_unknown_status(self, m_waitpid):
- callback = mock.Mock()
-
- # register a child
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(53, callback, -19)
-
- self.assertFalse(callback.called)
-
- # terminate with unknown status
- self.zombies[53] = 1178
- self.running = False
- self.watcher._sig_chld()
-
- callback.assert_called_once_with(53, 1178, -19)
-
- callback.reset_mock()
-
- # ensure that the child is effectively reaped
- self.add_zombie(53, EXITCODE(101))
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback.called)
-
- @waitpid_mocks
- def test_remove_child_handler(self, m_waitpid):
- callback1 = mock.Mock()
- callback2 = mock.Mock()
- callback3 = mock.Mock()
-
- # register children
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(54, callback1, 1)
- self.watcher.add_child_handler(55, callback2, 2)
- self.watcher.add_child_handler(56, callback3, 3)
-
- # remove child handler 1
- self.assertTrue(self.watcher.remove_child_handler(54))
-
- # remove child handler 2 multiple times
- self.assertTrue(self.watcher.remove_child_handler(55))
- self.assertFalse(self.watcher.remove_child_handler(55))
- self.assertFalse(self.watcher.remove_child_handler(55))
-
- # all children terminate
- self.add_zombie(54, EXITCODE(0))
- self.add_zombie(55, EXITCODE(1))
- self.add_zombie(56, EXITCODE(2))
- self.running = False
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
- callback3.assert_called_once_with(56, 2, 3)
-
- @waitpid_mocks
- def test_sigchld_unhandled_exception(self, m_waitpid):
- callback = mock.Mock()
-
- # register a child
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(57, callback)
-
- # raise an exception
- m_waitpid.side_effect = ValueError
-
- with mock.patch.object(log.logger,
- 'error') as m_error:
-
- self.assertEqual(self.watcher._sig_chld(), None)
- self.assertTrue(m_error.called)
-
- @waitpid_mocks
- def test_sigchld_child_reaped_elsewhere(self, m_waitpid):
- # register a child
- callback = mock.Mock()
-
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(58, callback)
-
- self.assertFalse(callback.called)
-
- # child terminates
- self.running = False
- self.add_zombie(58, EXITCODE(4))
-
- # waitpid is called elsewhere
- os.waitpid(58, os.WNOHANG)
-
- m_waitpid.reset_mock()
-
- # sigchld
- with self.ignore_warnings:
- self.watcher._sig_chld()
-
- if isinstance(self.watcher, asyncio.FastChildWatcher):
- # here the FastChildWatcher enters a deadlock
- # (there is no way to prevent it)
- self.assertFalse(callback.called)
- else:
- callback.assert_called_once_with(58, 255)
-
- @waitpid_mocks
- def test_sigchld_unknown_pid_during_registration(self, m_waitpid):
- # register two children
- callback1 = mock.Mock()
- callback2 = mock.Mock()
-
- with self.ignore_warnings, self.watcher:
- self.running = True
- # child 1 terminates
- self.add_zombie(591, EXITCODE(7))
- # an unknown child terminates
- self.add_zombie(593, EXITCODE(17))
-
- self.watcher._sig_chld()
-
- self.watcher.add_child_handler(591, callback1)
- self.watcher.add_child_handler(592, callback2)
-
- callback1.assert_called_once_with(591, 7)
- self.assertFalse(callback2.called)
-
- @waitpid_mocks
- def test_set_loop(self, m_waitpid):
- # register a child
- callback = mock.Mock()
-
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(60, callback)
-
- # attach a new loop
- old_loop = self.loop
- self.loop = self.new_test_loop()
- patch = mock.patch.object
-
- with patch(old_loop, "remove_signal_handler") as m_old_remove, \
- patch(self.loop, "add_signal_handler") as m_new_add:
-
- self.watcher.attach_loop(self.loop)
-
- m_old_remove.assert_called_once_with(
- signal.SIGCHLD)
- m_new_add.assert_called_once_with(
- signal.SIGCHLD, self.watcher._sig_chld)
-
- # child terminates
- self.running = False
- self.add_zombie(60, EXITCODE(9))
- self.watcher._sig_chld()
-
- callback.assert_called_once_with(60, 9)
-
- @waitpid_mocks
- def test_set_loop_race_condition(self, m_waitpid):
- # register 3 children
- callback1 = mock.Mock()
- callback2 = mock.Mock()
- callback3 = mock.Mock()
-
- with self.watcher:
- self.running = True
- self.watcher.add_child_handler(61, callback1)
- self.watcher.add_child_handler(62, callback2)
- self.watcher.add_child_handler(622, callback3)
-
- # detach the loop
- old_loop = self.loop
- self.loop = None
-
- with mock.patch.object(
- old_loop, "remove_signal_handler") as m_remove_signal_handler:
-
- with self.assertWarnsRegex(
- RuntimeWarning, 'A loop is being detached'):
- self.watcher.attach_loop(None)
-
- m_remove_signal_handler.assert_called_once_with(
- signal.SIGCHLD)
-
- # child 1 & 2 terminate
- self.add_zombie(61, EXITCODE(11))
- self.add_zombie(62, SIGNAL(5))
-
- # SIGCHLD was not caught
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
- self.assertFalse(callback3.called)
-
- # attach a new loop
- self.loop = self.new_test_loop()
-
- with mock.patch.object(
- self.loop, "add_signal_handler") as m_add_signal_handler:
-
- self.watcher.attach_loop(self.loop)
-
- m_add_signal_handler.assert_called_once_with(
- signal.SIGCHLD, self.watcher._sig_chld)
- callback1.assert_called_once_with(61, 11) # race condition!
- callback2.assert_called_once_with(62, -5) # race condition!
- self.assertFalse(callback3.called)
-
- callback1.reset_mock()
- callback2.reset_mock()
-
- # child 3 terminates
- self.running = False
- self.add_zombie(622, EXITCODE(19))
- self.watcher._sig_chld()
-
- self.assertFalse(callback1.called)
- self.assertFalse(callback2.called)
- callback3.assert_called_once_with(622, 19)
-
- @waitpid_mocks
- def test_close(self, m_waitpid):
- # register two children
- callback1 = mock.Mock()
-
- with self.watcher:
- self.running = True
- # child 1 terminates
- self.add_zombie(63, EXITCODE(9))
- # other child terminates
- self.add_zombie(65, EXITCODE(18))
- self.watcher._sig_chld()
-
- self.watcher.add_child_handler(63, callback1)
- self.watcher.add_child_handler(64, callback1)
-
- self.assertEqual(len(self.watcher._callbacks), 1)
- if isinstance(self.watcher, asyncio.FastChildWatcher):
- self.assertEqual(len(self.watcher._zombies), 1)
-
- with mock.patch.object(
- self.loop,
- "remove_signal_handler") as m_remove_signal_handler:
-
- self.watcher.close()
-
- m_remove_signal_handler.assert_called_once_with(
- signal.SIGCHLD)
- self.assertFalse(self.watcher._callbacks)
- if isinstance(self.watcher, asyncio.FastChildWatcher):
- self.assertFalse(self.watcher._zombies)
-
-
-class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
- def create_watcher(self):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- return asyncio.SafeChildWatcher()
-
-
-class FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
- def create_watcher(self):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- return asyncio.FastChildWatcher()
-
-
class PolicyTests(unittest.TestCase):
def create_policy(self):
@@ -1739,70 +1167,6 @@ class PolicyTests(unittest.TestCase):
with self.assertWarns(DeprecationWarning):
self.assertIs(watcher, policy.get_child_watcher())
- def test_get_child_watcher_after_set(self):
- policy = self.create_policy()
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- watcher = asyncio.FastChildWatcher()
- policy.set_child_watcher(watcher)
-
- self.assertIs(policy._watcher, watcher)
- with self.assertWarns(DeprecationWarning):
- self.assertIs(watcher, policy.get_child_watcher())
-
- def test_get_child_watcher_thread(self):
-
- def f():
- policy.set_event_loop(policy.new_event_loop())
-
- self.assertIsInstance(policy.get_event_loop(),
- asyncio.AbstractEventLoop)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- watcher = policy.get_child_watcher()
-
- self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
- self.assertIsNone(watcher._loop)
-
- policy.get_event_loop().close()
-
- policy = self.create_policy()
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- policy.set_child_watcher(asyncio.SafeChildWatcher())
-
- th = threading.Thread(target=f)
- th.start()
- th.join()
-
- def test_child_watcher_replace_mainloop_existing(self):
- policy = self.create_policy()
- loop = policy.new_event_loop()
- policy.set_event_loop(loop)
-
- # Explicitly setup SafeChildWatcher,
- # default ThreadedChildWatcher has no _loop property
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- watcher = asyncio.SafeChildWatcher()
- policy.set_child_watcher(watcher)
- watcher.attach_loop(loop)
-
- self.assertIs(watcher._loop, loop)
-
- new_loop = policy.new_event_loop()
- policy.set_event_loop(new_loop)
-
- self.assertIs(watcher._loop, new_loop)
-
- policy.set_event_loop(None)
-
- self.assertIs(watcher._loop, None)
-
- loop.close()
- new_loop.close()
-
-
class TestFunctional(unittest.TestCase):
def setUp(self):