summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_interpreters/test_api.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_interpreters/test_api.py')
-rw-r--r--Lib/test/test_interpreters/test_api.py291
1 files changed, 237 insertions, 54 deletions
diff --git a/Lib/test/test_interpreters/test_api.py b/Lib/test/test_interpreters/test_api.py
index aefd326..363143f 100644
--- a/Lib/test/test_interpreters/test_api.py
+++ b/Lib/test/test_interpreters/test_api.py
@@ -280,7 +280,7 @@ class TestInterpreterIsRunning(TestBase):
def test_finished(self):
r, w = self.pipe()
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
os.write({w}, b'x')
""")
@@ -312,7 +312,7 @@ class TestInterpreterIsRunning(TestBase):
FINISHED = b'F'
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
import threading
@@ -326,7 +326,7 @@ class TestInterpreterIsRunning(TestBase):
self.assertFalse(interp.is_running())
os.write(w_thread, DONE)
- interp.exec_sync('t.join()')
+ interp.exec('t.join()')
self.assertEqual(os.read(r_interp, 1), FINISHED)
@@ -393,7 +393,7 @@ class TestInterpreterClose(TestBase):
interp2 = interpreters.create()
self.assertEqual(set(interpreters.list_all()),
{main, interp1, interp2})
- interp1.exec_sync(dedent(f"""
+ interp1.exec(dedent(f"""
from test.support import interpreters
interp2 = interpreters.Interpreter({interp2.id})
interp2.close()
@@ -427,7 +427,7 @@ class TestInterpreterClose(TestBase):
FINISHED = b'F'
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
import threading
import time
@@ -503,27 +503,27 @@ class TestInterpreterPrepareMain(TestBase):
interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'})
# Make sure neither was actually bound.
- with self.assertRaises(interpreters.ExecFailure):
- interp.exec_sync('print(foo)')
- with self.assertRaises(interpreters.ExecFailure):
- interp.exec_sync('print(spam)')
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.exec('print(foo)')
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.exec('print(spam)')
-class TestInterpreterExecSync(TestBase):
+class TestInterpreterExec(TestBase):
def test_success(self):
interp = interpreters.create()
script, file = _captured_script('print("it worked!", end="")')
with file:
- interp.exec_sync(script)
+ interp.exec(script)
out = file.read()
self.assertEqual(out, 'it worked!')
def test_failure(self):
interp = interpreters.create()
- with self.assertRaises(interpreters.ExecFailure):
- interp.exec_sync('raise Exception')
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.exec('raise Exception')
def test_display_preserved_exception(self):
tempdir = self.temp_dir()
@@ -542,21 +542,21 @@ class TestInterpreterExecSync(TestBase):
spam.eggs()
interp = interpreters.create()
- interp.exec_sync(script)
+ interp.exec(script)
""")
stdout, stderr = self.assert_python_failure(scriptfile)
self.maxDiff = None
- interpmod_line, = (l for l in stderr.splitlines() if ' exec_sync' in l)
- # File "{interpreters.__file__}", line 179, in exec_sync
+ interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l)
+ # File "{interpreters.__file__}", line 179, in exec
self.assertEqual(stderr, dedent(f"""\
Traceback (most recent call last):
File "{scriptfile}", line 9, in <module>
- interp.exec_sync(script)
- ~~~~~~~~~~~~~~~~^^^^^^^^
+ interp.exec(script)
+ ~~~~~~~~~~~^^^^^^^^
{interpmod_line.strip()}
- raise ExecFailure(excinfo)
- test.support.interpreters.ExecFailure: RuntimeError: uh-oh!
+ raise ExecutionFailed(excinfo)
+ test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh!
Uncaught in the interpreter:
@@ -578,7 +578,7 @@ class TestInterpreterExecSync(TestBase):
script, file = _captured_script('print("it worked!", end="")')
with file:
def f():
- interp.exec_sync(script)
+ interp.exec(script)
t = threading.Thread(target=f)
t.start()
@@ -604,7 +604,7 @@ class TestInterpreterExecSync(TestBase):
with open('{file.name}', 'w', encoding='utf-8') as out:
out.write('{expected}')
""")
- interp.exec_sync(script)
+ interp.exec(script)
file.seek(0)
content = file.read()
@@ -615,17 +615,17 @@ class TestInterpreterExecSync(TestBase):
interp = interpreters.create()
with _running(interp):
with self.assertRaises(RuntimeError):
- interp.exec_sync('print("spam")')
+ interp.exec('print("spam")')
def test_bad_script(self):
interp = interpreters.create()
with self.assertRaises(TypeError):
- interp.exec_sync(10)
+ interp.exec(10)
def test_bytes_for_script(self):
interp = interpreters.create()
with self.assertRaises(TypeError):
- interp.exec_sync(b'print("spam")')
+ interp.exec(b'print("spam")')
def test_with_background_threads_still_running(self):
r_interp, w_interp = self.pipe()
@@ -636,7 +636,7 @@ class TestInterpreterExecSync(TestBase):
FINISHED = b'F'
interp = interpreters.create()
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
import os
import threading
@@ -648,46 +648,229 @@ class TestInterpreterExecSync(TestBase):
t.start()
os.write({w_interp}, {RAN!r})
""")
- interp.exec_sync(f"""if True:
+ interp.exec(f"""if True:
os.write({w_interp}, {RAN!r})
""")
os.write(w_thread, DONE)
- interp.exec_sync('t.join()')
+ interp.exec('t.join()')
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), RAN)
self.assertEqual(os.read(r_interp, 1), FINISHED)
# test_xxsubinterpreters covers the remaining
- # Interpreter.exec_sync() behavior.
+ # Interpreter.exec() behavior.
-class TestInterpreterRun(TestBase):
-
- def test_success(self):
- interp = interpreters.create()
- script, file = _captured_script('print("it worked!", end="")')
- with file:
- t = interp.run(script)
+def call_func_noop():
+ pass
+
+
+def call_func_return_shareable():
+ return (1, None)
+
+
+def call_func_return_not_shareable():
+ return [1, 2, 3]
+
+
+def call_func_failure():
+ raise Exception('spam!')
+
+
+def call_func_ident(value):
+ return value
+
+
+def get_call_func_closure(value):
+ def call_func_closure():
+ return value
+ return call_func_closure
+
+
+class Spam:
+
+ @staticmethod
+ def noop():
+ pass
+
+ @classmethod
+ def from_values(cls, *values):
+ return cls(values)
+
+ def __init__(self, value):
+ self.value = value
+
+ def __call__(self, *args, **kwargs):
+ return (self.value, args, kwargs)
+
+ def __eq__(self, other):
+ if not isinstance(other, Spam):
+ return NotImplemented
+ return self.value == other.value
+
+ def run(self, *args, **kwargs):
+ return (self.value, args, kwargs)
+
+
+def call_func_complex(op, /, value=None, *args, exc=None, **kwargs):
+ if exc is not None:
+ raise exc
+ if op == '':
+ raise ValueError('missing op')
+ elif op == 'ident':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ return value
+ elif op == 'full-ident':
+ return (value, args, kwargs)
+ elif op == 'globals':
+ if value is not None or args or kwargs:
+ raise Exception((value, args, kwargs))
+ return __name__
+ elif op == 'interpid':
+ if value is not None or args or kwargs:
+ raise Exception((value, args, kwargs))
+ return interpreters.get_current().id
+ elif op == 'closure':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ return get_call_func_closure(value)
+ elif op == 'custom':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ return Spam(value)
+ elif op == 'custom-inner':
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ class Eggs(Spam):
+ pass
+ return Eggs(value)
+ elif not isinstance(op, str):
+ raise TypeError(op)
+ else:
+ raise NotImplementedError(op)
+
+
+class TestInterpreterCall(TestBase):
+
+ # signature
+ # - blank
+ # - args
+ # - kwargs
+ # - args, kwargs
+ # return
+ # - nothing (None)
+ # - simple
+ # - closure
+ # - custom
+ # ops:
+ # - do nothing
+ # - fail
+ # - echo
+ # - do complex, relative to interpreter
+ # scope
+ # - global func
+ # - local closure
+ # - returned closure
+ # - callable type instance
+ # - type
+ # - classmethod
+ # - staticmethod
+ # - instance method
+ # exception
+ # - builtin
+ # - custom
+ # - preserves info (e.g. SyntaxError)
+ # - matching error display
+
+ def test_call(self):
+ interp = interpreters.create()
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_noop, (), {}),
+ (call_func_return_shareable, (), {}),
+ (call_func_return_not_shareable, (), {}),
+ (Spam.noop, (), {}),
+ ]):
+ with self.subTest(f'success case #{i+1}'):
+ res = interp.call(callable)
+ self.assertIs(res, None)
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_ident, ('spamspamspam',), {}),
+ (get_call_func_closure, (42,), {}),
+ (get_call_func_closure(42), (), {}),
+ (Spam.from_values, (), {}),
+ (Spam.from_values, (1, 2, 3), {}),
+ (Spam, ('???'), {}),
+ (Spam(101), (), {}),
+ (Spam(10101).run, (), {}),
+ (call_func_complex, ('ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
+ (call_func_complex, ('globals',), {}),
+ (call_func_complex, ('interpid',), {}),
+ (call_func_complex, ('closure',), {'value': '~~~'}),
+ (call_func_complex, ('custom', 'spam!'), {}),
+ (call_func_complex, ('custom-inner', 'eggs!'), {}),
+ (call_func_complex, ('???',), {'exc': ValueError('spam')}),
+ ]):
+ with self.subTest(f'invalid case #{i+1}'):
+ with self.assertRaises(Exception):
+ if args or kwargs:
+ raise Exception((args, kwargs))
+ interp.call(callable)
+
+ with self.assertRaises(interpreters.ExecutionFailed):
+ interp.call(call_func_failure)
+
+ def test_call_in_thread(self):
+ interp = interpreters.create()
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_noop, (), {}),
+ (call_func_return_shareable, (), {}),
+ (call_func_return_not_shareable, (), {}),
+ (Spam.noop, (), {}),
+ ]):
+ with self.subTest(f'success case #{i+1}'):
+ with self.captured_thread_exception() as ctx:
+ t = interp.call_in_thread(callable)
+ t.join()
+ self.assertIsNone(ctx.caught)
+
+ for i, (callable, args, kwargs) in enumerate([
+ (call_func_ident, ('spamspamspam',), {}),
+ (get_call_func_closure, (42,), {}),
+ (get_call_func_closure(42), (), {}),
+ (Spam.from_values, (), {}),
+ (Spam.from_values, (1, 2, 3), {}),
+ (Spam, ('???'), {}),
+ (Spam(101), (), {}),
+ (Spam(10101).run, (), {}),
+ (call_func_complex, ('ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam'), {}),
+ (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': '!!!'}),
+ (call_func_complex, ('globals',), {}),
+ (call_func_complex, ('interpid',), {}),
+ (call_func_complex, ('closure',), {'value': '~~~'}),
+ (call_func_complex, ('custom', 'spam!'), {}),
+ (call_func_complex, ('custom-inner', 'eggs!'), {}),
+ (call_func_complex, ('???',), {'exc': ValueError('spam')}),
+ ]):
+ with self.subTest(f'invalid case #{i+1}'):
+ if args or kwargs:
+ continue
+ with self.captured_thread_exception() as ctx:
+ t = interp.call_in_thread(callable)
+ t.join()
+ self.assertIsNotNone(ctx.caught)
+
+ with self.captured_thread_exception() as ctx:
+ t = interp.call_in_thread(call_func_failure)
t.join()
- out = file.read()
-
- self.assertEqual(out, 'it worked!')
-
- def test_failure(self):
- caught = False
- def excepthook(args):
- nonlocal caught
- caught = True
- threading.excepthook = excepthook
- try:
- interp = interpreters.create()
- t = interp.run('raise Exception')
- t.join()
-
- self.assertTrue(caught)
- except BaseException:
- threading.excepthook = threading.__excepthook__
+ self.assertIsNotNone(ctx.caught)
class TestIsShareable(TestBase):