summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_builtin.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_builtin.py')
-rw-r--r--Lib/test/test_builtin.py73
1 files changed, 72 insertions, 1 deletions
diff --git a/Lib/test/test_builtin.py b/Lib/test/test_builtin.py
index 5674ea8..4a358e8 100644
--- a/Lib/test/test_builtin.py
+++ b/Lib/test/test_builtin.py
@@ -1,6 +1,7 @@
# Python test set -- built-in functions
import ast
+import asyncio
import builtins
import collections
import decimal
@@ -18,9 +19,14 @@ import types
import unittest
import warnings
from contextlib import ExitStack
+from inspect import CO_COROUTINE
+from itertools import product
+from textwrap import dedent
+from types import AsyncGeneratorType, FunctionType
from operator import neg
from test.support import (
- EnvironmentVarGuard, TESTFN, check_warnings, swap_attr, unlink)
+ EnvironmentVarGuard, TESTFN, check_warnings, swap_attr, unlink,
+ maybe_get_event_loop_policy)
from test.support.script_helper import assert_python_ok
from unittest.mock import MagicMock, patch
try:
@@ -358,6 +364,71 @@ class BuiltinTest(unittest.TestCase):
rv = ns['f']()
self.assertEqual(rv, tuple(expected))
+ def test_compile_top_level_await(self):
+ """Test whether code some top level await can be compiled.
+
+ Make sure it compiles only with the PyCF_ALLOW_TOP_LEVEL_AWAIT flag set,
+ and make sure the generated code object has the CO_COROUTINE flag set in
+ order to execute it with `await eval(.....)` instead of exec, or via a
+ FunctionType.
+ """
+
+ # helper function just to check we can run top=level async-for
+ async def arange(n):
+ for i in range(n):
+ yield i
+
+ modes = ('single', 'exec')
+ code_samples = ['''a = await asyncio.sleep(0, result=1)''',
+ '''async for i in arange(1):
+ a = 1''',
+ '''async with asyncio.Lock() as l:
+ a = 1''']
+ policy = maybe_get_event_loop_policy()
+ try:
+ for mode, code_sample in product(modes,code_samples):
+ source = dedent(code_sample)
+ with self.assertRaises(SyntaxError, msg=f"{source=} {mode=}"):
+ compile(source, '?' , mode)
+
+ co = compile(source,
+ '?',
+ mode,
+ flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT)
+
+ self.assertEqual(co.co_flags & CO_COROUTINE, CO_COROUTINE,
+ msg=f"{source=} {mode=}")
+
+
+ # test we can create and advance a function type
+ globals_ = {'asyncio': asyncio, 'a':0, 'arange': arange}
+ async_f = FunctionType(co, globals_)
+ asyncio.run(async_f())
+ self.assertEqual(globals_['a'], 1)
+
+ # test we can await-eval,
+ globals_ = {'asyncio': asyncio, 'a':0, 'arange': arange}
+ asyncio.run(eval(co, globals_))
+ self.assertEqual(globals_['a'], 1)
+ finally:
+ asyncio.set_event_loop_policy(policy)
+
+ def test_compile_async_generator(self):
+ """
+ With the PyCF_ALLOW_TOP_LEVEL_AWAIT flag added in 3.8, we want to
+ make sure AsyncGenerators are still properly not marked with CO_COROUTINE
+ """
+ code = dedent("""async def ticker():
+ for i in range(10):
+ yield i
+ await asyncio.sleep(0)""")
+
+ co = compile(code, '?', 'exec', flags=ast.PyCF_ALLOW_TOP_LEVEL_AWAIT)
+ glob = {}
+ exec(co, glob)
+ self.assertEqual(type(glob['ticker']()), AsyncGeneratorType)
+
+
def test_delattr(self):
sys.spam = 1
delattr(sys, 'spam')