summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_asyncio
diff options
context:
space:
mode:
authorIllia Volochii <illia.volochii@gmail.com>2021-07-01 13:13:59 (GMT)
committerGitHub <noreply@github.com>2021-07-01 13:13:59 (GMT)
commita1092f62492a3fcd6195bea94eccf8d5a300acb1 (patch)
treee6771f532ecc51ff4665e0f4dd6d715c31b0c04d /Lib/test/test_asyncio
parent3623aaa78cb9c50edb6da5ac37000446f138b91c (diff)
downloadcpython-a1092f62492a3fcd6195bea94eccf8d5a300acb1.zip
cpython-a1092f62492a3fcd6195bea94eccf8d5a300acb1.tar.gz
cpython-a1092f62492a3fcd6195bea94eccf8d5a300acb1.tar.bz2
bpo-43216: Remove @asyncio.coroutine (GH-26369)
Remove the @asyncio.coroutine decorator enabling legacy generator-based coroutines to be compatible with async/await code; remove asyncio.coroutines.CoroWrapper used for wrapping legacy coroutine objects in the debug mode. The decorator has been deprecated since Python 3.8 and the removal was initially scheduled for Python 3.10.
Diffstat (limited to 'Lib/test/test_asyncio')
-rw-r--r--Lib/test/test_asyncio/test_base_events.py6
-rw-r--r--Lib/test/test_asyncio/test_events.py4
-rw-r--r--Lib/test/test_asyncio/test_locks.py38
-rw-r--r--Lib/test/test_asyncio/test_pep492.py14
-rw-r--r--Lib/test/test_asyncio/test_tasks.py540
5 files changed, 64 insertions, 538 deletions
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
index 5691d42..47a9fb9 100644
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -1884,10 +1884,8 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
MyProto, sock, None, None, mock.ANY, mock.ANY)
def test_call_coroutine(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def simple_coroutine():
- pass
+ async def simple_coroutine():
+ pass
self.loop.set_debug(True)
coro_func = simple_coroutine
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 55fc266..e781769 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -17,6 +17,7 @@ import subprocess
import sys
import threading
import time
+import types
import errno
import unittest
from unittest import mock
@@ -2163,8 +2164,7 @@ class HandleTests(test_utils.TestCase):
'<Handle cancelled>')
# decorated function
- with self.assertWarns(DeprecationWarning):
- cb = asyncio.coroutine(noop)
+ cb = types.coroutine(noop)
h = asyncio.Handle(cb, (), self.loop)
self.assertEqual(repr(h),
'<Handle noop() at %s:%s>'
diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py
index 6194cd0..441adee 100644
--- a/Lib/test/test_asyncio/test_locks.py
+++ b/Lib/test/test_asyncio/test_locks.py
@@ -38,14 +38,12 @@ class LockTests(test_utils.TestCase):
def test_lock(self):
lock = asyncio.Lock()
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def acquire_lock():
- return (yield from lock)
+ async def acquire_lock():
+ return await lock
with self.assertRaisesRegex(
TypeError,
- "object is not iterable"
+ "object Lock can't be used in 'await' expression"
):
self.loop.run_until_complete(acquire_lock())
@@ -78,18 +76,16 @@ class LockTests(test_utils.TestCase):
asyncio.BoundedSemaphore(),
]
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def test(lock):
- yield from asyncio.sleep(0.01)
- self.assertFalse(lock.locked())
- with self.assertRaisesRegex(
- TypeError,
- "object is not iterable"
- ):
- with (yield from lock):
- pass
- self.assertFalse(lock.locked())
+ async def test(lock):
+ await asyncio.sleep(0.01)
+ self.assertFalse(lock.locked())
+ with self.assertRaisesRegex(
+ TypeError,
+ r"object \w+ can't be used in 'await' expression"
+ ):
+ with await lock:
+ pass
+ self.assertFalse(lock.locked())
for primitive in primitives:
loop.run_until_complete(test(primitive))
@@ -788,14 +784,12 @@ class SemaphoreTests(test_utils.TestCase):
sem = asyncio.Semaphore()
self.assertEqual(1, sem._value)
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def acquire_lock():
- return (yield from sem)
+ async def acquire_lock():
+ return await sem
with self.assertRaisesRegex(
TypeError,
- "'Semaphore' object is not iterable",
+ "object Semaphore can't be used in 'await' expression",
):
self.loop.run_until_complete(acquire_lock())
diff --git a/Lib/test/test_asyncio/test_pep492.py b/Lib/test/test_asyncio/test_pep492.py
index 4bd50f4..f833f78 100644
--- a/Lib/test/test_asyncio/test_pep492.py
+++ b/Lib/test/test_asyncio/test_pep492.py
@@ -123,20 +123,6 @@ class CoroutineTests(BaseTest):
async def foo(): pass
self.assertTrue(asyncio.iscoroutinefunction(foo))
- def test_function_returning_awaitable(self):
- class Awaitable:
- def __await__(self):
- return ('spam',)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def func():
- return Awaitable()
-
- coro = func()
- self.assertEqual(coro.send(None), 'spam')
- coro.close()
-
def test_async_def_coroutines(self):
async def bar():
return 'spam'
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index a9e4cf5..8671087 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -33,18 +33,6 @@ async def coroutine_function():
pass
-@contextlib.contextmanager
-def set_coroutine_debug(enabled):
- coroutines = asyncio.coroutines
-
- old_debug = coroutines._DEBUG
- try:
- coroutines._DEBUG = enabled
- yield
- finally:
- coroutines._DEBUG = old_debug
-
-
def format_coroutine(qualname, state, src, source_traceback, generator=False):
if generator:
state = '%s' % state
@@ -234,43 +222,6 @@ class BaseTaskTests:
self.assertTrue(t.done())
self.assertEqual(t.result(), 'ok')
- def test_ensure_future_coroutine_2(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def notmuch():
- return 'ok'
- t = asyncio.ensure_future(notmuch(), loop=self.loop)
- self.assertIs(t._loop, self.loop)
- self.loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertEqual(t.result(), 'ok')
-
- a = notmuch()
- self.addCleanup(a.close)
- with self.assertWarns(DeprecationWarning) as cm:
- with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
- asyncio.ensure_future(a)
- self.assertEqual(cm.warnings[0].filename, __file__)
-
- async def test():
- return asyncio.ensure_future(notmuch())
- t = self.loop.run_until_complete(test())
- self.assertIs(t._loop, self.loop)
- self.loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertEqual(t.result(), 'ok')
-
- # Deprecated in 3.10
- asyncio.set_event_loop(self.loop)
- self.addCleanup(asyncio.set_event_loop, None)
- with self.assertWarns(DeprecationWarning) as cm:
- t = asyncio.ensure_future(notmuch())
- self.assertEqual(cm.warnings[0].filename, __file__)
- self.assertIs(t._loop, self.loop)
- self.loop.run_until_complete(t)
- self.assertTrue(t.done())
- self.assertEqual(t.result(), 'ok')
-
def test_ensure_future_future(self):
f_orig = self.new_future(self.loop)
f_orig.set_result('ko')
@@ -318,12 +269,10 @@ class BaseTaskTests:
def __init__(self, coro):
self.coro = coro
def __await__(self):
- return (yield from self.coro)
+ return self.coro.__await__()
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- return 'ok'
+ async def coro():
+ return 'ok'
loop = asyncio.new_event_loop()
self.set_event_loop(loop)
@@ -450,68 +399,6 @@ class BaseTaskTests:
self.assertEqual(t.get_name(), '{6}')
self.loop.run_until_complete(t)
- def test_task_repr_coro_decorator(self):
- self.loop.set_debug(False)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def notmuch():
- # notmuch() function doesn't use yield from: it will be wrapped by
- # @coroutine decorator
- return 123
-
- # test coroutine function
- self.assertEqual(notmuch.__name__, 'notmuch')
- self.assertRegex(notmuch.__qualname__,
- r'\w+.test_task_repr_coro_decorator'
- r'\.<locals>\.notmuch')
- self.assertEqual(notmuch.__module__, __name__)
-
- # test coroutine object
- gen = notmuch()
- # On Python >= 3.5, generators now inherit the name of the
- # function, as expected, and have a qualified name (__qualname__
- # attribute).
- coro_name = 'notmuch'
- coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
- '.<locals>.notmuch')
- self.assertEqual(gen.__name__, coro_name)
- self.assertEqual(gen.__qualname__, coro_qualname)
-
- # test repr(CoroWrapper)
- if coroutines._DEBUG:
- # format the coroutine object
- if coroutines._DEBUG:
- filename, lineno = test_utils.get_function_source(notmuch)
- frame = gen._source_traceback[-1]
- coro = ('%s() running, defined at %s:%s, created at %s:%s'
- % (coro_qualname, filename, lineno,
- frame[0], frame[1]))
- else:
- code = gen.gi_code
- coro = ('%s() running at %s:%s'
- % (coro_qualname, code.co_filename,
- code.co_firstlineno))
-
- self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
-
- # test pending Task
- t = self.new_task(self.loop, gen)
- t.add_done_callback(Dummy())
-
- # format the coroutine object
- if coroutines._DEBUG:
- src = '%s:%s' % test_utils.get_function_source(notmuch)
- else:
- code = gen.gi_code
- src = '%s:%s' % (code.co_filename, code.co_firstlineno)
- coro = format_coroutine(coro_qualname, 'running', src,
- t._source_traceback,
- generator=not coroutines._DEBUG)
- self.assertEqual(repr(t),
- "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
- self.loop.run_until_complete(t)
-
def test_task_repr_wait_for(self):
self.loop.set_debug(False)
@@ -527,30 +414,6 @@ class BaseTaskTests:
fut.set_result(None)
self.loop.run_until_complete(task)
- def test_task_repr_partial_corowrapper(self):
- # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
- # coroutine is a partial function
- with set_coroutine_debug(True):
- self.loop.set_debug(True)
-
- async def func(x, y):
- await asyncio.sleep(0)
-
- with self.assertWarns(DeprecationWarning):
- partial_func = asyncio.coroutine(functools.partial(func, 1))
- task = self.loop.create_task(partial_func(2))
-
- # make warnings quiet
- task._log_destroy_pending = False
- self.addCleanup(task._coro.close)
-
- coro_repr = repr(task._coro)
- expected = (
- r'<coroutine object \w+\.test_task_repr_partial_corowrapper'
- r'\.<locals>\.func at'
- )
- self.assertRegex(coro_repr, expected)
-
def test_task_basics(self):
async def outer():
@@ -741,12 +604,10 @@ class BaseTaskTests:
(asyncio.CancelledError, ('my message',), 2))
def test_cancel_yield(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def task():
- yield
- yield
- return 12
+ async def task():
+ await asyncio.sleep(0)
+ await asyncio.sleep(0)
+ return 12
t = self.new_task(self.loop, task())
test_utils.run_briefly(self.loop) # start coro
@@ -1322,10 +1183,8 @@ class BaseTaskTests:
def test_wait_duplicate_coroutines(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro(s):
- return s
+ async def coro(s):
+ return s
c = coro('test')
task = self.new_task(
self.loop,
@@ -1587,16 +1446,14 @@ class BaseTaskTests:
completed = set()
time_shifted = False
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def sleeper(dt, x):
- nonlocal time_shifted
- yield from asyncio.sleep(dt)
- completed.add(x)
- if not time_shifted and 'a' in completed and 'b' in completed:
- time_shifted = True
- loop.advance_time(0.14)
- return x
+ async def sleeper(dt, x):
+ nonlocal time_shifted
+ await asyncio.sleep(dt)
+ completed.add(x)
+ if not time_shifted and 'a' in completed and 'b' in completed:
+ time_shifted = True
+ loop.advance_time(0.14)
+ return x
a = sleeper(0.01, 'a')
b = sleeper(0.01, 'b')
@@ -1614,10 +1471,6 @@ class BaseTaskTests:
self.assertTrue('b' in res[:2])
self.assertEqual(res[2], 'c')
- # Doing it again should take no time and exercise a different path.
- res = loop.run_until_complete(self.new_task(loop, foo()))
- self.assertAlmostEqual(0.15, loop.time())
-
def test_as_completed_with_timeout(self):
def gen():
@@ -1727,19 +1580,15 @@ class BaseTaskTests:
def test_as_completed_duplicate_coroutines(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro(s):
- return s
+ async def coro(s):
+ return s
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def runner():
- result = []
- c = coro('ham')
- for f in asyncio.as_completed([c, c, coro('spam')]):
- result.append((yield from f))
- return result
+ async def runner():
+ result = []
+ c = coro('ham')
+ for f in asyncio.as_completed([c, c, coro('spam')]):
+ result.append(await f)
+ return result
fut = self.new_task(self.loop, runner())
self.loop.run_until_complete(fut)
@@ -1900,17 +1749,6 @@ class BaseTaskTests:
self.loop.run_until_complete(task),
'ko')
- def test_step_result(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def notmuch():
- yield None
- yield 1
- return 'ko'
-
- self.assertRaises(
- RuntimeError, self.loop.run_until_complete, notmuch())
-
def test_step_result_future(self):
# If coroutine returns future, task waits on this future.
@@ -1983,53 +1821,15 @@ class BaseTaskTests:
yield
self.assertFalse(asyncio.iscoroutinefunction(fn1))
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def fn2():
- yield
+ async def fn2():
+ pass
self.assertTrue(asyncio.iscoroutinefunction(fn2))
self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
- def test_yield_vs_yield_from(self):
- fut = self.new_future(self.loop)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def wait_for_future():
- yield fut
-
- task = wait_for_future()
- with self.assertRaises(RuntimeError):
- self.loop.run_until_complete(task)
-
- self.assertFalse(fut.done())
-
- def test_yield_vs_yield_from_generator(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- yield
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def wait_for_future():
- gen = coro()
- try:
- yield gen
- finally:
- gen.close()
-
- task = wait_for_future()
- self.assertRaises(
- RuntimeError,
- self.loop.run_until_complete, task)
-
def test_coroutine_non_gen_function(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def func():
- return 'test'
+ async def func():
+ return 'test'
self.assertTrue(asyncio.iscoroutinefunction(func))
@@ -2042,10 +1842,8 @@ class BaseTaskTests:
def test_coroutine_non_gen_function_return_future(self):
fut = self.new_future(self.loop)
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def func():
- return fut
+ async def func():
+ return fut
async def coro():
fut.set_result('test')
@@ -2053,7 +1851,7 @@ class BaseTaskTests:
t1 = self.new_task(self.loop, func())
t2 = self.new_task(self.loop, coro())
res = self.loop.run_until_complete(t1)
- self.assertEqual(res, 'test')
+ self.assertEqual(res, fut)
self.assertIsNone(t2.result())
def test_current_task(self):
@@ -2309,136 +2107,15 @@ class BaseTaskTests:
self.assertRaises(ValueError, self.loop.run_until_complete,
asyncio.wait([]))
- def test_corowrapper_mocks_generator(self):
-
- def check():
- # A function that asserts various things.
- # Called twice, with different debug flag values.
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- # The actual coroutine.
- self.assertTrue(gen.gi_running)
- yield from fut
-
- # A completed Future used to run the coroutine.
- fut = self.new_future(self.loop)
- fut.set_result(None)
-
- # Call the coroutine.
- gen = coro()
-
- # Check some properties.
- self.assertTrue(asyncio.iscoroutine(gen))
- self.assertIsInstance(gen.gi_frame, types.FrameType)
- self.assertFalse(gen.gi_running)
- self.assertIsInstance(gen.gi_code, types.CodeType)
-
- # Run it.
- self.loop.run_until_complete(gen)
-
- # The frame should have changed.
- self.assertIsNone(gen.gi_frame)
-
- # Test with debug flag cleared.
- with set_coroutine_debug(False):
- check()
-
- # Test with debug flag set.
- with set_coroutine_debug(True):
- check()
-
- def test_yield_from_corowrapper(self):
- with set_coroutine_debug(True):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def t1():
- return (yield from t2())
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def t2():
- f = self.new_future(self.loop)
- self.new_task(self.loop, t3(f))
- return (yield from f)
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def t3(f):
- f.set_result((1, 2, 3))
-
- task = self.new_task(self.loop, t1())
- val = self.loop.run_until_complete(task)
- self.assertEqual(val, (1, 2, 3))
-
- def test_yield_from_corowrapper_send(self):
- def foo():
- a = yield
- return a
-
- def call(arg):
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- try:
- cw.send(arg)
- except StopIteration as ex:
- return ex.args[0]
- else:
- raise AssertionError('StopIteration was expected')
-
- self.assertEqual(call((1, 2)), (1, 2))
- self.assertEqual(call('spam'), 'spam')
-
- def test_corowrapper_weakref(self):
- wd = weakref.WeakValueDictionary()
- def foo(): yield from []
- cw = asyncio.coroutines.CoroWrapper(foo())
- wd['cw'] = cw # Would fail without __weakref__ slot.
- cw.gen = None # Suppress warning from __del__.
-
- def test_corowrapper_throw(self):
- # Issue 429: CoroWrapper.throw must be compatible with gen.throw
- def foo():
- value = None
- while True:
- try:
- value = yield value
- except Exception as e:
- value = e
-
- exception = Exception("foo")
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- self.assertIs(exception, cw.throw(exception))
-
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- self.assertIs(exception, cw.throw(Exception, exception))
-
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- exception = cw.throw(Exception, "foo")
- self.assertIsInstance(exception, Exception)
- self.assertEqual(exception.args, ("foo", ))
-
- cw = asyncio.coroutines.CoroWrapper(foo())
- cw.send(None)
- exception = cw.throw(Exception, "foo", None)
- self.assertIsInstance(exception, Exception)
- self.assertEqual(exception.args, ("foo", ))
-
def test_log_destroyed_pending_task(self):
Task = self.__class__.Task
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def kill_me(loop):
- future = self.new_future(loop)
- yield from future
- # at this point, the only reference to kill_me() task is
- # the Task._wakeup() method in future._callbacks
- raise Exception("code never reached")
+ async def kill_me(loop):
+ future = self.new_future(loop)
+ await future
+ # at this point, the only reference to kill_me() task is
+ # the Task._wakeup() method in future._callbacks
+ raise Exception("code never reached")
mock_handler = mock.Mock()
self.loop.set_debug(True)
@@ -2456,8 +2133,6 @@ class BaseTaskTests:
self.loop._run_once()
self.assertEqual(len(self.loop._ready), 0)
- # remove the future used in kill_me(), and references to the task
- del coro.gi_frame.f_locals['future']
coro = None
source_traceback = task._source_traceback
task = None
@@ -2491,62 +2166,6 @@ class BaseTaskTests:
loop.run_until_complete(runner())
self.assertFalse(m_log.error.called)
- @mock.patch('asyncio.coroutines.logger')
- def test_coroutine_never_yielded(self, m_log):
- with set_coroutine_debug(True):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro_noop():
- pass
-
- tb_filename = __file__
- tb_lineno = sys._getframe().f_lineno + 2
- # create a coroutine object but don't use it
- coro_noop()
- support.gc_collect()
-
- self.assertTrue(m_log.error.called)
- message = m_log.error.call_args[0][0]
- func_filename, func_lineno = test_utils.get_function_source(coro_noop)
-
- regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
- r'was never yielded from\n'
- r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
- r'.*\n'
- r' File "%s", line %s, in test_coroutine_never_yielded\n'
- r' coro_noop\(\)$'
- % (re.escape(coro_noop.__qualname__),
- re.escape(func_filename), func_lineno,
- re.escape(tb_filename), tb_lineno))
-
- self.assertRegex(message, re.compile(regex, re.DOTALL))
-
- def test_return_coroutine_from_coroutine(self):
- """Return of @asyncio.coroutine()-wrapped function generator object
- from @asyncio.coroutine()-wrapped function should have same effect as
- returning generator object or Future."""
- def check():
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def outer_coro():
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def inner_coro():
- return 1
-
- return inner_coro()
-
- result = self.loop.run_until_complete(outer_coro())
- self.assertEqual(result, 1)
-
- # Test with debug flag cleared.
- with set_coroutine_debug(False):
- check()
-
- # Test with debug flag set.
- with set_coroutine_debug(True):
- check()
-
def test_task_source_traceback(self):
self.loop.set_debug(True)
@@ -2677,10 +2296,8 @@ class BaseTaskTests:
raise ValueError
self.loop.call_soon = call_soon
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- pass
+ async def coro():
+ pass
self.assertFalse(m_log.error.called)
@@ -2708,23 +2325,6 @@ class BaseTaskTests:
"a coroutine was expected, got 123"):
self.new_task(self.loop, 123)
- def test_create_task_with_oldstyle_coroutine(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- pass
-
- task = self.new_task(self.loop, coro())
- self.assertIsInstance(task, self.Task)
- self.loop.run_until_complete(task)
-
- # test it for the second time to ensure that caching
- # in asyncio.iscoroutine() doesn't break things.
- task = self.new_task(self.loop, coro())
- self.assertIsInstance(task, self.Task)
- self.loop.run_until_complete(task)
-
def test_create_task_with_async_function(self):
async def coro():
@@ -3365,7 +2965,7 @@ class GatherTestsBase:
def test_env_var_debug(self):
code = '\n'.join((
'import asyncio.coroutines',
- 'print(asyncio.coroutines._DEBUG)'))
+ 'print(asyncio.coroutines._is_debug_mode())'))
# Test with -E to not fail if the unit test was run with
# PYTHONASYNCIODEBUG set to a non-empty string
@@ -3542,10 +3142,8 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
self.other_loop.run_until_complete(fut)
def test_duplicate_coroutines(self):
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro(s):
- return s
+ async def coro(s):
+ return s
c = coro('abc')
fut = self._gather(c, c, coro('def'), c)
self._run_loop(self.one_loop)
@@ -3633,9 +3231,7 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase):
# otherwise it spills errors and breaks **other** unittests, since
# 'target' is interacting with threads.
- # With this call, `coro` will be advanced, so that
- # CoroWrapper.__del__ won't do anything when asyncio tests run
- # in debug mode.
+ # With this call, `coro` will be advanced.
self.loop.call_soon_threadsafe(coro.send, None)
try:
return future.result(timeout)
@@ -3771,54 +3367,6 @@ class CompatibilityTests(test_utils.TestCase):
self.loop = None
super().tearDown()
- def test_yield_from_awaitable(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro():
- yield from asyncio.sleep(0)
- return 'ok'
-
- result = self.loop.run_until_complete(coro())
- self.assertEqual('ok', result)
-
- def test_await_old_style_coro(self):
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro1():
- return 'ok1'
-
- with self.assertWarns(DeprecationWarning):
- @asyncio.coroutine
- def coro2():
- yield from asyncio.sleep(0)
- return 'ok2'
-
- async def inner():
- return await asyncio.gather(coro1(), coro2())
-
- result = self.loop.run_until_complete(inner())
- self.assertEqual(['ok1', 'ok2'], result)
-
- def test_debug_mode_interop(self):
- # https://bugs.python.org/issue32636
- code = textwrap.dedent("""
- import asyncio
-
- async def native_coro():
- pass
-
- @asyncio.coroutine
- def old_style_coro():
- yield from native_coro()
-
- asyncio.run(old_style_coro())
- """)
-
- assert_python_ok("-Wignore::DeprecationWarning", "-c", code,
- PYTHONASYNCIODEBUG="1")
-
if __name__ == '__main__':
unittest.main()