diff options
Diffstat (limited to 'Lib/test/test_builtin.py')
-rw-r--r-- | Lib/test/test_builtin.py | 73 |
1 files changed, 72 insertions, 1 deletions
diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py index 5674ea8..4a358e8 100644 --- a/Lib/test/test_builtin.py +++ b/Lib/test/test_builtin.py @@ -1,6 +1,7 @@ # Python test set -- built-in functions import ast +import asyncio import builtins import collections import decimal @@ -18,9 +19,14 @@ import types import unittest import warnings from contextlib import ExitStack +from inspect import CO_COROUTINE +from itertools import product +from textwrap import dedent +from types import AsyncGeneratorType, FunctionType from operator import neg from test.support import ( - EnvironmentVarGuard, TESTFN, check_warnings, swap_attr, unlink) + EnvironmentVarGuard, TESTFN, check_warnings, swap_attr, unlink, + maybe_get_event_loop_policy) from test.support.script_helper import assert_python_ok from unittest.mock import MagicMock, patch try: @@ -358,6 +364,71 @@ class BuiltinTest(unittest.TestCase): rv = ns['f']() self.assertEqual(rv, tuple(expected)) + def test_compile_top_level_await(self): + """Test whether code some top level await can be compiled. + + Make sure it compiles only with the PyCF_ALLOW_TOP_LEVEL_AWAIT flag set, + and make sure the generated code object has the CO_COROUTINE flag set in + order to execute it with `await eval(.....)` instead of exec, or via a + FunctionType. + """ + + # helper function just to check we can run top=level async-for + async def arange(n): + for i in range(n): + yield i + + modes = ('single', 'exec') + code_samples = ['''a = await asyncio.sleep(0, result=1)''', + '''async for i in arange(1): + a = 1''', + '''async with asyncio.Lock() as l: + a = 1'''] + policy = maybe_get_event_loop_policy() + try: + for mode, code_sample in product(modes,code_samples): + source = dedent(code_sample) + with self.assertRaises(SyntaxError, msg=f"{source=} {mode=}"): + compile(source, '?' , mode) + + co = compile(source, + '?', + mode, + flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT) + + self.assertEqual(co.co_flags & CO_COROUTINE, CO_COROUTINE, + msg=f"{source=} {mode=}") + + + # test we can create and advance a function type + globals_ = {'asyncio': asyncio, 'a':0, 'arange': arange} + async_f = FunctionType(co, globals_) + asyncio.run(async_f()) + self.assertEqual(globals_['a'], 1) + + # test we can await-eval, + globals_ = {'asyncio': asyncio, 'a':0, 'arange': arange} + asyncio.run(eval(co, globals_)) + self.assertEqual(globals_['a'], 1) + finally: + asyncio.set_event_loop_policy(policy) + + def test_compile_async_generator(self): + """ + With the PyCF_ALLOW_TOP_LEVEL_AWAIT flag added in 3.8, we want to + make sure AsyncGenerators are still properly not marked with CO_COROUTINE + """ + code = dedent("""async def ticker(): + for i in range(10): + yield i + await asyncio.sleep(0)""") + + co = compile(code, '?', 'exec', flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT) + glob = {} + exec(co, glob) + self.assertEqual(type(glob['ticker']()), AsyncGeneratorType) + + def test_delattr(self): sys.spam = 1 delattr(sys, 'spam') |