diff options
Diffstat (limited to 'Lib/test/test__xxsubinterpreters.py')
-rw-r--r-- | Lib/test/test__xxsubinterpreters.py | 1194 |
1 files changed, 995 insertions, 199 deletions
diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 4ef7771..118f2e4 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -1,6 +1,9 @@ +from collections import namedtuple import contextlib +import itertools import os import pickle +import sys from textwrap import dedent, indent import threading import time @@ -12,23 +15,32 @@ from test.support import script_helper interpreters = support.import_module('_xxsubinterpreters') +################################## +# helpers + +def powerset(*sets): + return itertools.chain.from_iterable( + combinations(sets, r) + for r in range(len(sets)+1)) + + def _captured_script(script): r, w = os.pipe() indented = script.replace('\n', '\n ') wrapped = dedent(f""" import contextlib - with open({w}, 'w') as chan: - with contextlib.redirect_stdout(chan): + with open({w}, 'w') as spipe: + with contextlib.redirect_stdout(spipe): {indented} """) return wrapped, open(r) def _run_output(interp, request, shared=None): - script, chan = _captured_script(request) - with chan: + script, rpipe = _captured_script(request) + with rpipe: interpreters.run_string(interp, script, shared) - return chan.read() + return rpipe.read() @contextlib.contextmanager @@ -37,8 +49,8 @@ def _running(interp): def run(): interpreters.run_string(interp, dedent(f""" # wait for "signal" - with open({r}) as chan: - chan.read() + with open({r}) as rpipe: + rpipe.read() """)) t = threading.Thread(target=run) @@ -46,11 +58,248 @@ def _running(interp): yield - with open(w, 'w') as chan: - chan.write('done') + with open(w, 'w') as spipe: + spipe.write('done') t.join() +#@contextmanager +#def run_threaded(id, source, **shared): +# def run(): +# run_interp(id, source, **shared) +# t = threading.Thread(target=run) +# t.start() +# yield +# t.join() + + +def run_interp(id, source, **shared): + _run_interp(id, source, shared) + + +def _run_interp(id, source, shared, _mainns={}): + source = dedent(source) + main = interpreters.get_main() + if main == id: + if interpreters.get_current() != main: + raise RuntimeError + # XXX Run a func? + exec(source, _mainns) + else: + interpreters.run_string(id, source, shared) + + +def run_interp_threaded(id, source, **shared): + def run(): + _run(id, source, shared) + t = threading.Thread(target=run) + t.start() + t.join() + + +class Interpreter(namedtuple('Interpreter', 'name id')): + + @classmethod + def from_raw(cls, raw): + if isinstance(raw, cls): + return raw + elif isinstance(raw, str): + return cls(raw) + else: + raise NotImplementedError + + def __new__(cls, name=None, id=None): + main = interpreters.get_main() + if id == main: + if not name: + name = 'main' + elif name != 'main': + raise ValueError( + 'name mismatch (expected "main", got "{}")'.format(name)) + id = main + elif id is not None: + if not name: + name = 'interp' + elif name == 'main': + raise ValueError('name mismatch (unexpected "main")') + if not isinstance(id, interpreters.InterpreterID): + id = interpreters.InterpreterID(id) + elif not name or name == 'main': + name = 'main' + id = main + else: + id = interpreters.create() + self = super().__new__(cls, name, id) + return self + + +# XXX expect_channel_closed() is unnecessary once we improve exc propagation. + +@contextlib.contextmanager +def expect_channel_closed(): + try: + yield + except interpreters.ChannelClosedError: + pass + else: + assert False, 'channel not closed' + + +class ChannelAction(namedtuple('ChannelAction', 'action end interp')): + + def __new__(cls, action, end=None, interp=None): + if not end: + end = 'both' + if not interp: + interp = 'main' + self = super().__new__(cls, action, end, interp) + return self + + def __init__(self, *args, **kwargs): + if self.action == 'use': + if self.end not in ('same', 'opposite', 'send', 'recv'): + raise ValueError(self.end) + elif self.action in ('close', 'force-close'): + if self.end not in ('both', 'same', 'opposite', 'send', 'recv'): + raise ValueError(self.end) + else: + raise ValueError(self.action) + if self.interp not in ('main', 'same', 'other', 'extra'): + raise ValueError(self.interp) + + def resolve_end(self, end): + if self.end == 'same': + return end + elif self.end == 'opposite': + return 'recv' if end == 'send' else 'send' + else: + return self.end + + def resolve_interp(self, interp, other, extra): + if self.interp == 'same': + return interp + elif self.interp == 'other': + if other is None: + raise RuntimeError + return other + elif self.interp == 'extra': + if extra is None: + raise RuntimeError + return extra + elif self.interp == 'main': + if interp.name == 'main': + return interp + elif other and other.name == 'main': + return other + else: + raise RuntimeError + # Per __init__(), there aren't any others. + + +class ChannelState(namedtuple('ChannelState', 'pending closed')): + + def __new__(cls, pending=0, *, closed=False): + self = super().__new__(cls, pending, closed) + return self + + def incr(self): + return type(self)(self.pending + 1, closed=self.closed) + + def decr(self): + return type(self)(self.pending - 1, closed=self.closed) + + def close(self, *, force=True): + if self.closed: + if not force or self.pending == 0: + return self + return type(self)(0 if force else self.pending, closed=True) + + +def run_action(cid, action, end, state, *, hideclosed=True): + if state.closed: + if action == 'use' and end == 'recv' and state.pending: + expectfail = False + else: + expectfail = True + else: + expectfail = False + + try: + result = _run_action(cid, action, end, state) + except interpreters.ChannelClosedError: + if not hideclosed and not expectfail: + raise + result = state.close() + else: + if expectfail: + raise ... # XXX + return result + + +def _run_action(cid, action, end, state): + if action == 'use': + if end == 'send': + interpreters.channel_send(cid, b'spam') + return state.incr() + elif end == 'recv': + if not state.pending: + try: + interpreters.channel_recv(cid) + except interpreters.ChannelEmptyError: + return state + else: + raise Exception('expected ChannelEmptyError') + else: + interpreters.channel_recv(cid) + return state.decr() + else: + raise ValueError(end) + elif action == 'close': + kwargs = {} + if end in ('recv', 'send'): + kwargs[end] = True + interpreters.channel_close(cid, **kwargs) + return state.close() + elif action == 'force-close': + kwargs = { + 'force': True, + } + if end in ('recv', 'send'): + kwargs[end] = True + interpreters.channel_close(cid, **kwargs) + return state.close(force=True) + else: + raise ValueError(action) + + +def clean_up_interpreters(): + for id in interpreters.list_all(): + if id == 0: # main + continue + try: + interpreters.destroy(id) + except RuntimeError: + pass # already destroyed + + +def clean_up_channels(): + for cid in interpreters.channel_list_all(): + try: + interpreters.channel_destroy(cid) + except interpreters.ChannelNotFoundError: + pass # already destroyed + + +class TestBase(unittest.TestCase): + + def tearDown(self): + clean_up_interpreters() + clean_up_channels() + + +################################## +# misc. tests + class IsShareableTests(unittest.TestCase): def test_default_shareables(self): @@ -59,6 +308,9 @@ class IsShareableTests(unittest.TestCase): None, # builtin objects b'spam', + 'spam', + 10, + -10, ] for obj in shareables: with self.subTest(obj): @@ -86,37 +338,65 @@ class IsShareableTests(unittest.TestCase): object, object(), Exception(), - 42, 100.0, - 'spam', # user-defined types and objects Cheese, Cheese('Wensleydale'), SubBytes(b'spam'), ] for obj in not_shareables: - with self.subTest(obj): + with self.subTest(repr(obj)): self.assertFalse( interpreters.is_shareable(obj)) -class TestBase(unittest.TestCase): +class ShareableTypeTests(unittest.TestCase): + + def setUp(self): + super().setUp() + self.cid = interpreters.channel_create() def tearDown(self): - for id in interpreters.list_all(): - if id == 0: # main - continue - try: - interpreters.destroy(id) - except RuntimeError: - pass # already destroyed + interpreters.channel_destroy(self.cid) + super().tearDown() - for cid in interpreters.channel_list_all(): - try: - interpreters.channel_destroy(cid) - except interpreters.ChannelNotFoundError: - pass # already destroyed + def _assert_values(self, values): + for obj in values: + with self.subTest(obj): + interpreters.channel_send(self.cid, obj) + got = interpreters.channel_recv(self.cid) + + self.assertEqual(got, obj) + self.assertIs(type(got), type(obj)) + # XXX Check the following in the channel tests? + #self.assertIsNot(got, obj) + + def test_singletons(self): + for obj in [None]: + with self.subTest(obj): + interpreters.channel_send(self.cid, obj) + got = interpreters.channel_recv(self.cid) + + # XXX What about between interpreters? + self.assertIs(got, obj) + + def test_types(self): + self._assert_values([ + b'spam', + 9999, + self.cid, + ]) + + def test_bytes(self): + self._assert_values(i.to_bytes(2, 'little', signed=True) + for i in range(-1, 258)) + def test_int(self): + self._assert_values(range(-1, 258)) + + +################################## +# interpreter tests class ListAllTests(TestBase): @@ -147,13 +427,16 @@ class GetCurrentTests(TestBase): main = interpreters.get_main() cur = interpreters.get_current() self.assertEqual(cur, main) + self.assertIsInstance(cur, interpreters.InterpreterID) def test_subinterpreter(self): main = interpreters.get_main() interp = interpreters.create() out = _run_output(interp, dedent(""" import _xxsubinterpreters as _interpreters - print(int(_interpreters.get_current())) + cur = _interpreters.get_current() + print(cur) + assert isinstance(cur, _interpreters.InterpreterID) """)) cur = int(out.strip()) _, expected = interpreters.list_all() @@ -167,13 +450,16 @@ class GetMainTests(TestBase): [expected] = interpreters.list_all() main = interpreters.get_main() self.assertEqual(main, expected) + self.assertIsInstance(main, interpreters.InterpreterID) def test_from_subinterpreter(self): [expected] = interpreters.list_all() interp = interpreters.create() out = _run_output(interp, dedent(""" import _xxsubinterpreters as _interpreters - print(int(_interpreters.get_main())) + main = _interpreters.get_main() + print(main) + assert isinstance(main, _interpreters.InterpreterID) """)) main = int(out.strip()) self.assertEqual(main, expected) @@ -197,7 +483,7 @@ class IsRunningTests(TestBase): interp = interpreters.create() out = _run_output(interp, dedent(f""" import _xxsubinterpreters as _interpreters - if _interpreters.is_running({int(interp)}): + if _interpreters.is_running({interp}): print(True) else: print(False) @@ -257,6 +543,10 @@ class InterpreterIDTests(TestBase): with self.assertRaises(RuntimeError): interpreters.InterpreterID(int(id) + 1) # unforced + def test_str(self): + id = interpreters.InterpreterID(10, force=True) + self.assertEqual(str(id), '10') + def test_repr(self): id = interpreters.InterpreterID(10, force=True) self.assertEqual(repr(id), 'InterpreterID(10)') @@ -280,6 +570,7 @@ class CreateTests(TestBase): def test_in_main(self): id = interpreters.create() + self.assertIsInstance(id, interpreters.InterpreterID) self.assertIn(id, interpreters.list_all()) @@ -314,7 +605,8 @@ class CreateTests(TestBase): out = _run_output(id1, dedent(""" import _xxsubinterpreters as _interpreters id = _interpreters.create() - print(int(id)) + print(id) + assert isinstance(id, _interpreters.InterpreterID) """)) id2 = int(out.strip()) @@ -329,7 +621,7 @@ class CreateTests(TestBase): out = _run_output(id1, dedent(""" import _xxsubinterpreters as _interpreters id = _interpreters.create() - print(int(id)) + print(id) """)) id2 = int(out.strip()) @@ -423,7 +715,7 @@ class DestroyTests(TestBase): script = dedent(f""" import _xxsubinterpreters as _interpreters try: - _interpreters.destroy({int(id)}) + _interpreters.destroy({id}) except RuntimeError: pass """) @@ -437,7 +729,7 @@ class DestroyTests(TestBase): id2 = interpreters.create() script = dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.destroy({int(id2)}) + _interpreters.destroy({id2}) """) interpreters.run_string(id1, script) @@ -783,6 +1075,9 @@ class RunStringTests(TestBase): self.assertEqual(retcode, 0) +################################## +# channel tests + class ChannelIDTests(TestBase): def test_default_kwargs(self): @@ -842,6 +1137,10 @@ class ChannelIDTests(TestBase): with self.assertRaises(interpreters.ChannelNotFoundError): interpreters._channel_id(int(cid) + 1) # unforced + def test_str(self): + cid = interpreters._channel_id(10, force=True) + self.assertEqual(str(cid), '10') + def test_repr(self): cid = interpreters._channel_id(10, force=True) self.assertEqual(repr(cid), 'ChannelID(10)') @@ -872,6 +1171,10 @@ class ChannelIDTests(TestBase): class ChannelTests(TestBase): + def test_create_cid(self): + cid = interpreters.channel_create() + self.assertIsInstance(cid, interpreters.ChannelID) + def test_sequential_ids(self): before = interpreters.channel_list_all() id1 = interpreters.channel_create() @@ -888,7 +1191,7 @@ class ChannelTests(TestBase): out = _run_output(id1, dedent(""" import _xxsubinterpreters as _interpreters cid = _interpreters.channel_create() - print(int(cid)) + print(cid) """)) cid1 = int(out.strip()) @@ -896,7 +1199,7 @@ class ChannelTests(TestBase): out = _run_output(id2, dedent(""" import _xxsubinterpreters as _interpreters cid = _interpreters.channel_create() - print(int(cid)) + print(cid) """)) cid2 = int(out.strip()) @@ -904,127 +1207,133 @@ class ChannelTests(TestBase): #################### - def test_drop_single_user(self): + def test_send_recv_main(self): cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_recv(cid) - interpreters.channel_drop_interpreter(cid, send=True, recv=True) + orig = b'spam' + interpreters.channel_send(cid, orig) + obj = interpreters.channel_recv(cid) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'eggs') - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) + self.assertEqual(obj, orig) + self.assertIsNot(obj, orig) - def test_drop_multiple_users(self): - cid = interpreters.channel_create() + def test_send_recv_same_interpreter(self): id1 = interpreters.create() - id2 = interpreters.create() - interpreters.run_string(id1, dedent(f""" + out = _run_output(id1, dedent(""" import _xxsubinterpreters as _interpreters - _interpreters.channel_send({int(cid)}, b'spam') + cid = _interpreters.channel_create() + orig = b'spam' + _interpreters.channel_send(cid, orig) + obj = _interpreters.channel_recv(cid) + assert obj is not orig + assert obj == orig """)) - out = _run_output(id2, dedent(f""" + + def test_send_recv_different_interpreters(self): + cid = interpreters.channel_create() + id1 = interpreters.create() + out = _run_output(id1, dedent(f""" import _xxsubinterpreters as _interpreters - obj = _interpreters.channel_recv({int(cid)}) - _interpreters.channel_drop_interpreter({int(cid)}) - print(repr(obj)) - """)) - interpreters.run_string(id1, dedent(f""" - _interpreters.channel_drop_interpreter({int(cid)}) + _interpreters.channel_send({cid}, b'spam') """)) + obj = interpreters.channel_recv(cid) - self.assertEqual(out.strip(), "b'spam'") + self.assertEqual(obj, b'spam') - def test_drop_no_kwargs(self): + def test_send_recv_different_threads(self): cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_recv(cid) - interpreters.channel_drop_interpreter(cid) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'eggs') - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) + def f(): + while True: + try: + obj = interpreters.channel_recv(cid) + break + except interpreters.ChannelEmptyError: + time.sleep(0.1) + interpreters.channel_send(cid, obj) + t = threading.Thread(target=f) + t.start() - def test_drop_multiple_times(self): - cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') - interpreters.channel_recv(cid) - interpreters.channel_drop_interpreter(cid, send=True, recv=True) + t.join() + obj = interpreters.channel_recv(cid) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_drop_interpreter(cid, send=True, recv=True) + self.assertEqual(obj, b'spam') - def test_drop_with_unused_items(self): + def test_send_recv_different_interpreters_and_threads(self): cid = interpreters.channel_create() + id1 = interpreters.create() + out = None + + def f(): + nonlocal out + out = _run_output(id1, dedent(f""" + import time + import _xxsubinterpreters as _interpreters + while True: + try: + obj = _interpreters.channel_recv({cid}) + break + except _interpreters.ChannelEmptyError: + time.sleep(0.1) + assert(obj == b'spam') + _interpreters.channel_send({cid}, b'eggs') + """)) + t = threading.Thread(target=f) + t.start() + interpreters.channel_send(cid, b'spam') - interpreters.channel_send(cid, b'ham') - interpreters.channel_drop_interpreter(cid, send=True, recv=True) + t.join() + obj = interpreters.channel_recv(cid) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) + self.assertEqual(obj, b'eggs') - def test_drop_never_used(self): - cid = interpreters.channel_create() - interpreters.channel_drop_interpreter(cid) + def test_send_not_found(self): + with self.assertRaises(interpreters.ChannelNotFoundError): + interpreters.channel_send(10, b'spam') - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'spam') - with self.assertRaises(interpreters.ChannelClosedError): + def test_recv_not_found(self): + with self.assertRaises(interpreters.ChannelNotFoundError): + interpreters.channel_recv(10) + + def test_recv_empty(self): + cid = interpreters.channel_create() + with self.assertRaises(interpreters.ChannelEmptyError): interpreters.channel_recv(cid) - def test_drop_by_unassociated_interp(self): + def test_run_string_arg_unresolved(self): cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') interp = interpreters.create() - interpreters.run_string(interp, dedent(f""" + + out = _run_output(interp, dedent(""" import _xxsubinterpreters as _interpreters - _interpreters.channel_drop_interpreter({int(cid)}) - """)) + print(cid.end) + _interpreters.channel_send(cid, b'spam') + """), + dict(cid=cid.send)) obj = interpreters.channel_recv(cid) - interpreters.channel_drop_interpreter(cid) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'eggs') self.assertEqual(obj, b'spam') + self.assertEqual(out.strip(), 'send') - def test_drop_close_if_unassociated(self): + def test_run_string_arg_resolved(self): cid = interpreters.channel_create() + cid = interpreters._channel_id(cid, _resolve=True) interp = interpreters.create() - interpreters.run_string(interp, dedent(f""" - import _xxsubinterpreters as _interpreters - obj = _interpreters.channel_send({int(cid)}, b'spam') - _interpreters.channel_drop_interpreter({int(cid)}) - """)) - - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) - def test_drop_partially(self): - # XXX Is partial close too weird/confusing? - cid = interpreters.channel_create() - interpreters.channel_send(cid, None) - interpreters.channel_recv(cid) - interpreters.channel_send(cid, b'spam') - interpreters.channel_drop_interpreter(cid, send=True) + out = _run_output(interp, dedent(""" + import _xxsubinterpreters as _interpreters + print(chan.end) + _interpreters.channel_send(chan, b'spam') + #print(chan.id.end) + #_interpreters.channel_send(chan.id, b'spam') + """), + dict(chan=cid.send)) obj = interpreters.channel_recv(cid) self.assertEqual(obj, b'spam') + self.assertEqual(out.strip(), 'send') - def test_drop_used_multiple_times_by_single_user(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_send(cid, b'spam') - interpreters.channel_send(cid, b'spam') - interpreters.channel_recv(cid) - interpreters.channel_drop_interpreter(cid, send=True, recv=True) - - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'eggs') - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) - - #################### + # close def test_close_single_user(self): cid = interpreters.channel_create() @@ -1043,21 +1352,21 @@ class ChannelTests(TestBase): id2 = interpreters.create() interpreters.run_string(id1, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_send({int(cid)}, b'spam') + _interpreters.channel_send({cid}, b'spam') """)) interpreters.run_string(id2, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_recv({int(cid)}) + _interpreters.channel_recv({cid}) """)) interpreters.channel_close(cid) with self.assertRaises(interpreters.RunFailedError) as cm: interpreters.run_string(id1, dedent(f""" - _interpreters.channel_send({int(cid)}, b'spam') + _interpreters.channel_send({cid}, b'spam') """)) self.assertIn('ChannelClosedError', str(cm.exception)) with self.assertRaises(interpreters.RunFailedError) as cm: interpreters.run_string(id2, dedent(f""" - _interpreters.channel_send({int(cid)}, b'spam') + _interpreters.channel_send({cid}, b'spam') """)) self.assertIn('ChannelClosedError', str(cm.exception)) @@ -1094,7 +1403,7 @@ class ChannelTests(TestBase): interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_close({int(cid)}) + _interpreters.channel_close({cid}) """)) with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) @@ -1114,115 +1423,602 @@ class ChannelTests(TestBase): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - #################### - def test_send_recv_main(self): +class ChannelReleaseTests(TestBase): + + # XXX Add more test coverage a la the tests for close(). + + """ + - main / interp / other + - run in: current thread / new thread / other thread / different threads + - end / opposite + - force / no force + - used / not used (associated / not associated) + - empty / emptied / never emptied / partly emptied + - closed / not closed + - released / not released + - creator (interp) / other + - associated interpreter not running + - associated interpreter destroyed + """ + + """ + use + pre-release + release + after + check + """ + + """ + release in: main, interp1 + creator: same, other (incl. interp2) + + use: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all + pre-release: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all + pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all + + release: same + release forced: same + + use after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all + release after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all + check released: send/recv for same/other(incl. interp2) + check closed: send/recv for same/other(incl. interp2) + """ + + def test_single_user(self): cid = interpreters.channel_create() - orig = b'spam' - interpreters.channel_send(cid, orig) - obj = interpreters.channel_recv(cid) + interpreters.channel_send(cid, b'spam') + interpreters.channel_recv(cid) + interpreters.channel_release(cid, send=True, recv=True) - self.assertEqual(obj, orig) - self.assertIsNot(obj, orig) + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'eggs') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) - def test_send_recv_same_interpreter(self): + def test_multiple_users(self): + cid = interpreters.channel_create() id1 = interpreters.create() - out = _run_output(id1, dedent(""" + id2 = interpreters.create() + interpreters.run_string(id1, dedent(f""" import _xxsubinterpreters as _interpreters - cid = _interpreters.channel_create() - orig = b'spam' - _interpreters.channel_send(cid, orig) - obj = _interpreters.channel_recv(cid) - assert obj is not orig - assert obj == orig + _interpreters.channel_send({cid}, b'spam') + """)) + out = _run_output(id2, dedent(f""" + import _xxsubinterpreters as _interpreters + obj = _interpreters.channel_recv({cid}) + _interpreters.channel_release({cid}) + print(repr(obj)) + """)) + interpreters.run_string(id1, dedent(f""" + _interpreters.channel_release({cid}) """)) - def test_send_recv_different_interpreters(self): + self.assertEqual(out.strip(), "b'spam'") + + def test_no_kwargs(self): cid = interpreters.channel_create() - id1 = interpreters.create() - out = _run_output(id1, dedent(f""" + interpreters.channel_send(cid, b'spam') + interpreters.channel_recv(cid) + interpreters.channel_release(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'eggs') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + + def test_multiple_times(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interpreters.channel_recv(cid) + interpreters.channel_release(cid, send=True, recv=True) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_release(cid, send=True, recv=True) + + def test_with_unused_items(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interpreters.channel_send(cid, b'ham') + interpreters.channel_release(cid, send=True, recv=True) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + + def test_never_used(self): + cid = interpreters.channel_create() + interpreters.channel_release(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'spam') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + + def test_by_unassociated_interp(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interp = interpreters.create() + interpreters.run_string(interp, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_send({int(cid)}, b'spam') + _interpreters.channel_release({cid}) """)) obj = interpreters.channel_recv(cid) + interpreters.channel_release(cid) + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'eggs') self.assertEqual(obj, b'spam') - def test_send_recv_different_threads(self): + def test_close_if_unassociated(self): + # XXX Something's not right with this test... cid = interpreters.channel_create() + interp = interpreters.create() + interpreters.run_string(interp, dedent(f""" + import _xxsubinterpreters as _interpreters + obj = _interpreters.channel_send({cid}, b'spam') + _interpreters.channel_release({cid}) + """)) - def f(): - while True: - try: - obj = interpreters.channel_recv(cid) - break - except interpreters.ChannelEmptyError: - time.sleep(0.1) - interpreters.channel_send(cid, obj) - t = threading.Thread(target=f) - t.start() + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + def test_partially(self): + # XXX Is partial close too weird/confusing? + cid = interpreters.channel_create() + interpreters.channel_send(cid, None) + interpreters.channel_recv(cid) interpreters.channel_send(cid, b'spam') - t.join() + interpreters.channel_release(cid, send=True) obj = interpreters.channel_recv(cid) self.assertEqual(obj, b'spam') - def test_send_recv_different_interpreters_and_threads(self): + def test_used_multiple_times_by_single_user(self): cid = interpreters.channel_create() - id1 = interpreters.create() - out = None + interpreters.channel_send(cid, b'spam') + interpreters.channel_send(cid, b'spam') + interpreters.channel_send(cid, b'spam') + interpreters.channel_recv(cid) + interpreters.channel_release(cid, send=True, recv=True) - def f(): - nonlocal out - out = _run_output(id1, dedent(f""" - import time - import _xxsubinterpreters as _interpreters - while True: - try: - obj = _interpreters.channel_recv({int(cid)}) - break - except _interpreters.ChannelEmptyError: - time.sleep(0.1) - assert(obj == b'spam') - _interpreters.channel_send({int(cid)}, b'eggs') - """)) - t = threading.Thread(target=f) - t.start() + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'eggs') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) - interpreters.channel_send(cid, b'spam') - t.join() - obj = interpreters.channel_recv(cid) - self.assertEqual(obj, b'eggs') +class ChannelCloseFixture(namedtuple('ChannelCloseFixture', + 'end interp other extra creator')): - def test_send_not_found(self): - with self.assertRaises(interpreters.ChannelNotFoundError): - interpreters.channel_send(10, b'spam') + # Set this to True to avoid creating interpreters, e.g. when + # scanning through test permutations without running them. + QUICK = False - def test_recv_not_found(self): - with self.assertRaises(interpreters.ChannelNotFoundError): - interpreters.channel_recv(10) + def __new__(cls, end, interp, other, extra, creator): + assert end in ('send', 'recv') + if cls.QUICK: + known = {} + else: + interp = Interpreter.from_raw(interp) + other = Interpreter.from_raw(other) + extra = Interpreter.from_raw(extra) + known = { + interp.name: interp, + other.name: other, + extra.name: extra, + } + if not creator: + creator = 'same' + self = super().__new__(cls, end, interp, other, extra, creator) + self._prepped = set() + self._state = ChannelState() + self._known = known + return self - def test_recv_empty(self): - cid = interpreters.channel_create() - with self.assertRaises(interpreters.ChannelEmptyError): - interpreters.channel_recv(cid) + @property + def state(self): + return self._state - def test_run_string_arg(self): - cid = interpreters.channel_create() - interp = interpreters.create() + @property + def cid(self): + try: + return self._cid + except AttributeError: + creator = self._get_interpreter(self.creator) + self._cid = self._new_channel(creator) + return self._cid + + def get_interpreter(self, interp): + interp = self._get_interpreter(interp) + self._prep_interpreter(interp) + return interp + + def expect_closed_error(self, end=None): + if end is None: + end = self.end + if end == 'recv' and self.state.closed == 'send': + return False + return bool(self.state.closed) + + def prep_interpreter(self, interp): + self._prep_interpreter(interp) + + def record_action(self, action, result): + self._state = result + + def clean_up(self): + clean_up_interpreters() + clean_up_channels() + + # internal methods + + def _new_channel(self, creator): + if creator.name == 'main': + return interpreters.channel_create() + else: + ch = interpreters.channel_create() + run_interp(creator.id, f""" + import _xxsubinterpreters + cid = _xxsubinterpreters.channel_create() + # We purposefully send back an int to avoid tying the + # channel to the other interpreter. + _xxsubinterpreters.channel_send({ch}, int(cid)) + del _xxsubinterpreters + """) + self._cid = interpreters.channel_recv(ch) + return self._cid + + def _get_interpreter(self, interp): + if interp in ('same', 'interp'): + return self.interp + elif interp == 'other': + return self.other + elif interp == 'extra': + return self.extra + else: + name = interp + try: + interp = self._known[name] + except KeyError: + interp = self._known[name] = Interpreter(name) + return interp + + def _prep_interpreter(self, interp): + if interp.id in self._prepped: + return + self._prepped.add(interp.id) + if interp.name == 'main': + return + run_interp(interp.id, f""" + import _xxsubinterpreters as interpreters + import test.test__xxsubinterpreters as helpers + ChannelState = helpers.ChannelState + try: + cid + except NameError: + cid = interpreters._channel_id({self.cid}) + """) - out = _run_output(interp, dedent(""" - import _xxsubinterpreters as _interpreters - print(cid.end) - _interpreters.channel_send(cid, b'spam') - """), - dict(cid=cid.send)) - obj = interpreters.channel_recv(cid) - self.assertEqual(obj, b'spam') - self.assertEqual(out.strip(), 'send') +@unittest.skip('these tests take several hours to run') +class ExhaustiveChannelTests(TestBase): + + """ + - main / interp / other + - run in: current thread / new thread / other thread / different threads + - end / opposite + - force / no force + - used / not used (associated / not associated) + - empty / emptied / never emptied / partly emptied + - closed / not closed + - released / not released + - creator (interp) / other + - associated interpreter not running + - associated interpreter destroyed + + - close after unbound + """ + + """ + use + pre-close + close + after + check + """ + + """ + close in: main, interp1 + creator: same, other, extra + + use: None,send,recv,send/recv in None,same,other,same+other,all + pre-close: None,send,recv in None,same,other,same+other,all + pre-close forced: None,send,recv in None,same,other,same+other,all + + close: same + close forced: same + + use after: None,send,recv,send/recv in None,same,other,extra,same+other,all + close after: None,send,recv,send/recv in None,same,other,extra,same+other,all + check closed: send/recv for same/other(incl. interp2) + """ + + def iter_action_sets(self): + # - used / not used (associated / not associated) + # - empty / emptied / never emptied / partly emptied + # - closed / not closed + # - released / not released + + # never used + yield [] + + # only pre-closed (and possible used after) + for closeactions in self._iter_close_action_sets('same', 'other'): + yield closeactions + for postactions in self._iter_post_close_action_sets(): + yield closeactions + postactions + for closeactions in self._iter_close_action_sets('other', 'extra'): + yield closeactions + for postactions in self._iter_post_close_action_sets(): + yield closeactions + postactions + + # used + for useactions in self._iter_use_action_sets('same', 'other'): + yield useactions + for closeactions in self._iter_close_action_sets('same', 'other'): + actions = useactions + closeactions + yield actions + for postactions in self._iter_post_close_action_sets(): + yield actions + postactions + for closeactions in self._iter_close_action_sets('other', 'extra'): + actions = useactions + closeactions + yield actions + for postactions in self._iter_post_close_action_sets(): + yield actions + postactions + for useactions in self._iter_use_action_sets('other', 'extra'): + yield useactions + for closeactions in self._iter_close_action_sets('same', 'other'): + actions = useactions + closeactions + yield actions + for postactions in self._iter_post_close_action_sets(): + yield actions + postactions + for closeactions in self._iter_close_action_sets('other', 'extra'): + actions = useactions + closeactions + yield actions + for postactions in self._iter_post_close_action_sets(): + yield actions + postactions + + def _iter_use_action_sets(self, interp1, interp2): + interps = (interp1, interp2) + + # only recv end used + yield [ + ChannelAction('use', 'recv', interp1), + ] + yield [ + ChannelAction('use', 'recv', interp2), + ] + yield [ + ChannelAction('use', 'recv', interp1), + ChannelAction('use', 'recv', interp2), + ] + + # never emptied + yield [ + ChannelAction('use', 'send', interp1), + ] + yield [ + ChannelAction('use', 'send', interp2), + ] + yield [ + ChannelAction('use', 'send', interp1), + ChannelAction('use', 'send', interp2), + ] + + # partially emptied + for interp1 in interps: + for interp2 in interps: + for interp3 in interps: + yield [ + ChannelAction('use', 'send', interp1), + ChannelAction('use', 'send', interp2), + ChannelAction('use', 'recv', interp3), + ] + + # fully emptied + for interp1 in interps: + for interp2 in interps: + for interp3 in interps: + for interp4 in interps: + yield [ + ChannelAction('use', 'send', interp1), + ChannelAction('use', 'send', interp2), + ChannelAction('use', 'recv', interp3), + ChannelAction('use', 'recv', interp4), + ] + + def _iter_close_action_sets(self, interp1, interp2): + ends = ('recv', 'send') + interps = (interp1, interp2) + for force in (True, False): + op = 'force-close' if force else 'close' + for interp in interps: + for end in ends: + yield [ + ChannelAction(op, end, interp), + ] + for recvop in ('close', 'force-close'): + for sendop in ('close', 'force-close'): + for recv in interps: + for send in interps: + yield [ + ChannelAction(recvop, 'recv', recv), + ChannelAction(sendop, 'send', send), + ] + + def _iter_post_close_action_sets(self): + for interp in ('same', 'extra', 'other'): + yield [ + ChannelAction('use', 'recv', interp), + ] + yield [ + ChannelAction('use', 'send', interp), + ] + + def run_actions(self, fix, actions): + for action in actions: + self.run_action(fix, action) + + def run_action(self, fix, action, *, hideclosed=True): + end = action.resolve_end(fix.end) + interp = action.resolve_interp(fix.interp, fix.other, fix.extra) + fix.prep_interpreter(interp) + if interp.name == 'main': + result = run_action( + fix.cid, + action.action, + end, + fix.state, + hideclosed=hideclosed, + ) + fix.record_action(action, result) + else: + _cid = interpreters.channel_create() + run_interp(interp.id, f""" + result = helpers.run_action( + {fix.cid}, + {repr(action.action)}, + {repr(end)}, + {repr(fix.state)}, + hideclosed={hideclosed}, + ) + interpreters.channel_send({_cid}, result.pending.to_bytes(1, 'little')) + interpreters.channel_send({_cid}, b'X' if result.closed else b'') + """) + result = ChannelState( + pending=int.from_bytes(interpreters.channel_recv(_cid), 'little'), + closed=bool(interpreters.channel_recv(_cid)), + ) + fix.record_action(action, result) + + def iter_fixtures(self): + # XXX threads? + interpreters = [ + ('main', 'interp', 'extra'), + ('interp', 'main', 'extra'), + ('interp1', 'interp2', 'extra'), + ('interp1', 'interp2', 'main'), + ] + for interp, other, extra in interpreters: + for creator in ('same', 'other', 'creator'): + for end in ('send', 'recv'): + yield ChannelCloseFixture(end, interp, other, extra, creator) + + def _close(self, fix, *, force): + op = 'force-close' if force else 'close' + close = ChannelAction(op, fix.end, 'same') + if not fix.expect_closed_error(): + self.run_action(fix, close, hideclosed=False) + else: + with self.assertRaises(interpreters.ChannelClosedError): + self.run_action(fix, close, hideclosed=False) + + def _assert_closed_in_interp(self, fix, interp=None): + if interp is None or interp.name == 'main': + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(fix.cid) + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(fix.cid, b'spam') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_close(fix.cid) + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_close(fix.cid, force=True) + else: + run_interp(interp.id, f""" + with helpers.expect_channel_closed(): + interpreters.channel_recv(cid) + """) + run_interp(interp.id, f""" + with helpers.expect_channel_closed(): + interpreters.channel_send(cid, b'spam') + """) + run_interp(interp.id, f""" + with helpers.expect_channel_closed(): + interpreters.channel_close(cid) + """) + run_interp(interp.id, f""" + with helpers.expect_channel_closed(): + interpreters.channel_close(cid, force=True) + """) + + def _assert_closed(self, fix): + self.assertTrue(fix.state.closed) + + for _ in range(fix.state.pending): + interpreters.channel_recv(fix.cid) + self._assert_closed_in_interp(fix) + + for interp in ('same', 'other'): + interp = fix.get_interpreter(interp) + if interp.name == 'main': + continue + self._assert_closed_in_interp(fix, interp) + + interp = fix.get_interpreter('fresh') + self._assert_closed_in_interp(fix, interp) + + def _iter_close_tests(self, verbose=False): + i = 0 + for actions in self.iter_action_sets(): + print() + for fix in self.iter_fixtures(): + i += 1 + if i > 1000: + return + if verbose: + if (i - 1) % 6 == 0: + print() + print(i, fix, '({} actions)'.format(len(actions))) + else: + if (i - 1) % 6 == 0: + print(' ', end='') + print('.', end=''); sys.stdout.flush() + yield i, fix, actions + if verbose: + print('---') + print() + + # This is useful for scanning through the possible tests. + def _skim_close_tests(self): + ChannelCloseFixture.QUICK = True + for i, fix, actions in self._iter_close_tests(): + pass + + def test_close(self): + for i, fix, actions in self._iter_close_tests(): + with self.subTest('{} {} {}'.format(i, fix, actions)): + fix.prep_interpreter(fix.interp) + self.run_actions(fix, actions) + + self._close(fix, force=False) + + self._assert_closed(fix) + # XXX Things slow down if we have too many interpreters. + fix.clean_up() + + def test_force_close(self): + for i, fix, actions in self._iter_close_tests(): + with self.subTest('{} {} {}'.format(i, fix, actions)): + fix.prep_interpreter(fix.interp) + self.run_actions(fix, actions) + + self._close(fix, force=True) + + self._assert_closed(fix) + # XXX Things slow down if we have too many interpreters. + fix.clean_up() if __name__ == '__main__': |