summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_interpreters.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_interpreters.py')
-rw-r--r--Lib/test/test_interpreters.py1136
1 files changed, 0 insertions, 1136 deletions
diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py
deleted file mode 100644
index 5663706..0000000
--- a/Lib/test/test_interpreters.py
+++ /dev/null
@@ -1,1136 +0,0 @@
-import contextlib
-import json
-import os
-import os.path
-import sys
-import threading
-from textwrap import dedent
-import unittest
-import time
-
-from test import support
-from test.support import import_helper
-from test.support import threading_helper
-from test.support import os_helper
-_interpreters = import_helper.import_module('_xxsubinterpreters')
-_channels = import_helper.import_module('_xxinterpchannels')
-from test.support import interpreters
-
-
-def _captured_script(script):
- r, w = os.pipe()
- indented = script.replace('\n', '\n ')
- wrapped = dedent(f"""
- import contextlib
- with open({w}, 'w', encoding='utf-8') as spipe:
- with contextlib.redirect_stdout(spipe):
- {indented}
- """)
- return wrapped, open(r, encoding='utf-8')
-
-
-def clean_up_interpreters():
- for interp in interpreters.list_all():
- if interp.id == 0: # main
- continue
- try:
- interp.close()
- except RuntimeError:
- pass # already destroyed
-
-
-def _run_output(interp, request, channels=None):
- script, rpipe = _captured_script(request)
- with rpipe:
- interp.run(script, channels=channels)
- return rpipe.read()
-
-
-@contextlib.contextmanager
-def _running(interp):
- r, w = os.pipe()
- def run():
- interp.run(dedent(f"""
- # wait for "signal"
- with open({r}) as rpipe:
- rpipe.read()
- """))
-
- t = threading.Thread(target=run)
- t.start()
-
- yield
-
- with open(w, 'w') as spipe:
- spipe.write('done')
- t.join()
-
-
-class TestBase(unittest.TestCase):
-
- def pipe(self):
- def ensure_closed(fd):
- try:
- os.close(fd)
- except OSError:
- pass
- r, w = os.pipe()
- self.addCleanup(lambda: ensure_closed(r))
- self.addCleanup(lambda: ensure_closed(w))
- return r, w
-
- def tearDown(self):
- clean_up_interpreters()
-
-
-class CreateTests(TestBase):
-
- def test_in_main(self):
- interp = interpreters.create()
- self.assertIsInstance(interp, interpreters.Interpreter)
- self.assertIn(interp, interpreters.list_all())
-
- def test_in_thread(self):
- lock = threading.Lock()
- interp = None
- def f():
- nonlocal interp
- interp = interpreters.create()
- lock.acquire()
- lock.release()
- t = threading.Thread(target=f)
- with lock:
- t.start()
- t.join()
- self.assertIn(interp, interpreters.list_all())
-
- def test_in_subinterpreter(self):
- main, = interpreters.list_all()
- interp = interpreters.create()
- out = _run_output(interp, dedent("""
- from test.support import interpreters
- interp = interpreters.create()
- print(interp.id)
- """))
- interp2 = interpreters.Interpreter(int(out))
- self.assertEqual(interpreters.list_all(), [main, interp, interp2])
-
- def test_after_destroy_all(self):
- before = set(interpreters.list_all())
- # Create 3 subinterpreters.
- interp_lst = []
- for _ in range(3):
- interps = interpreters.create()
- interp_lst.append(interps)
- # Now destroy them.
- for interp in interp_lst:
- interp.close()
- # Finally, create another.
- interp = interpreters.create()
- self.assertEqual(set(interpreters.list_all()), before | {interp})
-
- def test_after_destroy_some(self):
- before = set(interpreters.list_all())
- # Create 3 subinterpreters.
- interp1 = interpreters.create()
- interp2 = interpreters.create()
- interp3 = interpreters.create()
- # Now destroy 2 of them.
- interp1.close()
- interp2.close()
- # Finally, create another.
- interp = interpreters.create()
- self.assertEqual(set(interpreters.list_all()), before | {interp3, interp})
-
-
-class GetCurrentTests(TestBase):
-
- def test_main(self):
- main = interpreters.get_main()
- current = interpreters.get_current()
- self.assertEqual(current, main)
-
- def test_subinterpreter(self):
- main = _interpreters.get_main()
- interp = interpreters.create()
- out = _run_output(interp, dedent("""
- from test.support import interpreters
- cur = interpreters.get_current()
- print(cur.id)
- """))
- current = interpreters.Interpreter(int(out))
- self.assertNotEqual(current, main)
-
-
-class ListAllTests(TestBase):
-
- def test_initial(self):
- interps = interpreters.list_all()
- self.assertEqual(1, len(interps))
-
- def test_after_creating(self):
- main = interpreters.get_current()
- first = interpreters.create()
- second = interpreters.create()
-
- ids = []
- for interp in interpreters.list_all():
- ids.append(interp.id)
-
- self.assertEqual(ids, [main.id, first.id, second.id])
-
- def test_after_destroying(self):
- main = interpreters.get_current()
- first = interpreters.create()
- second = interpreters.create()
- first.close()
-
- ids = []
- for interp in interpreters.list_all():
- ids.append(interp.id)
-
- self.assertEqual(ids, [main.id, second.id])
-
-
-class TestInterpreterAttrs(TestBase):
-
- def test_id_type(self):
- main = interpreters.get_main()
- current = interpreters.get_current()
- interp = interpreters.create()
- self.assertIsInstance(main.id, _interpreters.InterpreterID)
- self.assertIsInstance(current.id, _interpreters.InterpreterID)
- self.assertIsInstance(interp.id, _interpreters.InterpreterID)
-
- def test_main_id(self):
- main = interpreters.get_main()
- self.assertEqual(main.id, 0)
-
- def test_custom_id(self):
- interp = interpreters.Interpreter(1)
- self.assertEqual(interp.id, 1)
-
- with self.assertRaises(TypeError):
- interpreters.Interpreter('1')
-
- def test_id_readonly(self):
- interp = interpreters.Interpreter(1)
- with self.assertRaises(AttributeError):
- interp.id = 2
-
- @unittest.skip('not ready yet (see bpo-32604)')
- def test_main_isolated(self):
- main = interpreters.get_main()
- self.assertFalse(main.isolated)
-
- @unittest.skip('not ready yet (see bpo-32604)')
- def test_subinterpreter_isolated_default(self):
- interp = interpreters.create()
- self.assertFalse(interp.isolated)
-
- def test_subinterpreter_isolated_explicit(self):
- interp1 = interpreters.create(isolated=True)
- interp2 = interpreters.create(isolated=False)
- self.assertTrue(interp1.isolated)
- self.assertFalse(interp2.isolated)
-
- @unittest.skip('not ready yet (see bpo-32604)')
- def test_custom_isolated_default(self):
- interp = interpreters.Interpreter(1)
- self.assertFalse(interp.isolated)
-
- def test_custom_isolated_explicit(self):
- interp1 = interpreters.Interpreter(1, isolated=True)
- interp2 = interpreters.Interpreter(1, isolated=False)
- self.assertTrue(interp1.isolated)
- self.assertFalse(interp2.isolated)
-
- def test_isolated_readonly(self):
- interp = interpreters.Interpreter(1)
- with self.assertRaises(AttributeError):
- interp.isolated = True
-
- def test_equality(self):
- interp1 = interpreters.create()
- interp2 = interpreters.create()
- self.assertEqual(interp1, interp1)
- self.assertNotEqual(interp1, interp2)
-
-
-class TestInterpreterIsRunning(TestBase):
-
- def test_main(self):
- main = interpreters.get_main()
- self.assertTrue(main.is_running())
-
- @unittest.skip('Fails on FreeBSD')
- def test_subinterpreter(self):
- interp = interpreters.create()
- self.assertFalse(interp.is_running())
-
- with _running(interp):
- self.assertTrue(interp.is_running())
- self.assertFalse(interp.is_running())
-
- def test_finished(self):
- r, w = self.pipe()
- interp = interpreters.create()
- interp.run(f"""if True:
- import os
- os.write({w}, b'x')
- """)
- self.assertFalse(interp.is_running())
- self.assertEqual(os.read(r, 1), b'x')
-
- def test_from_subinterpreter(self):
- interp = interpreters.create()
- out = _run_output(interp, dedent(f"""
- import _xxsubinterpreters as _interpreters
- if _interpreters.is_running({interp.id}):
- print(True)
- else:
- print(False)
- """))
- self.assertEqual(out.strip(), 'True')
-
- def test_already_destroyed(self):
- interp = interpreters.create()
- interp.close()
- with self.assertRaises(RuntimeError):
- interp.is_running()
-
- def test_does_not_exist(self):
- interp = interpreters.Interpreter(1_000_000)
- with self.assertRaises(RuntimeError):
- interp.is_running()
-
- def test_bad_id(self):
- interp = interpreters.Interpreter(-1)
- with self.assertRaises(ValueError):
- interp.is_running()
-
- def test_with_only_background_threads(self):
- r_interp, w_interp = self.pipe()
- r_thread, w_thread = self.pipe()
-
- DONE = b'D'
- FINISHED = b'F'
-
- interp = interpreters.create()
- interp.run(f"""if True:
- import os
- import threading
-
- def task():
- v = os.read({r_thread}, 1)
- assert v == {DONE!r}
- os.write({w_interp}, {FINISHED!r})
- t = threading.Thread(target=task)
- t.start()
- """)
- self.assertFalse(interp.is_running())
-
- os.write(w_thread, DONE)
- interp.run('t.join()')
- self.assertEqual(os.read(r_interp, 1), FINISHED)
-
-
-class TestInterpreterClose(TestBase):
-
- def test_basic(self):
- main = interpreters.get_main()
- interp1 = interpreters.create()
- interp2 = interpreters.create()
- interp3 = interpreters.create()
- self.assertEqual(set(interpreters.list_all()),
- {main, interp1, interp2, interp3})
- interp2.close()
- self.assertEqual(set(interpreters.list_all()),
- {main, interp1, interp3})
-
- def test_all(self):
- before = set(interpreters.list_all())
- interps = set()
- for _ in range(3):
- interp = interpreters.create()
- interps.add(interp)
- self.assertEqual(set(interpreters.list_all()), before | interps)
- for interp in interps:
- interp.close()
- self.assertEqual(set(interpreters.list_all()), before)
-
- def test_main(self):
- main, = interpreters.list_all()
- with self.assertRaises(RuntimeError):
- main.close()
-
- def f():
- with self.assertRaises(RuntimeError):
- main.close()
-
- t = threading.Thread(target=f)
- t.start()
- t.join()
-
- def test_already_destroyed(self):
- interp = interpreters.create()
- interp.close()
- with self.assertRaises(RuntimeError):
- interp.close()
-
- def test_does_not_exist(self):
- interp = interpreters.Interpreter(1_000_000)
- with self.assertRaises(RuntimeError):
- interp.close()
-
- def test_bad_id(self):
- interp = interpreters.Interpreter(-1)
- with self.assertRaises(ValueError):
- interp.close()
-
- def test_from_current(self):
- main, = interpreters.list_all()
- interp = interpreters.create()
- out = _run_output(interp, dedent(f"""
- from test.support import interpreters
- interp = interpreters.Interpreter({int(interp.id)})
- try:
- interp.close()
- except RuntimeError:
- print('failed')
- """))
- self.assertEqual(out.strip(), 'failed')
- self.assertEqual(set(interpreters.list_all()), {main, interp})
-
- def test_from_sibling(self):
- main, = interpreters.list_all()
- interp1 = interpreters.create()
- interp2 = interpreters.create()
- self.assertEqual(set(interpreters.list_all()),
- {main, interp1, interp2})
- interp1.run(dedent(f"""
- from test.support import interpreters
- interp2 = interpreters.Interpreter(int({interp2.id}))
- interp2.close()
- interp3 = interpreters.create()
- interp3.close()
- """))
- self.assertEqual(set(interpreters.list_all()), {main, interp1})
-
- def test_from_other_thread(self):
- interp = interpreters.create()
- def f():
- interp.close()
-
- t = threading.Thread(target=f)
- t.start()
- t.join()
-
- @unittest.skip('Fails on FreeBSD')
- def test_still_running(self):
- main, = interpreters.list_all()
- interp = interpreters.create()
- with _running(interp):
- with self.assertRaises(RuntimeError):
- interp.close()
- self.assertTrue(interp.is_running())
-
- def test_subthreads_still_running(self):
- r_interp, w_interp = self.pipe()
- r_thread, w_thread = self.pipe()
-
- FINISHED = b'F'
-
- interp = interpreters.create()
- interp.run(f"""if True:
- import os
- import threading
- import time
-
- done = False
-
- def notify_fini():
- global done
- done = True
- t.join()
- threading._register_atexit(notify_fini)
-
- def task():
- while not done:
- time.sleep(0.1)
- os.write({w_interp}, {FINISHED!r})
- t = threading.Thread(target=task)
- t.start()
- """)
- interp.close()
-
- self.assertEqual(os.read(r_interp, 1), FINISHED)
-
-
-class TestInterpreterRun(TestBase):
-
- def test_success(self):
- interp = interpreters.create()
- script, file = _captured_script('print("it worked!", end="")')
- with file:
- interp.run(script)
- out = file.read()
-
- self.assertEqual(out, 'it worked!')
-
- def test_failure(self):
- interp = interpreters.create()
- with self.assertRaises(interpreters.RunFailedError):
- interp.run('raise Exception')
-
- def test_in_thread(self):
- interp = interpreters.create()
- script, file = _captured_script('print("it worked!", end="")')
- with file:
- def f():
- interp.run(script)
-
- t = threading.Thread(target=f)
- t.start()
- t.join()
- out = file.read()
-
- self.assertEqual(out, 'it worked!')
-
- @support.requires_fork()
- def test_fork(self):
- interp = interpreters.create()
- import tempfile
- with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file:
- file.write('')
- file.flush()
-
- expected = 'spam spam spam spam spam'
- script = dedent(f"""
- import os
- try:
- os.fork()
- except RuntimeError:
- with open('{file.name}', 'w', encoding='utf-8') as out:
- out.write('{expected}')
- """)
- interp.run(script)
-
- file.seek(0)
- content = file.read()
- self.assertEqual(content, expected)
-
- @unittest.skip('Fails on FreeBSD')
- def test_already_running(self):
- interp = interpreters.create()
- with _running(interp):
- with self.assertRaises(RuntimeError):
- interp.run('print("spam")')
-
- def test_does_not_exist(self):
- interp = interpreters.Interpreter(1_000_000)
- with self.assertRaises(RuntimeError):
- interp.run('print("spam")')
-
- def test_bad_id(self):
- interp = interpreters.Interpreter(-1)
- with self.assertRaises(ValueError):
- interp.run('print("spam")')
-
- def test_bad_script(self):
- interp = interpreters.create()
- with self.assertRaises(TypeError):
- interp.run(10)
-
- def test_bytes_for_script(self):
- interp = interpreters.create()
- with self.assertRaises(TypeError):
- interp.run(b'print("spam")')
-
- def test_with_background_threads_still_running(self):
- r_interp, w_interp = self.pipe()
- r_thread, w_thread = self.pipe()
-
- RAN = b'R'
- DONE = b'D'
- FINISHED = b'F'
-
- interp = interpreters.create()
- interp.run(f"""if True:
- import os
- import threading
-
- def task():
- v = os.read({r_thread}, 1)
- assert v == {DONE!r}
- os.write({w_interp}, {FINISHED!r})
- t = threading.Thread(target=task)
- t.start()
- os.write({w_interp}, {RAN!r})
- """)
- interp.run(f"""if True:
- os.write({w_interp}, {RAN!r})
- """)
-
- os.write(w_thread, DONE)
- interp.run('t.join()')
- self.assertEqual(os.read(r_interp, 1), RAN)
- self.assertEqual(os.read(r_interp, 1), RAN)
- self.assertEqual(os.read(r_interp, 1), FINISHED)
-
- # test_xxsubinterpreters covers the remaining Interpreter.run() behavior.
-
-
-class StressTests(TestBase):
-
- # In these tests we generally want a lot of interpreters,
- # but not so many that any test takes too long.
-
- @support.requires_resource('cpu')
- def test_create_many_sequential(self):
- alive = []
- for _ in range(100):
- interp = interpreters.create()
- alive.append(interp)
-
- @support.requires_resource('cpu')
- def test_create_many_threaded(self):
- alive = []
- def task():
- interp = interpreters.create()
- alive.append(interp)
- threads = (threading.Thread(target=task) for _ in range(200))
- with threading_helper.start_threads(threads):
- pass
-
-
-class StartupTests(TestBase):
-
- # We want to ensure the initial state of subinterpreters
- # matches expectations.
-
- _subtest_count = 0
-
- @contextlib.contextmanager
- def subTest(self, *args):
- with super().subTest(*args) as ctx:
- self._subtest_count += 1
- try:
- yield ctx
- finally:
- if self._debugged_in_subtest:
- if self._subtest_count == 1:
- # The first subtest adds a leading newline, so we
- # compensate here by not printing a trailing newline.
- print('### end subtest debug ###', end='')
- else:
- print('### end subtest debug ###')
- self._debugged_in_subtest = False
-
- def debug(self, msg, *, header=None):
- if header:
- self._debug(f'--- {header} ---')
- if msg:
- if msg.endswith(os.linesep):
- self._debug(msg[:-len(os.linesep)])
- else:
- self._debug(msg)
- self._debug('<no newline>')
- self._debug('------')
- else:
- self._debug(msg)
-
- _debugged = False
- _debugged_in_subtest = False
- def _debug(self, msg):
- if not self._debugged:
- print()
- self._debugged = True
- if self._subtest is not None:
- if True:
- if not self._debugged_in_subtest:
- self._debugged_in_subtest = True
- print('### start subtest debug ###')
- print(msg)
- else:
- print(msg)
-
- def create_temp_dir(self):
- import tempfile
- tmp = tempfile.mkdtemp(prefix='test_interpreters_')
- tmp = os.path.realpath(tmp)
- self.addCleanup(os_helper.rmtree, tmp)
- return tmp
-
- def write_script(self, *path, text):
- filename = os.path.join(*path)
- dirname = os.path.dirname(filename)
- if dirname:
- os.makedirs(dirname, exist_ok=True)
- with open(filename, 'w', encoding='utf-8') as outfile:
- outfile.write(dedent(text))
- return filename
-
- @support.requires_subprocess()
- def run_python(self, argv, *, cwd=None):
- # This method is inspired by
- # EmbeddingTestsMixin.run_embedded_interpreter() in test_embed.py.
- import shlex
- import subprocess
- if isinstance(argv, str):
- argv = shlex.split(argv)
- argv = [sys.executable, *argv]
- try:
- proc = subprocess.run(
- argv,
- cwd=cwd,
- capture_output=True,
- text=True,
- )
- except Exception as exc:
- self.debug(f'# cmd: {shlex.join(argv)}')
- if isinstance(exc, FileNotFoundError) and not exc.filename:
- if os.path.exists(argv[0]):
- exists = 'exists'
- else:
- exists = 'does not exist'
- self.debug(f'{argv[0]} {exists}')
- raise # re-raise
- assert proc.stderr == '' or proc.returncode != 0, proc.stderr
- if proc.returncode != 0 and support.verbose:
- self.debug(f'# python3 {shlex.join(argv[1:])} failed:')
- self.debug(proc.stdout, header='stdout')
- self.debug(proc.stderr, header='stderr')
- self.assertEqual(proc.returncode, 0)
- self.assertEqual(proc.stderr, '')
- return proc.stdout
-
- def test_sys_path_0(self):
- # The main interpreter's sys.path[0] should be used by subinterpreters.
- script = '''
- import sys
- from test.support import interpreters
-
- orig = sys.path[0]
-
- interp = interpreters.create()
- interp.run(f"""if True:
- import json
- import sys
- print(json.dumps({{
- 'main': {orig!r},
- 'sub': sys.path[0],
- }}, indent=4), flush=True)
- """)
- '''
- # <tmp>/
- # pkg/
- # __init__.py
- # __main__.py
- # script.py
- # script.py
- cwd = self.create_temp_dir()
- self.write_script(cwd, 'pkg', '__init__.py', text='')
- self.write_script(cwd, 'pkg', '__main__.py', text=script)
- self.write_script(cwd, 'pkg', 'script.py', text=script)
- self.write_script(cwd, 'script.py', text=script)
-
- cases = [
- ('script.py', cwd),
- ('-m script', cwd),
- ('-m pkg', cwd),
- ('-m pkg.script', cwd),
- ('-c "import script"', ''),
- ]
- for argv, expected in cases:
- with self.subTest(f'python3 {argv}'):
- out = self.run_python(argv, cwd=cwd)
- data = json.loads(out)
- sp0_main, sp0_sub = data['main'], data['sub']
- self.assertEqual(sp0_sub, sp0_main)
- self.assertEqual(sp0_sub, expected)
- # XXX Also check them all with the -P cmdline flag?
-
-
-class FinalizationTests(TestBase):
-
- def test_gh_109793(self):
- import subprocess
- argv = [sys.executable, '-c', '''if True:
- import _xxsubinterpreters as _interpreters
- interpid = _interpreters.create()
- raise Exception
- ''']
- proc = subprocess.run(argv, capture_output=True, text=True)
- self.assertIn('Traceback', proc.stderr)
- if proc.returncode == 0 and support.verbose:
- print()
- print("--- cmd unexpected succeeded ---")
- print(f"stdout:\n{proc.stdout}")
- print(f"stderr:\n{proc.stderr}")
- print("------")
- self.assertEqual(proc.returncode, 1)
-
-
-class TestIsShareable(TestBase):
-
- def test_default_shareables(self):
- shareables = [
- # singletons
- None,
- # builtin objects
- b'spam',
- 'spam',
- 10,
- -10,
- True,
- False,
- 100.0,
- (),
- (1, ('spam', 'eggs'), True),
- ]
- for obj in shareables:
- with self.subTest(obj):
- shareable = interpreters.is_shareable(obj)
- self.assertTrue(shareable)
-
- def test_not_shareable(self):
- class Cheese:
- def __init__(self, name):
- self.name = name
- def __str__(self):
- return self.name
-
- class SubBytes(bytes):
- """A subclass of a shareable type."""
-
- not_shareables = [
- # singletons
- NotImplemented,
- ...,
- # builtin types and objects
- type,
- object,
- object(),
- Exception(),
- # user-defined types and objects
- Cheese,
- Cheese('Wensleydale'),
- SubBytes(b'spam'),
- ]
- for obj in not_shareables:
- with self.subTest(repr(obj)):
- self.assertFalse(
- interpreters.is_shareable(obj))
-
-
-class TestChannels(TestBase):
-
- def test_create(self):
- r, s = interpreters.create_channel()
- self.assertIsInstance(r, interpreters.RecvChannel)
- self.assertIsInstance(s, interpreters.SendChannel)
-
- def test_list_all(self):
- self.assertEqual(interpreters.list_all_channels(), [])
- created = set()
- for _ in range(3):
- ch = interpreters.create_channel()
- created.add(ch)
- after = set(interpreters.list_all_channels())
- self.assertEqual(after, created)
-
- def test_shareable(self):
- rch, sch = interpreters.create_channel()
-
- self.assertTrue(
- interpreters.is_shareable(rch))
- self.assertTrue(
- interpreters.is_shareable(sch))
-
- sch.send_nowait(rch)
- sch.send_nowait(sch)
- rch2 = rch.recv()
- sch2 = rch.recv()
-
- self.assertEqual(rch2, rch)
- self.assertEqual(sch2, sch)
-
- def test_is_closed(self):
- rch, sch = interpreters.create_channel()
- rbefore = rch.is_closed
- sbefore = sch.is_closed
- rch.close()
- rafter = rch.is_closed
- safter = sch.is_closed
-
- self.assertFalse(rbefore)
- self.assertFalse(sbefore)
- self.assertTrue(rafter)
- self.assertTrue(safter)
-
-
-class TestRecvChannelAttrs(TestBase):
-
- def test_id_type(self):
- rch, _ = interpreters.create_channel()
- self.assertIsInstance(rch.id, _channels.ChannelID)
-
- def test_custom_id(self):
- rch = interpreters.RecvChannel(1)
- self.assertEqual(rch.id, 1)
-
- with self.assertRaises(TypeError):
- interpreters.RecvChannel('1')
-
- def test_id_readonly(self):
- rch = interpreters.RecvChannel(1)
- with self.assertRaises(AttributeError):
- rch.id = 2
-
- def test_equality(self):
- ch1, _ = interpreters.create_channel()
- ch2, _ = interpreters.create_channel()
- self.assertEqual(ch1, ch1)
- self.assertNotEqual(ch1, ch2)
-
-
-class TestSendChannelAttrs(TestBase):
-
- def test_id_type(self):
- _, sch = interpreters.create_channel()
- self.assertIsInstance(sch.id, _channels.ChannelID)
-
- def test_custom_id(self):
- sch = interpreters.SendChannel(1)
- self.assertEqual(sch.id, 1)
-
- with self.assertRaises(TypeError):
- interpreters.SendChannel('1')
-
- def test_id_readonly(self):
- sch = interpreters.SendChannel(1)
- with self.assertRaises(AttributeError):
- sch.id = 2
-
- def test_equality(self):
- _, ch1 = interpreters.create_channel()
- _, ch2 = interpreters.create_channel()
- self.assertEqual(ch1, ch1)
- self.assertNotEqual(ch1, ch2)
-
-
-class TestSendRecv(TestBase):
-
- def test_send_recv_main(self):
- r, s = interpreters.create_channel()
- orig = b'spam'
- s.send_nowait(orig)
- obj = r.recv()
-
- self.assertEqual(obj, orig)
- self.assertIsNot(obj, orig)
-
- def test_send_recv_same_interpreter(self):
- interp = interpreters.create()
- interp.run(dedent("""
- from test.support import interpreters
- r, s = interpreters.create_channel()
- orig = b'spam'
- s.send_nowait(orig)
- obj = r.recv()
- assert obj == orig, 'expected: obj == orig'
- assert obj is not orig, 'expected: obj is not orig'
- """))
-
- @unittest.skip('broken (see BPO-...)')
- def test_send_recv_different_interpreters(self):
- r1, s1 = interpreters.create_channel()
- r2, s2 = interpreters.create_channel()
- orig1 = b'spam'
- s1.send_nowait(orig1)
- out = _run_output(
- interpreters.create(),
- dedent(f"""
- obj1 = r.recv()
- assert obj1 == b'spam', 'expected: obj1 == orig1'
- # When going to another interpreter we get a copy.
- assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
- orig2 = b'eggs'
- print(id(orig2))
- s.send_nowait(orig2)
- """),
- channels=dict(r=r1, s=s2),
- )
- obj2 = r2.recv()
-
- self.assertEqual(obj2, b'eggs')
- self.assertNotEqual(id(obj2), int(out))
-
- def test_send_recv_different_threads(self):
- r, s = interpreters.create_channel()
-
- def f():
- while True:
- try:
- obj = r.recv()
- break
- except interpreters.ChannelEmptyError:
- time.sleep(0.1)
- s.send(obj)
- t = threading.Thread(target=f)
- t.start()
-
- orig = b'spam'
- s.send(orig)
- obj = r.recv()
- t.join()
-
- self.assertEqual(obj, orig)
- self.assertIsNot(obj, orig)
-
- def test_send_recv_nowait_main(self):
- r, s = interpreters.create_channel()
- orig = b'spam'
- s.send_nowait(orig)
- obj = r.recv_nowait()
-
- self.assertEqual(obj, orig)
- self.assertIsNot(obj, orig)
-
- def test_send_recv_nowait_main_with_default(self):
- r, _ = interpreters.create_channel()
- obj = r.recv_nowait(None)
-
- self.assertIsNone(obj)
-
- def test_send_recv_nowait_same_interpreter(self):
- interp = interpreters.create()
- interp.run(dedent("""
- from test.support import interpreters
- r, s = interpreters.create_channel()
- orig = b'spam'
- s.send_nowait(orig)
- obj = r.recv_nowait()
- assert obj == orig, 'expected: obj == orig'
- # When going back to the same interpreter we get the same object.
- assert obj is not orig, 'expected: obj is not orig'
- """))
-
- @unittest.skip('broken (see BPO-...)')
- def test_send_recv_nowait_different_interpreters(self):
- r1, s1 = interpreters.create_channel()
- r2, s2 = interpreters.create_channel()
- orig1 = b'spam'
- s1.send_nowait(orig1)
- out = _run_output(
- interpreters.create(),
- dedent(f"""
- obj1 = r.recv_nowait()
- assert obj1 == b'spam', 'expected: obj1 == orig1'
- # When going to another interpreter we get a copy.
- assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
- orig2 = b'eggs'
- print(id(orig2))
- s.send_nowait(orig2)
- """),
- channels=dict(r=r1, s=s2),
- )
- obj2 = r2.recv_nowait()
-
- self.assertEqual(obj2, b'eggs')
- self.assertNotEqual(id(obj2), int(out))
-
- def test_recv_timeout(self):
- r, _ = interpreters.create_channel()
- with self.assertRaises(TimeoutError):
- r.recv(timeout=1)
-
- def test_recv_channel_does_not_exist(self):
- ch = interpreters.RecvChannel(1_000_000)
- with self.assertRaises(interpreters.ChannelNotFoundError):
- ch.recv()
-
- def test_send_channel_does_not_exist(self):
- ch = interpreters.SendChannel(1_000_000)
- with self.assertRaises(interpreters.ChannelNotFoundError):
- ch.send(b'spam')
-
- def test_recv_nowait_channel_does_not_exist(self):
- ch = interpreters.RecvChannel(1_000_000)
- with self.assertRaises(interpreters.ChannelNotFoundError):
- ch.recv_nowait()
-
- def test_send_nowait_channel_does_not_exist(self):
- ch = interpreters.SendChannel(1_000_000)
- with self.assertRaises(interpreters.ChannelNotFoundError):
- ch.send_nowait(b'spam')
-
- def test_recv_nowait_empty(self):
- ch, _ = interpreters.create_channel()
- with self.assertRaises(interpreters.ChannelEmptyError):
- ch.recv_nowait()
-
- def test_recv_nowait_default(self):
- default = object()
- rch, sch = interpreters.create_channel()
- obj1 = rch.recv_nowait(default)
- sch.send_nowait(None)
- sch.send_nowait(1)
- sch.send_nowait(b'spam')
- sch.send_nowait(b'eggs')
- obj2 = rch.recv_nowait(default)
- obj3 = rch.recv_nowait(default)
- obj4 = rch.recv_nowait()
- obj5 = rch.recv_nowait(default)
- obj6 = rch.recv_nowait(default)
-
- self.assertIs(obj1, default)
- self.assertIs(obj2, None)
- self.assertEqual(obj3, 1)
- self.assertEqual(obj4, b'spam')
- self.assertEqual(obj5, b'eggs')
- self.assertIs(obj6, default)
-
- def test_send_buffer(self):
- buf = bytearray(b'spamspamspam')
- obj = None
- rch, sch = interpreters.create_channel()
-
- def f():
- nonlocal obj
- while True:
- try:
- obj = rch.recv()
- break
- except interpreters.ChannelEmptyError:
- time.sleep(0.1)
- t = threading.Thread(target=f)
- t.start()
-
- sch.send_buffer(buf)
- t.join()
-
- self.assertIsNot(obj, buf)
- self.assertIsInstance(obj, memoryview)
- self.assertEqual(obj, buf)
-
- buf[4:8] = b'eggs'
- self.assertEqual(obj, buf)
- obj[4:8] = b'ham.'
- self.assertEqual(obj, buf)
-
- def test_send_buffer_nowait(self):
- buf = bytearray(b'spamspamspam')
- rch, sch = interpreters.create_channel()
- sch.send_buffer_nowait(buf)
- obj = rch.recv()
-
- self.assertIsNot(obj, buf)
- self.assertIsInstance(obj, memoryview)
- self.assertEqual(obj, buf)
-
- buf[4:8] = b'eggs'
- self.assertEqual(obj, buf)
- obj[4:8] = b'ham.'
- self.assertEqual(obj, buf)