summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2023-10-17 22:32:00 (GMT)
committerGitHub <noreply@github.com>2023-10-17 22:32:00 (GMT)
commita53d7cb6729dc3f254b70afcf19eaf71a2eed540 (patch)
tree679f68637995e4bfefc98f0ec40edac6150693cd /Lib
parente37620edfd77b78b913b5eab55cd91327c3e7fd3 (diff)
downloadcpython-a53d7cb6729dc3f254b70afcf19eaf71a2eed540.zip
cpython-a53d7cb6729dc3f254b70afcf19eaf71a2eed540.tar.gz
cpython-a53d7cb6729dc3f254b70afcf19eaf71a2eed540.tar.bz2
gh-84570: Send-Wait Fixes for _xxinterpchannels (gh-111006)
There were a few things I did in gh-110565 that need to be fixed. I also forgot to add tests in that PR. (Note that this PR exposes a refleak introduced by gh-110246. I'll take care of that separately.)
Diffstat (limited to 'Lib')
-rw-r--r--Lib/test/test__xxinterpchannels.py217
1 files changed, 173 insertions, 44 deletions
diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py
index ff01a33..90a1224 100644
--- a/Lib/test/test__xxinterpchannels.py
+++ b/Lib/test/test__xxinterpchannels.py
@@ -564,7 +564,62 @@ class ChannelTests(TestBase):
with self.assertRaises(channels.ChannelClosedError):
channels.list_interpreters(cid, send=False)
- ####################
+ def test_allowed_types(self):
+ cid = channels.create()
+ objects = [
+ None,
+ 'spam',
+ b'spam',
+ 42,
+ ]
+ for obj in objects:
+ with self.subTest(obj):
+ channels.send(cid, obj, blocking=False)
+ got = channels.recv(cid)
+
+ self.assertEqual(got, obj)
+ self.assertIs(type(got), type(obj))
+ # XXX Check the following?
+ #self.assertIsNot(got, obj)
+ # XXX What about between interpreters?
+
+ def test_run_string_arg_unresolved(self):
+ cid = channels.create()
+ interp = interpreters.create()
+
+ out = _run_output(interp, dedent("""
+ import _xxinterpchannels as _channels
+ print(cid.end)
+ _channels.send(cid, b'spam', blocking=False)
+ """),
+ dict(cid=cid.send))
+ obj = channels.recv(cid)
+
+ self.assertEqual(obj, b'spam')
+ self.assertEqual(out.strip(), 'send')
+
+ # XXX For now there is no high-level channel into which the
+ # sent channel ID can be converted...
+ # Note: this test caused crashes on some buildbots (bpo-33615).
+ @unittest.skip('disabled until high-level channels exist')
+ def test_run_string_arg_resolved(self):
+ cid = channels.create()
+ cid = channels._channel_id(cid, _resolve=True)
+ interp = interpreters.create()
+
+ out = _run_output(interp, dedent("""
+ import _xxinterpchannels as _channels
+ print(chan.id.end)
+ _channels.send(chan.id, b'spam', blocking=False)
+ """),
+ dict(chan=cid.send))
+ obj = channels.recv(cid)
+
+ self.assertEqual(obj, b'spam')
+ self.assertEqual(out.strip(), 'send')
+
+ #-------------------
+ # send/recv
def test_send_recv_main(self):
cid = channels.create()
@@ -705,6 +760,9 @@ class ChannelTests(TestBase):
channels.recv(cid2)
del cid2
+ #-------------------
+ # send_buffer
+
def test_send_buffer(self):
buf = bytearray(b'spamspamspam')
cid = channels.create()
@@ -720,60 +778,131 @@ class ChannelTests(TestBase):
obj[4:8] = b'ham.'
self.assertEqual(obj, buf)
- def test_allowed_types(self):
+ #-------------------
+ # send with waiting
+
+ def build_send_waiter(self, obj, *, buffer=False):
+ # We want a long enough sleep that send() actually has to wait.
+
+ if buffer:
+ send = channels.send_buffer
+ else:
+ send = channels.send
+
cid = channels.create()
- objects = [
- None,
- 'spam',
- b'spam',
- 42,
- ]
- for obj in objects:
- with self.subTest(obj):
- channels.send(cid, obj, blocking=False)
- got = channels.recv(cid)
+ try:
+ started = time.monotonic()
+ send(cid, obj, blocking=False)
+ stopped = time.monotonic()
+ channels.recv(cid)
+ finally:
+ channels.destroy(cid)
+ delay = stopped - started # seconds
+ delay *= 3
- self.assertEqual(got, obj)
- self.assertIs(type(got), type(obj))
- # XXX Check the following?
- #self.assertIsNot(got, obj)
- # XXX What about between interpreters?
+ def wait():
+ time.sleep(delay)
+ return wait
- def test_run_string_arg_unresolved(self):
+ def test_send_blocking_waiting(self):
+ received = None
+ obj = b'spam'
+ wait = self.build_send_waiter(obj)
cid = channels.create()
- interp = interpreters.create()
+ def f():
+ nonlocal received
+ wait()
+ received = recv_wait(cid)
+ t = threading.Thread(target=f)
+ t.start()
+ channels.send(cid, obj, blocking=True)
+ t.join()
- out = _run_output(interp, dedent("""
- import _xxinterpchannels as _channels
- print(cid.end)
- _channels.send(cid, b'spam', blocking=False)
- """),
- dict(cid=cid.send))
- obj = channels.recv(cid)
+ self.assertEqual(received, obj)
- self.assertEqual(obj, b'spam')
- self.assertEqual(out.strip(), 'send')
+ def test_send_buffer_blocking_waiting(self):
+ received = None
+ obj = bytearray(b'spam')
+ wait = self.build_send_waiter(obj, buffer=True)
+ cid = channels.create()
+ def f():
+ nonlocal received
+ wait()
+ received = recv_wait(cid)
+ t = threading.Thread(target=f)
+ t.start()
+ channels.send_buffer(cid, obj, blocking=True)
+ t.join()
- # XXX For now there is no high-level channel into which the
- # sent channel ID can be converted...
- # Note: this test caused crashes on some buildbots (bpo-33615).
- @unittest.skip('disabled until high-level channels exist')
- def test_run_string_arg_resolved(self):
+ self.assertEqual(received, obj)
+
+ def test_send_blocking_no_wait(self):
+ received = None
+ obj = b'spam'
cid = channels.create()
- cid = channels._channel_id(cid, _resolve=True)
- interp = interpreters.create()
+ def f():
+ nonlocal received
+ received = recv_wait(cid)
+ t = threading.Thread(target=f)
+ t.start()
+ channels.send(cid, obj, blocking=True)
+ t.join()
- out = _run_output(interp, dedent("""
- import _xxinterpchannels as _channels
- print(chan.id.end)
- _channels.send(chan.id, b'spam', blocking=False)
- """),
- dict(chan=cid.send))
- obj = channels.recv(cid)
+ self.assertEqual(received, obj)
- self.assertEqual(obj, b'spam')
- self.assertEqual(out.strip(), 'send')
+ def test_send_buffer_blocking_no_wait(self):
+ received = None
+ obj = bytearray(b'spam')
+ cid = channels.create()
+ def f():
+ nonlocal received
+ received = recv_wait(cid)
+ t = threading.Thread(target=f)
+ t.start()
+ channels.send_buffer(cid, obj, blocking=True)
+ t.join()
+
+ self.assertEqual(received, obj)
+
+ def test_send_closed_while_waiting(self):
+ obj = b'spam'
+ wait = self.build_send_waiter(obj)
+ cid = channels.create()
+ def f():
+ wait()
+ channels.close(cid, force=True)
+ t = threading.Thread(target=f)
+ t.start()
+ with self.assertRaises(channels.ChannelClosedError):
+ channels.send(cid, obj, blocking=True)
+ t.join()
+
+ def test_send_buffer_closed_while_waiting(self):
+ try:
+ self._has_run_once
+ except AttributeError:
+ # At the moment, this test leaks a few references.
+ # It looks like the leak originates with the addition
+ # of _channels.send_buffer() (gh-110246), whereas the
+ # tests were added afterward. We want this test even
+ # if the refleak isn't fixed yet, so we skip here.
+ raise unittest.SkipTest('temporarily skipped due to refleaks')
+ else:
+ self._has_run_once = True
+
+ obj = bytearray(b'spam')
+ wait = self.build_send_waiter(obj, buffer=True)
+ cid = channels.create()
+ def f():
+ wait()
+ channels.close(cid, force=True)
+ t = threading.Thread(target=f)
+ t.start()
+ with self.assertRaises(channels.ChannelClosedError):
+ channels.send_buffer(cid, obj, blocking=True)
+ t.join()
+ #-------------------
# close
def test_close_single_user(self):