summaryrefslogtreecommitdiffstats
path: root/Lib/test/test__xxsubinterpreters.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test__xxsubinterpreters.py')
-rw-r--r--Lib/test/test__xxsubinterpreters.py1194
1 files changed, 995 insertions, 199 deletions
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py
index 4ef7771..118f2e4 100644
--- a/Lib/test/test__xxsubinterpreters.py
+++ b/Lib/test/test__xxsubinterpreters.py
@@ -1,6 +1,9 @@
+from collections import namedtuple
import contextlib
+import itertools
import os
import pickle
+import sys
from textwrap import dedent, indent
import threading
import time
@@ -12,23 +15,32 @@ from test.support import script_helper
interpreters = support.import_module('_xxsubinterpreters')
+##################################
+# helpers
+
+def powerset(*sets):
+ return itertools.chain.from_iterable(
+ combinations(sets, r)
+ for r in range(len(sets)+1))
+
+
def _captured_script(script):
r, w = os.pipe()
indented = script.replace('\n', '\n ')
wrapped = dedent(f"""
import contextlib
- with open({w}, 'w') as chan:
- with contextlib.redirect_stdout(chan):
+ with open({w}, 'w') as spipe:
+ with contextlib.redirect_stdout(spipe):
{indented}
""")
return wrapped, open(r)
def _run_output(interp, request, shared=None):
- script, chan = _captured_script(request)
- with chan:
+ script, rpipe = _captured_script(request)
+ with rpipe:
interpreters.run_string(interp, script, shared)
- return chan.read()
+ return rpipe.read()
@contextlib.contextmanager
@@ -37,8 +49,8 @@ def _running(interp):
def run():
interpreters.run_string(interp, dedent(f"""
# wait for "signal"
- with open({r}) as chan:
- chan.read()
+ with open({r}) as rpipe:
+ rpipe.read()
"""))
t = threading.Thread(target=run)
@@ -46,11 +58,248 @@ def _running(interp):
yield
- with open(w, 'w') as chan:
- chan.write('done')
+ with open(w, 'w') as spipe:
+ spipe.write('done')
t.join()
+#@contextmanager
+#def run_threaded(id, source, **shared):
+# def run():
+# run_interp(id, source, **shared)
+# t = threading.Thread(target=run)
+# t.start()
+# yield
+# t.join()
+
+
+def run_interp(id, source, **shared):
+ _run_interp(id, source, shared)
+
+
+def _run_interp(id, source, shared, _mainns={}):
+ source = dedent(source)
+ main = interpreters.get_main()
+ if main == id:
+ if interpreters.get_current() != main:
+ raise RuntimeError
+ # XXX Run a func?
+ exec(source, _mainns)
+ else:
+ interpreters.run_string(id, source, shared)
+
+
+def run_interp_threaded(id, source, **shared):
+ def run():
+ _run(id, source, shared)
+ t = threading.Thread(target=run)
+ t.start()
+ t.join()
+
+
+class Interpreter(namedtuple('Interpreter', 'name id')):
+
+ @classmethod
+ def from_raw(cls, raw):
+ if isinstance(raw, cls):
+ return raw
+ elif isinstance(raw, str):
+ return cls(raw)
+ else:
+ raise NotImplementedError
+
+ def __new__(cls, name=None, id=None):
+ main = interpreters.get_main()
+ if id == main:
+ if not name:
+ name = 'main'
+ elif name != 'main':
+ raise ValueError(
+ 'name mismatch (expected "main", got "{}")'.format(name))
+ id = main
+ elif id is not None:
+ if not name:
+ name = 'interp'
+ elif name == 'main':
+ raise ValueError('name mismatch (unexpected "main")')
+ if not isinstance(id, interpreters.InterpreterID):
+ id = interpreters.InterpreterID(id)
+ elif not name or name == 'main':
+ name = 'main'
+ id = main
+ else:
+ id = interpreters.create()
+ self = super().__new__(cls, name, id)
+ return self
+
+
+# XXX expect_channel_closed() is unnecessary once we improve exc propagation.
+
+@contextlib.contextmanager
+def expect_channel_closed():
+ try:
+ yield
+ except interpreters.ChannelClosedError:
+ pass
+ else:
+ assert False, 'channel not closed'
+
+
+class ChannelAction(namedtuple('ChannelAction', 'action end interp')):
+
+ def __new__(cls, action, end=None, interp=None):
+ if not end:
+ end = 'both'
+ if not interp:
+ interp = 'main'
+ self = super().__new__(cls, action, end, interp)
+ return self
+
+ def __init__(self, *args, **kwargs):
+ if self.action == 'use':
+ if self.end not in ('same', 'opposite', 'send', 'recv'):
+ raise ValueError(self.end)
+ elif self.action in ('close', 'force-close'):
+ if self.end not in ('both', 'same', 'opposite', 'send', 'recv'):
+ raise ValueError(self.end)
+ else:
+ raise ValueError(self.action)
+ if self.interp not in ('main', 'same', 'other', 'extra'):
+ raise ValueError(self.interp)
+
+ def resolve_end(self, end):
+ if self.end == 'same':
+ return end
+ elif self.end == 'opposite':
+ return 'recv' if end == 'send' else 'send'
+ else:
+ return self.end
+
+ def resolve_interp(self, interp, other, extra):
+ if self.interp == 'same':
+ return interp
+ elif self.interp == 'other':
+ if other is None:
+ raise RuntimeError
+ return other
+ elif self.interp == 'extra':
+ if extra is None:
+ raise RuntimeError
+ return extra
+ elif self.interp == 'main':
+ if interp.name == 'main':
+ return interp
+ elif other and other.name == 'main':
+ return other
+ else:
+ raise RuntimeError
+ # Per __init__(), there aren't any others.
+
+
+class ChannelState(namedtuple('ChannelState', 'pending closed')):
+
+ def __new__(cls, pending=0, *, closed=False):
+ self = super().__new__(cls, pending, closed)
+ return self
+
+ def incr(self):
+ return type(self)(self.pending + 1, closed=self.closed)
+
+ def decr(self):
+ return type(self)(self.pending - 1, closed=self.closed)
+
+ def close(self, *, force=True):
+ if self.closed:
+ if not force or self.pending == 0:
+ return self
+ return type(self)(0 if force else self.pending, closed=True)
+
+
+def run_action(cid, action, end, state, *, hideclosed=True):
+ if state.closed:
+ if action == 'use' and end == 'recv' and state.pending:
+ expectfail = False
+ else:
+ expectfail = True
+ else:
+ expectfail = False
+
+ try:
+ result = _run_action(cid, action, end, state)
+ except interpreters.ChannelClosedError:
+ if not hideclosed and not expectfail:
+ raise
+ result = state.close()
+ else:
+ if expectfail:
+ raise ... # XXX
+ return result
+
+
+def _run_action(cid, action, end, state):
+ if action == 'use':
+ if end == 'send':
+ interpreters.channel_send(cid, b'spam')
+ return state.incr()
+ elif end == 'recv':
+ if not state.pending:
+ try:
+ interpreters.channel_recv(cid)
+ except interpreters.ChannelEmptyError:
+ return state
+ else:
+ raise Exception('expected ChannelEmptyError')
+ else:
+ interpreters.channel_recv(cid)
+ return state.decr()
+ else:
+ raise ValueError(end)
+ elif action == 'close':
+ kwargs = {}
+ if end in ('recv', 'send'):
+ kwargs[end] = True
+ interpreters.channel_close(cid, **kwargs)
+ return state.close()
+ elif action == 'force-close':
+ kwargs = {
+ 'force': True,
+ }
+ if end in ('recv', 'send'):
+ kwargs[end] = True
+ interpreters.channel_close(cid, **kwargs)
+ return state.close(force=True)
+ else:
+ raise ValueError(action)
+
+
+def clean_up_interpreters():
+ for id in interpreters.list_all():
+ if id == 0: # main
+ continue
+ try:
+ interpreters.destroy(id)
+ except RuntimeError:
+ pass # already destroyed
+
+
+def clean_up_channels():
+ for cid in interpreters.channel_list_all():
+ try:
+ interpreters.channel_destroy(cid)
+ except interpreters.ChannelNotFoundError:
+ pass # already destroyed
+
+
+class TestBase(unittest.TestCase):
+
+ def tearDown(self):
+ clean_up_interpreters()
+ clean_up_channels()
+
+
+##################################
+# misc. tests
+
class IsShareableTests(unittest.TestCase):
def test_default_shareables(self):
@@ -59,6 +308,9 @@ class IsShareableTests(unittest.TestCase):
None,
# builtin objects
b'spam',
+ 'spam',
+ 10,
+ -10,
]
for obj in shareables:
with self.subTest(obj):
@@ -86,37 +338,65 @@ class IsShareableTests(unittest.TestCase):
object,
object(),
Exception(),
- 42,
100.0,
- 'spam',
# user-defined types and objects
Cheese,
Cheese('Wensleydale'),
SubBytes(b'spam'),
]
for obj in not_shareables:
- with self.subTest(obj):
+ with self.subTest(repr(obj)):
self.assertFalse(
interpreters.is_shareable(obj))
-class TestBase(unittest.TestCase):
+class ShareableTypeTests(unittest.TestCase):
+
+ def setUp(self):
+ super().setUp()
+ self.cid = interpreters.channel_create()
def tearDown(self):
- for id in interpreters.list_all():
- if id == 0: # main
- continue
- try:
- interpreters.destroy(id)
- except RuntimeError:
- pass # already destroyed
+ interpreters.channel_destroy(self.cid)
+ super().tearDown()
- for cid in interpreters.channel_list_all():
- try:
- interpreters.channel_destroy(cid)
- except interpreters.ChannelNotFoundError:
- pass # already destroyed
+ def _assert_values(self, values):
+ for obj in values:
+ with self.subTest(obj):
+ interpreters.channel_send(self.cid, obj)
+ got = interpreters.channel_recv(self.cid)
+
+ self.assertEqual(got, obj)
+ self.assertIs(type(got), type(obj))
+ # XXX Check the following in the channel tests?
+ #self.assertIsNot(got, obj)
+
+ def test_singletons(self):
+ for obj in [None]:
+ with self.subTest(obj):
+ interpreters.channel_send(self.cid, obj)
+ got = interpreters.channel_recv(self.cid)
+
+ # XXX What about between interpreters?
+ self.assertIs(got, obj)
+
+ def test_types(self):
+ self._assert_values([
+ b'spam',
+ 9999,
+ self.cid,
+ ])
+
+ def test_bytes(self):
+ self._assert_values(i.to_bytes(2, 'little', signed=True)
+ for i in range(-1, 258))
+ def test_int(self):
+ self._assert_values(range(-1, 258))
+
+
+##################################
+# interpreter tests
class ListAllTests(TestBase):
@@ -147,13 +427,16 @@ class GetCurrentTests(TestBase):
main = interpreters.get_main()
cur = interpreters.get_current()
self.assertEqual(cur, main)
+ self.assertIsInstance(cur, interpreters.InterpreterID)
def test_subinterpreter(self):
main = interpreters.get_main()
interp = interpreters.create()
out = _run_output(interp, dedent("""
import _xxsubinterpreters as _interpreters
- print(int(_interpreters.get_current()))
+ cur = _interpreters.get_current()
+ print(cur)
+ assert isinstance(cur, _interpreters.InterpreterID)
"""))
cur = int(out.strip())
_, expected = interpreters.list_all()
@@ -167,13 +450,16 @@ class GetMainTests(TestBase):
[expected] = interpreters.list_all()
main = interpreters.get_main()
self.assertEqual(main, expected)
+ self.assertIsInstance(main, interpreters.InterpreterID)
def test_from_subinterpreter(self):
[expected] = interpreters.list_all()
interp = interpreters.create()
out = _run_output(interp, dedent("""
import _xxsubinterpreters as _interpreters
- print(int(_interpreters.get_main()))
+ main = _interpreters.get_main()
+ print(main)
+ assert isinstance(main, _interpreters.InterpreterID)
"""))
main = int(out.strip())
self.assertEqual(main, expected)
@@ -197,7 +483,7 @@ class IsRunningTests(TestBase):
interp = interpreters.create()
out = _run_output(interp, dedent(f"""
import _xxsubinterpreters as _interpreters
- if _interpreters.is_running({int(interp)}):
+ if _interpreters.is_running({interp}):
print(True)
else:
print(False)
@@ -257,6 +543,10 @@ class InterpreterIDTests(TestBase):
with self.assertRaises(RuntimeError):
interpreters.InterpreterID(int(id) + 1) # unforced
+ def test_str(self):
+ id = interpreters.InterpreterID(10, force=True)
+ self.assertEqual(str(id), '10')
+
def test_repr(self):
id = interpreters.InterpreterID(10, force=True)
self.assertEqual(repr(id), 'InterpreterID(10)')
@@ -280,6 +570,7 @@ class CreateTests(TestBase):
def test_in_main(self):
id = interpreters.create()
+ self.assertIsInstance(id, interpreters.InterpreterID)
self.assertIn(id, interpreters.list_all())
@@ -314,7 +605,8 @@ class CreateTests(TestBase):
out = _run_output(id1, dedent("""
import _xxsubinterpreters as _interpreters
id = _interpreters.create()
- print(int(id))
+ print(id)
+ assert isinstance(id, _interpreters.InterpreterID)
"""))
id2 = int(out.strip())
@@ -329,7 +621,7 @@ class CreateTests(TestBase):
out = _run_output(id1, dedent("""
import _xxsubinterpreters as _interpreters
id = _interpreters.create()
- print(int(id))
+ print(id)
"""))
id2 = int(out.strip())
@@ -423,7 +715,7 @@ class DestroyTests(TestBase):
script = dedent(f"""
import _xxsubinterpreters as _interpreters
try:
- _interpreters.destroy({int(id)})
+ _interpreters.destroy({id})
except RuntimeError:
pass
""")
@@ -437,7 +729,7 @@ class DestroyTests(TestBase):
id2 = interpreters.create()
script = dedent(f"""
import _xxsubinterpreters as _interpreters
- _interpreters.destroy({int(id2)})
+ _interpreters.destroy({id2})
""")
interpreters.run_string(id1, script)
@@ -783,6 +1075,9 @@ class RunStringTests(TestBase):
self.assertEqual(retcode, 0)
+##################################
+# channel tests
+
class ChannelIDTests(TestBase):
def test_default_kwargs(self):
@@ -842,6 +1137,10 @@ class ChannelIDTests(TestBase):
with self.assertRaises(interpreters.ChannelNotFoundError):
interpreters._channel_id(int(cid) + 1) # unforced
+ def test_str(self):
+ cid = interpreters._channel_id(10, force=True)
+ self.assertEqual(str(cid), '10')
+
def test_repr(self):
cid = interpreters._channel_id(10, force=True)
self.assertEqual(repr(cid), 'ChannelID(10)')
@@ -872,6 +1171,10 @@ class ChannelIDTests(TestBase):
class ChannelTests(TestBase):
+ def test_create_cid(self):
+ cid = interpreters.channel_create()
+ self.assertIsInstance(cid, interpreters.ChannelID)
+
def test_sequential_ids(self):
before = interpreters.channel_list_all()
id1 = interpreters.channel_create()
@@ -888,7 +1191,7 @@ class ChannelTests(TestBase):
out = _run_output(id1, dedent("""
import _xxsubinterpreters as _interpreters
cid = _interpreters.channel_create()
- print(int(cid))
+ print(cid)
"""))
cid1 = int(out.strip())
@@ -896,7 +1199,7 @@ class ChannelTests(TestBase):
out = _run_output(id2, dedent("""
import _xxsubinterpreters as _interpreters
cid = _interpreters.channel_create()
- print(int(cid))
+ print(cid)
"""))
cid2 = int(out.strip())
@@ -904,127 +1207,133 @@ class ChannelTests(TestBase):
####################
- def test_drop_single_user(self):
+ def test_send_recv_main(self):
cid = interpreters.channel_create()
- interpreters.channel_send(cid, b'spam')
- interpreters.channel_recv(cid)
- interpreters.channel_drop_interpreter(cid, send=True, recv=True)
+ orig = b'spam'
+ interpreters.channel_send(cid, orig)
+ obj = interpreters.channel_recv(cid)
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_send(cid, b'eggs')
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_recv(cid)
+ self.assertEqual(obj, orig)
+ self.assertIsNot(obj, orig)
- def test_drop_multiple_users(self):
- cid = interpreters.channel_create()
+ def test_send_recv_same_interpreter(self):
id1 = interpreters.create()
- id2 = interpreters.create()
- interpreters.run_string(id1, dedent(f"""
+ out = _run_output(id1, dedent("""
import _xxsubinterpreters as _interpreters
- _interpreters.channel_send({int(cid)}, b'spam')
+ cid = _interpreters.channel_create()
+ orig = b'spam'
+ _interpreters.channel_send(cid, orig)
+ obj = _interpreters.channel_recv(cid)
+ assert obj is not orig
+ assert obj == orig
"""))
- out = _run_output(id2, dedent(f"""
+
+ def test_send_recv_different_interpreters(self):
+ cid = interpreters.channel_create()
+ id1 = interpreters.create()
+ out = _run_output(id1, dedent(f"""
import _xxsubinterpreters as _interpreters
- obj = _interpreters.channel_recv({int(cid)})
- _interpreters.channel_drop_interpreter({int(cid)})
- print(repr(obj))
- """))
- interpreters.run_string(id1, dedent(f"""
- _interpreters.channel_drop_interpreter({int(cid)})
+ _interpreters.channel_send({cid}, b'spam')
"""))
+ obj = interpreters.channel_recv(cid)
- self.assertEqual(out.strip(), "b'spam'")
+ self.assertEqual(obj, b'spam')
- def test_drop_no_kwargs(self):
+ def test_send_recv_different_threads(self):
cid = interpreters.channel_create()
- interpreters.channel_send(cid, b'spam')
- interpreters.channel_recv(cid)
- interpreters.channel_drop_interpreter(cid)
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_send(cid, b'eggs')
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_recv(cid)
+ def f():
+ while True:
+ try:
+ obj = interpreters.channel_recv(cid)
+ break
+ except interpreters.ChannelEmptyError:
+ time.sleep(0.1)
+ interpreters.channel_send(cid, obj)
+ t = threading.Thread(target=f)
+ t.start()
- def test_drop_multiple_times(self):
- cid = interpreters.channel_create()
interpreters.channel_send(cid, b'spam')
- interpreters.channel_recv(cid)
- interpreters.channel_drop_interpreter(cid, send=True, recv=True)
+ t.join()
+ obj = interpreters.channel_recv(cid)
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_drop_interpreter(cid, send=True, recv=True)
+ self.assertEqual(obj, b'spam')
- def test_drop_with_unused_items(self):
+ def test_send_recv_different_interpreters_and_threads(self):
cid = interpreters.channel_create()
+ id1 = interpreters.create()
+ out = None
+
+ def f():
+ nonlocal out
+ out = _run_output(id1, dedent(f"""
+ import time
+ import _xxsubinterpreters as _interpreters
+ while True:
+ try:
+ obj = _interpreters.channel_recv({cid})
+ break
+ except _interpreters.ChannelEmptyError:
+ time.sleep(0.1)
+ assert(obj == b'spam')
+ _interpreters.channel_send({cid}, b'eggs')
+ """))
+ t = threading.Thread(target=f)
+ t.start()
+
interpreters.channel_send(cid, b'spam')
- interpreters.channel_send(cid, b'ham')
- interpreters.channel_drop_interpreter(cid, send=True, recv=True)
+ t.join()
+ obj = interpreters.channel_recv(cid)
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_recv(cid)
+ self.assertEqual(obj, b'eggs')
- def test_drop_never_used(self):
- cid = interpreters.channel_create()
- interpreters.channel_drop_interpreter(cid)
+ def test_send_not_found(self):
+ with self.assertRaises(interpreters.ChannelNotFoundError):
+ interpreters.channel_send(10, b'spam')
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_send(cid, b'spam')
- with self.assertRaises(interpreters.ChannelClosedError):
+ def test_recv_not_found(self):
+ with self.assertRaises(interpreters.ChannelNotFoundError):
+ interpreters.channel_recv(10)
+
+ def test_recv_empty(self):
+ cid = interpreters.channel_create()
+ with self.assertRaises(interpreters.ChannelEmptyError):
interpreters.channel_recv(cid)
- def test_drop_by_unassociated_interp(self):
+ def test_run_string_arg_unresolved(self):
cid = interpreters.channel_create()
- interpreters.channel_send(cid, b'spam')
interp = interpreters.create()
- interpreters.run_string(interp, dedent(f"""
+
+ out = _run_output(interp, dedent("""
import _xxsubinterpreters as _interpreters
- _interpreters.channel_drop_interpreter({int(cid)})
- """))
+ print(cid.end)
+ _interpreters.channel_send(cid, b'spam')
+ """),
+ dict(cid=cid.send))
obj = interpreters.channel_recv(cid)
- interpreters.channel_drop_interpreter(cid)
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_send(cid, b'eggs')
self.assertEqual(obj, b'spam')
+ self.assertEqual(out.strip(), 'send')
- def test_drop_close_if_unassociated(self):
+ def test_run_string_arg_resolved(self):
cid = interpreters.channel_create()
+ cid = interpreters._channel_id(cid, _resolve=True)
interp = interpreters.create()
- interpreters.run_string(interp, dedent(f"""
- import _xxsubinterpreters as _interpreters
- obj = _interpreters.channel_send({int(cid)}, b'spam')
- _interpreters.channel_drop_interpreter({int(cid)})
- """))
-
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_recv(cid)
- def test_drop_partially(self):
- # XXX Is partial close too weird/confusing?
- cid = interpreters.channel_create()
- interpreters.channel_send(cid, None)
- interpreters.channel_recv(cid)
- interpreters.channel_send(cid, b'spam')
- interpreters.channel_drop_interpreter(cid, send=True)
+ out = _run_output(interp, dedent("""
+ import _xxsubinterpreters as _interpreters
+ print(chan.end)
+ _interpreters.channel_send(chan, b'spam')
+ #print(chan.id.end)
+ #_interpreters.channel_send(chan.id, b'spam')
+ """),
+ dict(chan=cid.send))
obj = interpreters.channel_recv(cid)
self.assertEqual(obj, b'spam')
+ self.assertEqual(out.strip(), 'send')
- def test_drop_used_multiple_times_by_single_user(self):
- cid = interpreters.channel_create()
- interpreters.channel_send(cid, b'spam')
- interpreters.channel_send(cid, b'spam')
- interpreters.channel_send(cid, b'spam')
- interpreters.channel_recv(cid)
- interpreters.channel_drop_interpreter(cid, send=True, recv=True)
-
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_send(cid, b'eggs')
- with self.assertRaises(interpreters.ChannelClosedError):
- interpreters.channel_recv(cid)
-
- ####################
+ # close
def test_close_single_user(self):
cid = interpreters.channel_create()
@@ -1043,21 +1352,21 @@ class ChannelTests(TestBase):
id2 = interpreters.create()
interpreters.run_string(id1, dedent(f"""
import _xxsubinterpreters as _interpreters
- _interpreters.channel_send({int(cid)}, b'spam')
+ _interpreters.channel_send({cid}, b'spam')
"""))
interpreters.run_string(id2, dedent(f"""
import _xxsubinterpreters as _interpreters
- _interpreters.channel_recv({int(cid)})
+ _interpreters.channel_recv({cid})
"""))
interpreters.channel_close(cid)
with self.assertRaises(interpreters.RunFailedError) as cm:
interpreters.run_string(id1, dedent(f"""
- _interpreters.channel_send({int(cid)}, b'spam')
+ _interpreters.channel_send({cid}, b'spam')
"""))
self.assertIn('ChannelClosedError', str(cm.exception))
with self.assertRaises(interpreters.RunFailedError) as cm:
interpreters.run_string(id2, dedent(f"""
- _interpreters.channel_send({int(cid)}, b'spam')
+ _interpreters.channel_send({cid}, b'spam')
"""))
self.assertIn('ChannelClosedError', str(cm.exception))
@@ -1094,7 +1403,7 @@ class ChannelTests(TestBase):
interp = interpreters.create()
interpreters.run_string(interp, dedent(f"""
import _xxsubinterpreters as _interpreters
- _interpreters.channel_close({int(cid)})
+ _interpreters.channel_close({cid})
"""))
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_recv(cid)
@@ -1114,115 +1423,602 @@ class ChannelTests(TestBase):
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_recv(cid)
- ####################
- def test_send_recv_main(self):
+class ChannelReleaseTests(TestBase):
+
+ # XXX Add more test coverage a la the tests for close().
+
+ """
+ - main / interp / other
+ - run in: current thread / new thread / other thread / different threads
+ - end / opposite
+ - force / no force
+ - used / not used (associated / not associated)
+ - empty / emptied / never emptied / partly emptied
+ - closed / not closed
+ - released / not released
+ - creator (interp) / other
+ - associated interpreter not running
+ - associated interpreter destroyed
+ """
+
+ """
+ use
+ pre-release
+ release
+ after
+ check
+ """
+
+ """
+ release in: main, interp1
+ creator: same, other (incl. interp2)
+
+ use: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
+ pre-release: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
+ pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
+
+ release: same
+ release forced: same
+
+ use after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
+ release after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
+ check released: send/recv for same/other(incl. interp2)
+ check closed: send/recv for same/other(incl. interp2)
+ """
+
+ def test_single_user(self):
cid = interpreters.channel_create()
- orig = b'spam'
- interpreters.channel_send(cid, orig)
- obj = interpreters.channel_recv(cid)
+ interpreters.channel_send(cid, b'spam')
+ interpreters.channel_recv(cid)
+ interpreters.channel_release(cid, send=True, recv=True)
- self.assertEqual(obj, orig)
- self.assertIsNot(obj, orig)
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_send(cid, b'eggs')
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_recv(cid)
- def test_send_recv_same_interpreter(self):
+ def test_multiple_users(self):
+ cid = interpreters.channel_create()
id1 = interpreters.create()
- out = _run_output(id1, dedent("""
+ id2 = interpreters.create()
+ interpreters.run_string(id1, dedent(f"""
import _xxsubinterpreters as _interpreters
- cid = _interpreters.channel_create()
- orig = b'spam'
- _interpreters.channel_send(cid, orig)
- obj = _interpreters.channel_recv(cid)
- assert obj is not orig
- assert obj == orig
+ _interpreters.channel_send({cid}, b'spam')
+ """))
+ out = _run_output(id2, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ obj = _interpreters.channel_recv({cid})
+ _interpreters.channel_release({cid})
+ print(repr(obj))
+ """))
+ interpreters.run_string(id1, dedent(f"""
+ _interpreters.channel_release({cid})
"""))
- def test_send_recv_different_interpreters(self):
+ self.assertEqual(out.strip(), "b'spam'")
+
+ def test_no_kwargs(self):
cid = interpreters.channel_create()
- id1 = interpreters.create()
- out = _run_output(id1, dedent(f"""
+ interpreters.channel_send(cid, b'spam')
+ interpreters.channel_recv(cid)
+ interpreters.channel_release(cid)
+
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_send(cid, b'eggs')
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_recv(cid)
+
+ def test_multiple_times(self):
+ cid = interpreters.channel_create()
+ interpreters.channel_send(cid, b'spam')
+ interpreters.channel_recv(cid)
+ interpreters.channel_release(cid, send=True, recv=True)
+
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_release(cid, send=True, recv=True)
+
+ def test_with_unused_items(self):
+ cid = interpreters.channel_create()
+ interpreters.channel_send(cid, b'spam')
+ interpreters.channel_send(cid, b'ham')
+ interpreters.channel_release(cid, send=True, recv=True)
+
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_recv(cid)
+
+ def test_never_used(self):
+ cid = interpreters.channel_create()
+ interpreters.channel_release(cid)
+
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_send(cid, b'spam')
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_recv(cid)
+
+ def test_by_unassociated_interp(self):
+ cid = interpreters.channel_create()
+ interpreters.channel_send(cid, b'spam')
+ interp = interpreters.create()
+ interpreters.run_string(interp, dedent(f"""
import _xxsubinterpreters as _interpreters
- _interpreters.channel_send({int(cid)}, b'spam')
+ _interpreters.channel_release({cid})
"""))
obj = interpreters.channel_recv(cid)
+ interpreters.channel_release(cid)
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_send(cid, b'eggs')
self.assertEqual(obj, b'spam')
- def test_send_recv_different_threads(self):
+ def test_close_if_unassociated(self):
+ # XXX Something's not right with this test...
cid = interpreters.channel_create()
+ interp = interpreters.create()
+ interpreters.run_string(interp, dedent(f"""
+ import _xxsubinterpreters as _interpreters
+ obj = _interpreters.channel_send({cid}, b'spam')
+ _interpreters.channel_release({cid})
+ """))
- def f():
- while True:
- try:
- obj = interpreters.channel_recv(cid)
- break
- except interpreters.ChannelEmptyError:
- time.sleep(0.1)
- interpreters.channel_send(cid, obj)
- t = threading.Thread(target=f)
- t.start()
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_recv(cid)
+ def test_partially(self):
+ # XXX Is partial close too weird/confusing?
+ cid = interpreters.channel_create()
+ interpreters.channel_send(cid, None)
+ interpreters.channel_recv(cid)
interpreters.channel_send(cid, b'spam')
- t.join()
+ interpreters.channel_release(cid, send=True)
obj = interpreters.channel_recv(cid)
self.assertEqual(obj, b'spam')
- def test_send_recv_different_interpreters_and_threads(self):
+ def test_used_multiple_times_by_single_user(self):
cid = interpreters.channel_create()
- id1 = interpreters.create()
- out = None
+ interpreters.channel_send(cid, b'spam')
+ interpreters.channel_send(cid, b'spam')
+ interpreters.channel_send(cid, b'spam')
+ interpreters.channel_recv(cid)
+ interpreters.channel_release(cid, send=True, recv=True)
- def f():
- nonlocal out
- out = _run_output(id1, dedent(f"""
- import time
- import _xxsubinterpreters as _interpreters
- while True:
- try:
- obj = _interpreters.channel_recv({int(cid)})
- break
- except _interpreters.ChannelEmptyError:
- time.sleep(0.1)
- assert(obj == b'spam')
- _interpreters.channel_send({int(cid)}, b'eggs')
- """))
- t = threading.Thread(target=f)
- t.start()
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_send(cid, b'eggs')
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_recv(cid)
- interpreters.channel_send(cid, b'spam')
- t.join()
- obj = interpreters.channel_recv(cid)
- self.assertEqual(obj, b'eggs')
+class ChannelCloseFixture(namedtuple('ChannelCloseFixture',
+ 'end interp other extra creator')):
- def test_send_not_found(self):
- with self.assertRaises(interpreters.ChannelNotFoundError):
- interpreters.channel_send(10, b'spam')
+ # Set this to True to avoid creating interpreters, e.g. when
+ # scanning through test permutations without running them.
+ QUICK = False
- def test_recv_not_found(self):
- with self.assertRaises(interpreters.ChannelNotFoundError):
- interpreters.channel_recv(10)
+ def __new__(cls, end, interp, other, extra, creator):
+ assert end in ('send', 'recv')
+ if cls.QUICK:
+ known = {}
+ else:
+ interp = Interpreter.from_raw(interp)
+ other = Interpreter.from_raw(other)
+ extra = Interpreter.from_raw(extra)
+ known = {
+ interp.name: interp,
+ other.name: other,
+ extra.name: extra,
+ }
+ if not creator:
+ creator = 'same'
+ self = super().__new__(cls, end, interp, other, extra, creator)
+ self._prepped = set()
+ self._state = ChannelState()
+ self._known = known
+ return self
- def test_recv_empty(self):
- cid = interpreters.channel_create()
- with self.assertRaises(interpreters.ChannelEmptyError):
- interpreters.channel_recv(cid)
+ @property
+ def state(self):
+ return self._state
- def test_run_string_arg(self):
- cid = interpreters.channel_create()
- interp = interpreters.create()
+ @property
+ def cid(self):
+ try:
+ return self._cid
+ except AttributeError:
+ creator = self._get_interpreter(self.creator)
+ self._cid = self._new_channel(creator)
+ return self._cid
+
+ def get_interpreter(self, interp):
+ interp = self._get_interpreter(interp)
+ self._prep_interpreter(interp)
+ return interp
+
+ def expect_closed_error(self, end=None):
+ if end is None:
+ end = self.end
+ if end == 'recv' and self.state.closed == 'send':
+ return False
+ return bool(self.state.closed)
+
+ def prep_interpreter(self, interp):
+ self._prep_interpreter(interp)
+
+ def record_action(self, action, result):
+ self._state = result
+
+ def clean_up(self):
+ clean_up_interpreters()
+ clean_up_channels()
+
+ # internal methods
+
+ def _new_channel(self, creator):
+ if creator.name == 'main':
+ return interpreters.channel_create()
+ else:
+ ch = interpreters.channel_create()
+ run_interp(creator.id, f"""
+ import _xxsubinterpreters
+ cid = _xxsubinterpreters.channel_create()
+ # We purposefully send back an int to avoid tying the
+ # channel to the other interpreter.
+ _xxsubinterpreters.channel_send({ch}, int(cid))
+ del _xxsubinterpreters
+ """)
+ self._cid = interpreters.channel_recv(ch)
+ return self._cid
+
+ def _get_interpreter(self, interp):
+ if interp in ('same', 'interp'):
+ return self.interp
+ elif interp == 'other':
+ return self.other
+ elif interp == 'extra':
+ return self.extra
+ else:
+ name = interp
+ try:
+ interp = self._known[name]
+ except KeyError:
+ interp = self._known[name] = Interpreter(name)
+ return interp
+
+ def _prep_interpreter(self, interp):
+ if interp.id in self._prepped:
+ return
+ self._prepped.add(interp.id)
+ if interp.name == 'main':
+ return
+ run_interp(interp.id, f"""
+ import _xxsubinterpreters as interpreters
+ import test.test__xxsubinterpreters as helpers
+ ChannelState = helpers.ChannelState
+ try:
+ cid
+ except NameError:
+ cid = interpreters._channel_id({self.cid})
+ """)
- out = _run_output(interp, dedent("""
- import _xxsubinterpreters as _interpreters
- print(cid.end)
- _interpreters.channel_send(cid, b'spam')
- """),
- dict(cid=cid.send))
- obj = interpreters.channel_recv(cid)
- self.assertEqual(obj, b'spam')
- self.assertEqual(out.strip(), 'send')
+@unittest.skip('these tests take several hours to run')
+class ExhaustiveChannelTests(TestBase):
+
+ """
+ - main / interp / other
+ - run in: current thread / new thread / other thread / different threads
+ - end / opposite
+ - force / no force
+ - used / not used (associated / not associated)
+ - empty / emptied / never emptied / partly emptied
+ - closed / not closed
+ - released / not released
+ - creator (interp) / other
+ - associated interpreter not running
+ - associated interpreter destroyed
+
+ - close after unbound
+ """
+
+ """
+ use
+ pre-close
+ close
+ after
+ check
+ """
+
+ """
+ close in: main, interp1
+ creator: same, other, extra
+
+ use: None,send,recv,send/recv in None,same,other,same+other,all
+ pre-close: None,send,recv in None,same,other,same+other,all
+ pre-close forced: None,send,recv in None,same,other,same+other,all
+
+ close: same
+ close forced: same
+
+ use after: None,send,recv,send/recv in None,same,other,extra,same+other,all
+ close after: None,send,recv,send/recv in None,same,other,extra,same+other,all
+ check closed: send/recv for same/other(incl. interp2)
+ """
+
+ def iter_action_sets(self):
+ # - used / not used (associated / not associated)
+ # - empty / emptied / never emptied / partly emptied
+ # - closed / not closed
+ # - released / not released
+
+ # never used
+ yield []
+
+ # only pre-closed (and possible used after)
+ for closeactions in self._iter_close_action_sets('same', 'other'):
+ yield closeactions
+ for postactions in self._iter_post_close_action_sets():
+ yield closeactions + postactions
+ for closeactions in self._iter_close_action_sets('other', 'extra'):
+ yield closeactions
+ for postactions in self._iter_post_close_action_sets():
+ yield closeactions + postactions
+
+ # used
+ for useactions in self._iter_use_action_sets('same', 'other'):
+ yield useactions
+ for closeactions in self._iter_close_action_sets('same', 'other'):
+ actions = useactions + closeactions
+ yield actions
+ for postactions in self._iter_post_close_action_sets():
+ yield actions + postactions
+ for closeactions in self._iter_close_action_sets('other', 'extra'):
+ actions = useactions + closeactions
+ yield actions
+ for postactions in self._iter_post_close_action_sets():
+ yield actions + postactions
+ for useactions in self._iter_use_action_sets('other', 'extra'):
+ yield useactions
+ for closeactions in self._iter_close_action_sets('same', 'other'):
+ actions = useactions + closeactions
+ yield actions
+ for postactions in self._iter_post_close_action_sets():
+ yield actions + postactions
+ for closeactions in self._iter_close_action_sets('other', 'extra'):
+ actions = useactions + closeactions
+ yield actions
+ for postactions in self._iter_post_close_action_sets():
+ yield actions + postactions
+
+ def _iter_use_action_sets(self, interp1, interp2):
+ interps = (interp1, interp2)
+
+ # only recv end used
+ yield [
+ ChannelAction('use', 'recv', interp1),
+ ]
+ yield [
+ ChannelAction('use', 'recv', interp2),
+ ]
+ yield [
+ ChannelAction('use', 'recv', interp1),
+ ChannelAction('use', 'recv', interp2),
+ ]
+
+ # never emptied
+ yield [
+ ChannelAction('use', 'send', interp1),
+ ]
+ yield [
+ ChannelAction('use', 'send', interp2),
+ ]
+ yield [
+ ChannelAction('use', 'send', interp1),
+ ChannelAction('use', 'send', interp2),
+ ]
+
+ # partially emptied
+ for interp1 in interps:
+ for interp2 in interps:
+ for interp3 in interps:
+ yield [
+ ChannelAction('use', 'send', interp1),
+ ChannelAction('use', 'send', interp2),
+ ChannelAction('use', 'recv', interp3),
+ ]
+
+ # fully emptied
+ for interp1 in interps:
+ for interp2 in interps:
+ for interp3 in interps:
+ for interp4 in interps:
+ yield [
+ ChannelAction('use', 'send', interp1),
+ ChannelAction('use', 'send', interp2),
+ ChannelAction('use', 'recv', interp3),
+ ChannelAction('use', 'recv', interp4),
+ ]
+
+ def _iter_close_action_sets(self, interp1, interp2):
+ ends = ('recv', 'send')
+ interps = (interp1, interp2)
+ for force in (True, False):
+ op = 'force-close' if force else 'close'
+ for interp in interps:
+ for end in ends:
+ yield [
+ ChannelAction(op, end, interp),
+ ]
+ for recvop in ('close', 'force-close'):
+ for sendop in ('close', 'force-close'):
+ for recv in interps:
+ for send in interps:
+ yield [
+ ChannelAction(recvop, 'recv', recv),
+ ChannelAction(sendop, 'send', send),
+ ]
+
+ def _iter_post_close_action_sets(self):
+ for interp in ('same', 'extra', 'other'):
+ yield [
+ ChannelAction('use', 'recv', interp),
+ ]
+ yield [
+ ChannelAction('use', 'send', interp),
+ ]
+
+ def run_actions(self, fix, actions):
+ for action in actions:
+ self.run_action(fix, action)
+
+ def run_action(self, fix, action, *, hideclosed=True):
+ end = action.resolve_end(fix.end)
+ interp = action.resolve_interp(fix.interp, fix.other, fix.extra)
+ fix.prep_interpreter(interp)
+ if interp.name == 'main':
+ result = run_action(
+ fix.cid,
+ action.action,
+ end,
+ fix.state,
+ hideclosed=hideclosed,
+ )
+ fix.record_action(action, result)
+ else:
+ _cid = interpreters.channel_create()
+ run_interp(interp.id, f"""
+ result = helpers.run_action(
+ {fix.cid},
+ {repr(action.action)},
+ {repr(end)},
+ {repr(fix.state)},
+ hideclosed={hideclosed},
+ )
+ interpreters.channel_send({_cid}, result.pending.to_bytes(1, 'little'))
+ interpreters.channel_send({_cid}, b'X' if result.closed else b'')
+ """)
+ result = ChannelState(
+ pending=int.from_bytes(interpreters.channel_recv(_cid), 'little'),
+ closed=bool(interpreters.channel_recv(_cid)),
+ )
+ fix.record_action(action, result)
+
+ def iter_fixtures(self):
+ # XXX threads?
+ interpreters = [
+ ('main', 'interp', 'extra'),
+ ('interp', 'main', 'extra'),
+ ('interp1', 'interp2', 'extra'),
+ ('interp1', 'interp2', 'main'),
+ ]
+ for interp, other, extra in interpreters:
+ for creator in ('same', 'other', 'creator'):
+ for end in ('send', 'recv'):
+ yield ChannelCloseFixture(end, interp, other, extra, creator)
+
+ def _close(self, fix, *, force):
+ op = 'force-close' if force else 'close'
+ close = ChannelAction(op, fix.end, 'same')
+ if not fix.expect_closed_error():
+ self.run_action(fix, close, hideclosed=False)
+ else:
+ with self.assertRaises(interpreters.ChannelClosedError):
+ self.run_action(fix, close, hideclosed=False)
+
+ def _assert_closed_in_interp(self, fix, interp=None):
+ if interp is None or interp.name == 'main':
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_recv(fix.cid)
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_send(fix.cid, b'spam')
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_close(fix.cid)
+ with self.assertRaises(interpreters.ChannelClosedError):
+ interpreters.channel_close(fix.cid, force=True)
+ else:
+ run_interp(interp.id, f"""
+ with helpers.expect_channel_closed():
+ interpreters.channel_recv(cid)
+ """)
+ run_interp(interp.id, f"""
+ with helpers.expect_channel_closed():
+ interpreters.channel_send(cid, b'spam')
+ """)
+ run_interp(interp.id, f"""
+ with helpers.expect_channel_closed():
+ interpreters.channel_close(cid)
+ """)
+ run_interp(interp.id, f"""
+ with helpers.expect_channel_closed():
+ interpreters.channel_close(cid, force=True)
+ """)
+
+ def _assert_closed(self, fix):
+ self.assertTrue(fix.state.closed)
+
+ for _ in range(fix.state.pending):
+ interpreters.channel_recv(fix.cid)
+ self._assert_closed_in_interp(fix)
+
+ for interp in ('same', 'other'):
+ interp = fix.get_interpreter(interp)
+ if interp.name == 'main':
+ continue
+ self._assert_closed_in_interp(fix, interp)
+
+ interp = fix.get_interpreter('fresh')
+ self._assert_closed_in_interp(fix, interp)
+
+ def _iter_close_tests(self, verbose=False):
+ i = 0
+ for actions in self.iter_action_sets():
+ print()
+ for fix in self.iter_fixtures():
+ i += 1
+ if i > 1000:
+ return
+ if verbose:
+ if (i - 1) % 6 == 0:
+ print()
+ print(i, fix, '({} actions)'.format(len(actions)))
+ else:
+ if (i - 1) % 6 == 0:
+ print(' ', end='')
+ print('.', end=''); sys.stdout.flush()
+ yield i, fix, actions
+ if verbose:
+ print('---')
+ print()
+
+ # This is useful for scanning through the possible tests.
+ def _skim_close_tests(self):
+ ChannelCloseFixture.QUICK = True
+ for i, fix, actions in self._iter_close_tests():
+ pass
+
+ def test_close(self):
+ for i, fix, actions in self._iter_close_tests():
+ with self.subTest('{} {} {}'.format(i, fix, actions)):
+ fix.prep_interpreter(fix.interp)
+ self.run_actions(fix, actions)
+
+ self._close(fix, force=False)
+
+ self._assert_closed(fix)
+ # XXX Things slow down if we have too many interpreters.
+ fix.clean_up()
+
+ def test_force_close(self):
+ for i, fix, actions in self._iter_close_tests():
+ with self.subTest('{} {} {}'.format(i, fix, actions)):
+ fix.prep_interpreter(fix.interp)
+ self.run_actions(fix, actions)
+
+ self._close(fix, force=True)
+
+ self._assert_closed(fix)
+ # XXX Things slow down if we have too many interpreters.
+ fix.clean_up()
if __name__ == '__main__':