diff options
author | Eric Snow <ericsnowcurrently@gmail.com> | 2023-10-17 22:32:00 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-17 22:32:00 (GMT) |
commit | a53d7cb6729dc3f254b70afcf19eaf71a2eed540 (patch) | |
tree | 679f68637995e4bfefc98f0ec40edac6150693cd /Lib | |
parent | e37620edfd77b78b913b5eab55cd91327c3e7fd3 (diff) | |
download | cpython-a53d7cb6729dc3f254b70afcf19eaf71a2eed540.zip cpython-a53d7cb6729dc3f254b70afcf19eaf71a2eed540.tar.gz cpython-a53d7cb6729dc3f254b70afcf19eaf71a2eed540.tar.bz2 |
gh-84570: Send-Wait Fixes for _xxinterpchannels (gh-111006)
There were a few things I did in gh-110565 that need to be fixed. I also forgot to add tests in that PR.
(Note that this PR exposes a refleak introduced by gh-110246. I'll take care of that separately.)
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/test/test__xxinterpchannels.py | 217 |
1 files changed, 173 insertions, 44 deletions
diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py index ff01a33..90a1224 100644 --- a/Lib/test/test__xxinterpchannels.py +++ b/Lib/test/test__xxinterpchannels.py @@ -564,7 +564,62 @@ class ChannelTests(TestBase): with self.assertRaises(channels.ChannelClosedError): channels.list_interpreters(cid, send=False) - #################### + def test_allowed_types(self): + cid = channels.create() + objects = [ + None, + 'spam', + b'spam', + 42, + ] + for obj in objects: + with self.subTest(obj): + channels.send(cid, obj, blocking=False) + got = channels.recv(cid) + + self.assertEqual(got, obj) + self.assertIs(type(got), type(obj)) + # XXX Check the following? + #self.assertIsNot(got, obj) + # XXX What about between interpreters? + + def test_run_string_arg_unresolved(self): + cid = channels.create() + interp = interpreters.create() + + out = _run_output(interp, dedent(""" + import _xxinterpchannels as _channels + print(cid.end) + _channels.send(cid, b'spam', blocking=False) + """), + dict(cid=cid.send)) + obj = channels.recv(cid) + + self.assertEqual(obj, b'spam') + self.assertEqual(out.strip(), 'send') + + # XXX For now there is no high-level channel into which the + # sent channel ID can be converted... + # Note: this test caused crashes on some buildbots (bpo-33615). + @unittest.skip('disabled until high-level channels exist') + def test_run_string_arg_resolved(self): + cid = channels.create() + cid = channels._channel_id(cid, _resolve=True) + interp = interpreters.create() + + out = _run_output(interp, dedent(""" + import _xxinterpchannels as _channels + print(chan.id.end) + _channels.send(chan.id, b'spam', blocking=False) + """), + dict(chan=cid.send)) + obj = channels.recv(cid) + + self.assertEqual(obj, b'spam') + self.assertEqual(out.strip(), 'send') + + #------------------- + # send/recv def test_send_recv_main(self): cid = channels.create() @@ -705,6 +760,9 @@ class ChannelTests(TestBase): channels.recv(cid2) del cid2 + #------------------- + # send_buffer + def test_send_buffer(self): buf = bytearray(b'spamspamspam') cid = channels.create() @@ -720,60 +778,131 @@ class ChannelTests(TestBase): obj[4:8] = b'ham.' self.assertEqual(obj, buf) - def test_allowed_types(self): + #------------------- + # send with waiting + + def build_send_waiter(self, obj, *, buffer=False): + # We want a long enough sleep that send() actually has to wait. + + if buffer: + send = channels.send_buffer + else: + send = channels.send + cid = channels.create() - objects = [ - None, - 'spam', - b'spam', - 42, - ] - for obj in objects: - with self.subTest(obj): - channels.send(cid, obj, blocking=False) - got = channels.recv(cid) + try: + started = time.monotonic() + send(cid, obj, blocking=False) + stopped = time.monotonic() + channels.recv(cid) + finally: + channels.destroy(cid) + delay = stopped - started # seconds + delay *= 3 - self.assertEqual(got, obj) - self.assertIs(type(got), type(obj)) - # XXX Check the following? - #self.assertIsNot(got, obj) - # XXX What about between interpreters? + def wait(): + time.sleep(delay) + return wait - def test_run_string_arg_unresolved(self): + def test_send_blocking_waiting(self): + received = None + obj = b'spam' + wait = self.build_send_waiter(obj) cid = channels.create() - interp = interpreters.create() + def f(): + nonlocal received + wait() + received = recv_wait(cid) + t = threading.Thread(target=f) + t.start() + channels.send(cid, obj, blocking=True) + t.join() - out = _run_output(interp, dedent(""" - import _xxinterpchannels as _channels - print(cid.end) - _channels.send(cid, b'spam', blocking=False) - """), - dict(cid=cid.send)) - obj = channels.recv(cid) + self.assertEqual(received, obj) - self.assertEqual(obj, b'spam') - self.assertEqual(out.strip(), 'send') + def test_send_buffer_blocking_waiting(self): + received = None + obj = bytearray(b'spam') + wait = self.build_send_waiter(obj, buffer=True) + cid = channels.create() + def f(): + nonlocal received + wait() + received = recv_wait(cid) + t = threading.Thread(target=f) + t.start() + channels.send_buffer(cid, obj, blocking=True) + t.join() - # XXX For now there is no high-level channel into which the - # sent channel ID can be converted... - # Note: this test caused crashes on some buildbots (bpo-33615). - @unittest.skip('disabled until high-level channels exist') - def test_run_string_arg_resolved(self): + self.assertEqual(received, obj) + + def test_send_blocking_no_wait(self): + received = None + obj = b'spam' cid = channels.create() - cid = channels._channel_id(cid, _resolve=True) - interp = interpreters.create() + def f(): + nonlocal received + received = recv_wait(cid) + t = threading.Thread(target=f) + t.start() + channels.send(cid, obj, blocking=True) + t.join() - out = _run_output(interp, dedent(""" - import _xxinterpchannels as _channels - print(chan.id.end) - _channels.send(chan.id, b'spam', blocking=False) - """), - dict(chan=cid.send)) - obj = channels.recv(cid) + self.assertEqual(received, obj) - self.assertEqual(obj, b'spam') - self.assertEqual(out.strip(), 'send') + def test_send_buffer_blocking_no_wait(self): + received = None + obj = bytearray(b'spam') + cid = channels.create() + def f(): + nonlocal received + received = recv_wait(cid) + t = threading.Thread(target=f) + t.start() + channels.send_buffer(cid, obj, blocking=True) + t.join() + + self.assertEqual(received, obj) + + def test_send_closed_while_waiting(self): + obj = b'spam' + wait = self.build_send_waiter(obj) + cid = channels.create() + def f(): + wait() + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send(cid, obj, blocking=True) + t.join() + + def test_send_buffer_closed_while_waiting(self): + try: + self._has_run_once + except AttributeError: + # At the moment, this test leaks a few references. + # It looks like the leak originates with the addition + # of _channels.send_buffer() (gh-110246), whereas the + # tests were added afterward. We want this test even + # if the refleak isn't fixed yet, so we skip here. + raise unittest.SkipTest('temporarily skipped due to refleaks') + else: + self._has_run_once = True + + obj = bytearray(b'spam') + wait = self.build_send_waiter(obj, buffer=True) + cid = channels.create() + def f(): + wait() + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send_buffer(cid, obj, blocking=True) + t.join() + #------------------- # close def test_close_single_user(self): |