summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio/test_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_asyncio/test_events.py')
-rw-r--r--Lib/test/test_asyncio/test_events.py269
1 files changed, 155 insertions, 114 deletions
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 7a4d698..44879ff 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -23,14 +23,9 @@ import unittest.mock
from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR
-from asyncio import futures
+import asyncio
from asyncio import events
-from asyncio import transports
-from asyncio import protocols
-from asyncio import selector_events
-from asyncio import tasks
from asyncio import test_utils
-from asyncio import locks
def data_file(filename):
@@ -49,7 +44,7 @@ SIGNED_CERTFILE = data_file('keycert3.pem')
SIGNING_CA = data_file('pycacert.pem')
-class MyProto(protocols.Protocol):
+class MyProto(asyncio.Protocol):
done = None
def __init__(self, loop=None):
@@ -57,7 +52,7 @@ class MyProto(protocols.Protocol):
self.state = 'INITIAL'
self.nbytes = 0
if loop is not None:
- self.done = futures.Future(loop=loop)
+ self.done = asyncio.Future(loop=loop)
def connection_made(self, transport):
self.transport = transport
@@ -80,14 +75,14 @@ class MyProto(protocols.Protocol):
self.done.set_result(None)
-class MyDatagramProto(protocols.DatagramProtocol):
+class MyDatagramProto(asyncio.DatagramProtocol):
done = None
def __init__(self, loop=None):
self.state = 'INITIAL'
self.nbytes = 0
if loop is not None:
- self.done = futures.Future(loop=loop)
+ self.done = asyncio.Future(loop=loop)
def connection_made(self, transport):
self.transport = transport
@@ -108,7 +103,7 @@ class MyDatagramProto(protocols.DatagramProtocol):
self.done.set_result(None)
-class MyReadPipeProto(protocols.Protocol):
+class MyReadPipeProto(asyncio.Protocol):
done = None
def __init__(self, loop=None):
@@ -116,7 +111,7 @@ class MyReadPipeProto(protocols.Protocol):
self.nbytes = 0
self.transport = None
if loop is not None:
- self.done = futures.Future(loop=loop)
+ self.done = asyncio.Future(loop=loop)
def connection_made(self, transport):
self.transport = transport
@@ -140,14 +135,14 @@ class MyReadPipeProto(protocols.Protocol):
self.done.set_result(None)
-class MyWritePipeProto(protocols.BaseProtocol):
+class MyWritePipeProto(asyncio.BaseProtocol):
done = None
def __init__(self, loop=None):
self.state = 'INITIAL'
self.transport = None
if loop is not None:
- self.done = futures.Future(loop=loop)
+ self.done = asyncio.Future(loop=loop)
def connection_made(self, transport):
self.transport = transport
@@ -161,18 +156,18 @@ class MyWritePipeProto(protocols.BaseProtocol):
self.done.set_result(None)
-class MySubprocessProtocol(protocols.SubprocessProtocol):
+class MySubprocessProtocol(asyncio.SubprocessProtocol):
def __init__(self, loop):
self.state = 'INITIAL'
self.transport = None
- self.connected = futures.Future(loop=loop)
- self.completed = futures.Future(loop=loop)
- self.disconnects = {fd: futures.Future(loop=loop) for fd in range(3)}
+ self.connected = asyncio.Future(loop=loop)
+ self.completed = asyncio.Future(loop=loop)
+ self.disconnects = {fd: asyncio.Future(loop=loop) for fd in range(3)}
self.data = {1: b'', 2: b''}
self.returncode = None
- self.got_data = {1: locks.Event(loop=loop),
- 2: locks.Event(loop=loop)}
+ self.got_data = {1: asyncio.Event(loop=loop),
+ 2: asyncio.Event(loop=loop)}
def connection_made(self, transport):
self.transport = transport
@@ -207,7 +202,7 @@ class EventLoopTestsMixin:
def setUp(self):
super().setUp()
self.loop = self.create_event_loop()
- events.set_event_loop(None)
+ asyncio.set_event_loop(None)
def tearDown(self):
# just in case if we have transport close callbacks
@@ -218,11 +213,11 @@ class EventLoopTestsMixin:
super().tearDown()
def test_run_until_complete_nesting(self):
- @tasks.coroutine
+ @asyncio.coroutine
def coro1():
yield
- @tasks.coroutine
+ @asyncio.coroutine
def coro2():
self.assertTrue(self.loop.is_running())
self.loop.run_until_complete(coro1())
@@ -235,15 +230,15 @@ class EventLoopTestsMixin:
def test_run_until_complete(self):
t0 = self.loop.time()
- self.loop.run_until_complete(tasks.sleep(0.1, loop=self.loop))
+ self.loop.run_until_complete(asyncio.sleep(0.1, loop=self.loop))
t1 = self.loop.time()
self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
def test_run_until_complete_stopped(self):
- @tasks.coroutine
+ @asyncio.coroutine
def cb():
self.loop.stop()
- yield from tasks.sleep(0.1, loop=self.loop)
+ yield from asyncio.sleep(0.1, loop=self.loop)
task = cb()
self.assertRaises(RuntimeError,
self.loop.run_until_complete, task)
@@ -494,8 +489,8 @@ class EventLoopTestsMixin:
f = self.loop.create_connection(
lambda: MyProto(loop=self.loop), *httpd.address)
tr, pr = self.loop.run_until_complete(f)
- self.assertIsInstance(tr, transports.Transport)
- self.assertIsInstance(pr, protocols.Protocol)
+ self.assertIsInstance(tr, asyncio.Transport)
+ self.assertIsInstance(pr, asyncio.Protocol)
self.loop.run_until_complete(pr.done)
self.assertGreater(pr.nbytes, 0)
tr.close()
@@ -522,8 +517,8 @@ class EventLoopTestsMixin:
f = self.loop.create_connection(
lambda: MyProto(loop=self.loop), sock=sock)
tr, pr = self.loop.run_until_complete(f)
- self.assertIsInstance(tr, transports.Transport)
- self.assertIsInstance(pr, protocols.Protocol)
+ self.assertIsInstance(tr, asyncio.Transport)
+ self.assertIsInstance(pr, asyncio.Protocol)
self.loop.run_until_complete(pr.done)
self.assertGreater(pr.nbytes, 0)
tr.close()
@@ -535,8 +530,8 @@ class EventLoopTestsMixin:
lambda: MyProto(loop=self.loop), *httpd.address,
ssl=test_utils.dummy_ssl_context())
tr, pr = self.loop.run_until_complete(f)
- self.assertIsInstance(tr, transports.Transport)
- self.assertIsInstance(pr, protocols.Protocol)
+ self.assertIsInstance(tr, asyncio.Transport)
+ self.assertIsInstance(pr, asyncio.Protocol)
self.assertTrue('ssl' in tr.__class__.__name__.lower())
self.assertIsNotNone(tr.get_extra_info('sockname'))
self.loop.run_until_complete(pr.done)
@@ -762,7 +757,7 @@ class EventLoopTestsMixin:
server.close()
def test_create_server_sock(self):
- proto = futures.Future(loop=self.loop)
+ proto = asyncio.Future(loop=self.loop)
class TestMyProto(MyProto):
def connection_made(self, transport):
@@ -805,7 +800,7 @@ class EventLoopTestsMixin:
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
def test_create_server_dual_stack(self):
- f_proto = futures.Future(loop=self.loop)
+ f_proto = asyncio.Future(loop=self.loop)
class TestMyProto(MyProto):
def connection_made(self, transport):
@@ -834,7 +829,7 @@ class EventLoopTestsMixin:
proto.transport.close()
client.close()
- f_proto = futures.Future(loop=self.loop)
+ f_proto = asyncio.Future(loop=self.loop)
client = socket.socket(socket.AF_INET6)
client.connect(('::1', port))
client.send(b'xxx')
@@ -907,7 +902,7 @@ class EventLoopTestsMixin:
def test_internal_fds(self):
loop = self.create_event_loop()
- if not isinstance(loop, selector_events.BaseSelectorEventLoop):
+ if not isinstance(loop, asyncio.BaseSelectorEventLoop):
self.skipTest('loop is not a BaseSelectorEventLoop')
self.assertEqual(1, loop._internal_fds)
@@ -929,7 +924,7 @@ class EventLoopTestsMixin:
rpipe, wpipe = os.pipe()
pipeobj = io.open(rpipe, 'rb', 1024)
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
t, p = yield from self.loop.connect_read_pipe(factory, pipeobj)
self.assertIs(p, proto)
@@ -957,9 +952,6 @@ class EventLoopTestsMixin:
@unittest.skipUnless(sys.platform != 'win32',
"Don't support pipes for Windows")
- # kqueue doesn't support character devices (PTY) on Mac OS X older
- # than 10.9 (Maverick)
- @support.requires_mac_ver(10, 9)
def test_read_pty_output(self):
proto = None
@@ -971,7 +963,7 @@ class EventLoopTestsMixin:
master, slave = os.openpty()
master_read_obj = io.open(master, 'rb', 0)
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
t, p = yield from self.loop.connect_read_pipe(factory,
master_read_obj)
@@ -1012,7 +1004,7 @@ class EventLoopTestsMixin:
rpipe, wpipe = os.pipe()
pipeobj = io.open(wpipe, 'wb', 1024)
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal transport
t, p = yield from self.loop.connect_write_pipe(factory, pipeobj)
@@ -1058,7 +1050,7 @@ class EventLoopTestsMixin:
rsock, wsock = test_utils.socketpair()
pipeobj = io.open(wsock.detach(), 'wb', 1024)
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal transport
t, p = yield from self.loop.connect_write_pipe(factory,
@@ -1080,6 +1072,53 @@ class EventLoopTestsMixin:
self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)
+ @unittest.skipUnless(sys.platform != 'win32',
+ "Don't support pipes for Windows")
+ def test_write_pty(self):
+ proto = None
+ transport = None
+
+ def factory():
+ nonlocal proto
+ proto = MyWritePipeProto(loop=self.loop)
+ return proto
+
+ master, slave = os.openpty()
+ slave_write_obj = io.open(slave, 'wb', 0)
+
+ @asyncio.coroutine
+ def connect():
+ nonlocal transport
+ t, p = yield from self.loop.connect_write_pipe(factory,
+ slave_write_obj)
+ self.assertIs(p, proto)
+ self.assertIs(t, proto.transport)
+ self.assertEqual('CONNECTED', proto.state)
+ transport = t
+
+ self.loop.run_until_complete(connect())
+
+ transport.write(b'1')
+ test_utils.run_briefly(self.loop)
+ data = os.read(master, 1024)
+ self.assertEqual(b'1', data)
+
+ transport.write(b'2345')
+ test_utils.run_briefly(self.loop)
+ data = os.read(master, 1024)
+ self.assertEqual(b'2345', data)
+ self.assertEqual('CONNECTED', proto.state)
+
+ os.close(master)
+
+ # extra info is available
+ self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
+
+ # close connection
+ proto.transport.close()
+ self.loop.run_until_complete(proto.done)
+ self.assertEqual('CLOSED', proto.state)
+
def test_prompt_cancellation(self):
r, w = test_utils.socketpair()
r.setblocking(False)
@@ -1088,12 +1127,12 @@ class EventLoopTestsMixin:
if ov is not None:
self.assertTrue(ov.pending)
- @tasks.coroutine
+ @asyncio.coroutine
def main():
try:
self.loop.call_soon(f.cancel)
yield from f
- except futures.CancelledError:
+ except asyncio.CancelledError:
res = 'cancelled'
else:
res = None
@@ -1102,13 +1141,13 @@ class EventLoopTestsMixin:
return res
start = time.monotonic()
- t = tasks.Task(main(), loop=self.loop)
+ t = asyncio.Task(main(), loop=self.loop)
self.loop.run_forever()
elapsed = time.monotonic() - start
self.assertLess(elapsed, 0.1)
self.assertEqual(t.result(), 'cancelled')
- self.assertRaises(futures.CancelledError, f.result)
+ self.assertRaises(asyncio.CancelledError, f.result)
if ov is not None:
self.assertFalse(ov.pending)
self.loop._stop_serving(r)
@@ -1126,13 +1165,13 @@ class EventLoopTestsMixin:
self.loop._run_once = _run_once
calls = []
- @tasks.coroutine
+ @asyncio.coroutine
def wait():
loop = self.loop
calls.append(loop._run_once_counter)
- yield from tasks.sleep(loop.granularity * 10, loop=loop)
+ yield from asyncio.sleep(loop.granularity * 10, loop=loop)
calls.append(loop._run_once_counter)
- yield from tasks.sleep(loop.granularity / 10, loop=loop)
+ yield from asyncio.sleep(loop.granularity / 10, loop=loop)
calls.append(loop._run_once_counter)
self.loop.run_until_complete(wait())
@@ -1162,7 +1201,7 @@ class SubprocessTestsMixin:
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto, transp
transp, proto = yield from self.loop.subprocess_exec(
@@ -1188,7 +1227,7 @@ class SubprocessTestsMixin:
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto, transp
transp, proto = yield from self.loop.subprocess_exec(
@@ -1220,7 +1259,7 @@ class SubprocessTestsMixin:
proto = None
transp = None
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto, transp
transp, proto = yield from self.loop.subprocess_shell(
@@ -1241,7 +1280,7 @@ class SubprocessTestsMixin:
def test_subprocess_exitcode(self):
proto = None
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto
transp, proto = yield from self.loop.subprocess_shell(
@@ -1257,7 +1296,7 @@ class SubprocessTestsMixin:
proto = None
transp = None
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto, transp
transp, proto = yield from self.loop.subprocess_shell(
@@ -1279,7 +1318,7 @@ class SubprocessTestsMixin:
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto, transp
transp, proto = yield from self.loop.subprocess_exec(
@@ -1300,7 +1339,7 @@ class SubprocessTestsMixin:
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto, transp
transp, proto = yield from self.loop.subprocess_exec(
@@ -1322,7 +1361,7 @@ class SubprocessTestsMixin:
prog = os.path.join(os.path.dirname(__file__), 'echo.py')
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto, transp
transp, proto = yield from self.loop.subprocess_exec(
@@ -1343,7 +1382,7 @@ class SubprocessTestsMixin:
prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto, transp
transp, proto = yield from self.loop.subprocess_exec(
@@ -1370,7 +1409,7 @@ class SubprocessTestsMixin:
prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto, transp
transp, proto = yield from self.loop.subprocess_exec(
@@ -1400,7 +1439,7 @@ class SubprocessTestsMixin:
prog = os.path.join(os.path.dirname(__file__), 'echo3.py')
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto, transp
transp, proto = yield from self.loop.subprocess_exec(
@@ -1437,7 +1476,7 @@ class SubprocessTestsMixin:
proto = None
transp = None
- @tasks.coroutine
+ @asyncio.coroutine
def connect():
nonlocal proto
# start the new process in a new session
@@ -1453,19 +1492,18 @@ class SubprocessTestsMixin:
if sys.platform == 'win32':
- from asyncio import windows_events
class SelectEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
def create_event_loop(self):
- return windows_events.SelectorEventLoop()
+ return asyncio.SelectorEventLoop()
class ProactorEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
unittest.TestCase):
def create_event_loop(self):
- return windows_events.ProactorEventLoop()
+ return asyncio.ProactorEventLoop()
def test_create_ssl_connection(self):
raise unittest.SkipTest("IocpEventLoop incompatible with SSL")
@@ -1499,17 +1537,16 @@ if sys.platform == 'win32':
"IocpEventLoop does not have create_datagram_endpoint()")
else:
from asyncio import selectors
- from asyncio import unix_events
class UnixEventLoopTestsMixin(EventLoopTestsMixin):
def setUp(self):
super().setUp()
- watcher = unix_events.SafeChildWatcher()
+ watcher = asyncio.SafeChildWatcher()
watcher.attach_loop(self.loop)
- events.set_child_watcher(watcher)
+ asyncio.set_child_watcher(watcher)
def tearDown(self):
- events.set_child_watcher(None)
+ asyncio.set_child_watcher(None)
super().tearDown()
if hasattr(selectors, 'KqueueSelector'):
@@ -1518,16 +1555,28 @@ else:
unittest.TestCase):
def create_event_loop(self):
- return unix_events.SelectorEventLoop(
+ return asyncio.SelectorEventLoop(
selectors.KqueueSelector())
+ # kqueue doesn't support character devices (PTY) on Mac OS X older
+ # than 10.9 (Maverick)
+ @support.requires_mac_ver(10, 9)
+ def test_read_pty_output(self):
+ super().test_read_pty_output()
+
+ # kqueue doesn't support character devices (PTY) on Mac OS X older
+ # than 10.9 (Maverick)
+ @support.requires_mac_ver(10, 9)
+ def test_write_pty(self):
+ super().test_write_pty()
+
if hasattr(selectors, 'EpollSelector'):
class EPollEventLoopTests(UnixEventLoopTestsMixin,
SubprocessTestsMixin,
unittest.TestCase):
def create_event_loop(self):
- return unix_events.SelectorEventLoop(selectors.EpollSelector())
+ return asyncio.SelectorEventLoop(selectors.EpollSelector())
if hasattr(selectors, 'PollSelector'):
class PollEventLoopTests(UnixEventLoopTestsMixin,
@@ -1535,7 +1584,7 @@ else:
unittest.TestCase):
def create_event_loop(self):
- return unix_events.SelectorEventLoop(selectors.PollSelector())
+ return asyncio.SelectorEventLoop(selectors.PollSelector())
# Should always exist.
class SelectEventLoopTests(UnixEventLoopTestsMixin,
@@ -1543,7 +1592,7 @@ else:
unittest.TestCase):
def create_event_loop(self):
- return unix_events.SelectorEventLoop(selectors.SelectSelector())
+ return asyncio.SelectorEventLoop(selectors.SelectSelector())
class HandleTests(unittest.TestCase):
@@ -1553,7 +1602,7 @@ class HandleTests(unittest.TestCase):
return args
args = ()
- h = events.Handle(callback, args)
+ h = asyncio.Handle(callback, args)
self.assertIs(h._callback, callback)
self.assertIs(h._args, args)
self.assertFalse(h._cancelled)
@@ -1576,16 +1625,16 @@ class HandleTests(unittest.TestCase):
def test_make_handle(self):
def callback(*args):
return args
- h1 = events.Handle(callback, ())
+ h1 = asyncio.Handle(callback, ())
self.assertRaises(
- AssertionError, events.make_handle, h1, ())
+ AssertionError, asyncio.events.make_handle, h1, ())
@unittest.mock.patch('asyncio.events.logger')
def test_callback_with_exception(self, log):
def callback():
raise ValueError()
- h = events.Handle(callback, ())
+ h = asyncio.Handle(callback, ())
h._run()
self.assertTrue(log.exception.called)
@@ -1594,7 +1643,7 @@ class TimerTests(unittest.TestCase):
def test_hash(self):
when = time.monotonic()
- h = events.TimerHandle(when, lambda: False, ())
+ h = asyncio.TimerHandle(when, lambda: False, ())
self.assertEqual(hash(h), hash(when))
def test_timer(self):
@@ -1603,7 +1652,7 @@ class TimerTests(unittest.TestCase):
args = ()
when = time.monotonic()
- h = events.TimerHandle(when, callback, args)
+ h = asyncio.TimerHandle(when, callback, args)
self.assertIs(h._callback, callback)
self.assertIs(h._args, args)
self.assertFalse(h._cancelled)
@@ -1618,7 +1667,7 @@ class TimerTests(unittest.TestCase):
self.assertTrue(r.endswith('())<cancelled>'), r)
self.assertRaises(AssertionError,
- events.TimerHandle, None, callback, args)
+ asyncio.TimerHandle, None, callback, args)
def test_timer_comparison(self):
def callback(*args):
@@ -1626,8 +1675,8 @@ class TimerTests(unittest.TestCase):
when = time.monotonic()
- h1 = events.TimerHandle(when, callback, ())
- h2 = events.TimerHandle(when, callback, ())
+ h1 = asyncio.TimerHandle(when, callback, ())
+ h2 = asyncio.TimerHandle(when, callback, ())
# TODO: Use assertLess etc.
self.assertFalse(h1 < h2)
self.assertFalse(h2 < h1)
@@ -1643,8 +1692,8 @@ class TimerTests(unittest.TestCase):
h2.cancel()
self.assertFalse(h1 == h2)
- h1 = events.TimerHandle(when, callback, ())
- h2 = events.TimerHandle(when + 10.0, callback, ())
+ h1 = asyncio.TimerHandle(when, callback, ())
+ h2 = asyncio.TimerHandle(when + 10.0, callback, ())
self.assertTrue(h1 < h2)
self.assertFalse(h2 < h1)
self.assertTrue(h1 <= h2)
@@ -1656,7 +1705,7 @@ class TimerTests(unittest.TestCase):
self.assertFalse(h1 == h2)
self.assertTrue(h1 != h2)
- h3 = events.Handle(callback, ())
+ h3 = asyncio.Handle(callback, ())
self.assertIs(NotImplemented, h1.__eq__(h3))
self.assertIs(NotImplemented, h1.__ne__(h3))
@@ -1665,7 +1714,7 @@ class AbstractEventLoopTests(unittest.TestCase):
def test_not_implemented(self):
f = unittest.mock.Mock()
- loop = events.AbstractEventLoop()
+ loop = asyncio.AbstractEventLoop()
self.assertRaises(
NotImplementedError, loop.run_forever)
self.assertRaises(
@@ -1739,19 +1788,19 @@ class ProtocolsAbsTests(unittest.TestCase):
def test_empty(self):
f = unittest.mock.Mock()
- p = protocols.Protocol()
+ p = asyncio.Protocol()
self.assertIsNone(p.connection_made(f))
self.assertIsNone(p.connection_lost(f))
self.assertIsNone(p.data_received(f))
self.assertIsNone(p.eof_received())
- dp = protocols.DatagramProtocol()
+ dp = asyncio.DatagramProtocol()
self.assertIsNone(dp.connection_made(f))
self.assertIsNone(dp.connection_lost(f))
self.assertIsNone(dp.error_received(f))
self.assertIsNone(dp.datagram_received(f, f))
- sp = protocols.SubprocessProtocol()
+ sp = asyncio.SubprocessProtocol()
self.assertIsNone(sp.connection_made(f))
self.assertIsNone(sp.connection_lost(f))
self.assertIsNone(sp.pipe_data_received(1, f))
@@ -1761,16 +1810,8 @@ class ProtocolsAbsTests(unittest.TestCase):
class PolicyTests(unittest.TestCase):
- def create_policy(self):
- if sys.platform == "win32":
- from asyncio import windows_events
- return windows_events.DefaultEventLoopPolicy()
- else:
- from asyncio import unix_events
- return unix_events.DefaultEventLoopPolicy()
-
def test_event_loop_policy(self):
- policy = events.AbstractEventLoopPolicy()
+ policy = asyncio.AbstractEventLoopPolicy()
self.assertRaises(NotImplementedError, policy.get_event_loop)
self.assertRaises(NotImplementedError, policy.set_event_loop, object())
self.assertRaises(NotImplementedError, policy.new_event_loop)
@@ -1779,18 +1820,18 @@ class PolicyTests(unittest.TestCase):
object())
def test_get_event_loop(self):
- policy = self.create_policy()
+ policy = asyncio.DefaultEventLoopPolicy()
self.assertIsNone(policy._local._loop)
loop = policy.get_event_loop()
- self.assertIsInstance(loop, events.AbstractEventLoop)
+ self.assertIsInstance(loop, asyncio.AbstractEventLoop)
self.assertIs(policy._local._loop, loop)
self.assertIs(loop, policy.get_event_loop())
loop.close()
def test_get_event_loop_calls_set_event_loop(self):
- policy = self.create_policy()
+ policy = asyncio.DefaultEventLoopPolicy()
with unittest.mock.patch.object(
policy, "set_event_loop",
@@ -1806,7 +1847,7 @@ class PolicyTests(unittest.TestCase):
loop.close()
def test_get_event_loop_after_set_none(self):
- policy = self.create_policy()
+ policy = asyncio.DefaultEventLoopPolicy()
policy.set_event_loop(None)
self.assertRaises(AssertionError, policy.get_event_loop)
@@ -1814,7 +1855,7 @@ class PolicyTests(unittest.TestCase):
def test_get_event_loop_thread(self, m_current_thread):
def f():
- policy = self.create_policy()
+ policy = asyncio.DefaultEventLoopPolicy()
self.assertRaises(AssertionError, policy.get_event_loop)
th = threading.Thread(target=f)
@@ -1822,14 +1863,14 @@ class PolicyTests(unittest.TestCase):
th.join()
def test_new_event_loop(self):
- policy = self.create_policy()
+ policy = asyncio.DefaultEventLoopPolicy()
loop = policy.new_event_loop()
- self.assertIsInstance(loop, events.AbstractEventLoop)
+ self.assertIsInstance(loop, asyncio.AbstractEventLoop)
loop.close()
def test_set_event_loop(self):
- policy = self.create_policy()
+ policy = asyncio.DefaultEventLoopPolicy()
old_loop = policy.get_event_loop()
self.assertRaises(AssertionError, policy.set_event_loop, object())
@@ -1842,19 +1883,19 @@ class PolicyTests(unittest.TestCase):
old_loop.close()
def test_get_event_loop_policy(self):
- policy = events.get_event_loop_policy()
- self.assertIsInstance(policy, events.AbstractEventLoopPolicy)
- self.assertIs(policy, events.get_event_loop_policy())
+ policy = asyncio.get_event_loop_policy()
+ self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
+ self.assertIs(policy, asyncio.get_event_loop_policy())
def test_set_event_loop_policy(self):
self.assertRaises(
- AssertionError, events.set_event_loop_policy, object())
+ AssertionError, asyncio.set_event_loop_policy, object())
- old_policy = events.get_event_loop_policy()
+ old_policy = asyncio.get_event_loop_policy()
- policy = self.create_policy()
- events.set_event_loop_policy(policy)
- self.assertIs(policy, events.get_event_loop_policy())
+ policy = asyncio.DefaultEventLoopPolicy()
+ asyncio.set_event_loop_policy(policy)
+ self.assertIs(policy, asyncio.get_event_loop_policy())
self.assertIsNot(policy, old_policy)