diff options
author | Eric Snow <ericsnowcurrently@gmail.com> | 2023-10-17 23:05:49 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-17 23:05:49 (GMT) |
commit | c58c63fdf615a1c2bfc995dd0b938d82e32b6cde (patch) | |
tree | c55bd273d26fdcaf30013a44a7bf973940771a23 /Lib/test | |
parent | 7029c1a1c5b864056aa00298b1d0e0269f073f99 (diff) | |
download | cpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.zip cpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.tar.gz cpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.tar.bz2 |
gh-84570: Add Timeouts to SendChannel.send() and RecvChannel.recv() (gh-110567)
Diffstat (limited to 'Lib/test')
-rw-r--r-- | Lib/test/support/interpreters.py | 20 | ||||
-rw-r--r-- | Lib/test/test__xxinterpchannels.py | 128 | ||||
-rw-r--r-- | Lib/test/test_interpreters.py | 5 |
3 files changed, 128 insertions, 25 deletions
diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index 9ba6862..f8f42c0 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -170,15 +170,25 @@ class RecvChannel(_ChannelEnd): _end = 'recv' - def recv(self, *, _sentinel=object(), _delay=10 / 1000): # 10 milliseconds + def recv(self, timeout=None, *, + _sentinel=object(), + _delay=10 / 1000, # 10 milliseconds + ): """Return the next object from the channel. This blocks until an object has been sent, if none have been sent already. """ + if timeout is not None: + timeout = int(timeout) + if timeout < 0: + raise ValueError(f'timeout value must be non-negative') + end = time.time() + timeout obj = _channels.recv(self._id, _sentinel) while obj is _sentinel: time.sleep(_delay) + if timeout is not None and time.time() >= end: + raise TimeoutError obj = _channels.recv(self._id, _sentinel) return obj @@ -203,12 +213,12 @@ class SendChannel(_ChannelEnd): _end = 'send' - def send(self, obj): + def send(self, obj, timeout=None): """Send the object (i.e. its data) to the channel's receiving end. This blocks until the object is received. """ - _channels.send(self._id, obj, blocking=True) + _channels.send(self._id, obj, timeout=timeout, blocking=True) def send_nowait(self, obj): """Send the object to the channel's receiving end. @@ -221,12 +231,12 @@ class SendChannel(_ChannelEnd): # See bpo-32604 and gh-19829. return _channels.send(self._id, obj, blocking=False) - def send_buffer(self, obj): + def send_buffer(self, obj, timeout=None): """Send the object's buffer to the channel's receiving end. This blocks until the object is received. """ - _channels.send_buffer(self._id, obj, blocking=True) + _channels.send_buffer(self._id, obj, timeout=timeout, blocking=True) def send_buffer_nowait(self, obj): """Send the object's buffer to the channel's receiving end. diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py index 90a1224..1c1ef3f 100644 --- a/Lib/test/test__xxinterpchannels.py +++ b/Lib/test/test__xxinterpchannels.py @@ -864,22 +864,97 @@ class ChannelTests(TestBase): self.assertEqual(received, obj) + def test_send_timeout(self): + obj = b'spam' + + with self.subTest('non-blocking with timeout'): + cid = channels.create() + with self.assertRaises(ValueError): + channels.send(cid, obj, blocking=False, timeout=0.1) + + with self.subTest('timeout hit'): + cid = channels.create() + with self.assertRaises(TimeoutError): + channels.send(cid, obj, blocking=True, timeout=0.1) + with self.assertRaises(channels.ChannelEmptyError): + received = channels.recv(cid) + print(repr(received)) + + with self.subTest('timeout not hit'): + cid = channels.create() + def f(): + recv_wait(cid) + t = threading.Thread(target=f) + t.start() + channels.send(cid, obj, blocking=True, timeout=10) + t.join() + + def test_send_buffer_timeout(self): + try: + self._has_run_once_timeout + except AttributeError: + # At the moment, this test leaks a few references. + # It looks like the leak originates with the addition + # of _channels.send_buffer() (gh-110246), whereas the + # tests were added afterward. We want this test even + # if the refleak isn't fixed yet, so we skip here. + raise unittest.SkipTest('temporarily skipped due to refleaks') + else: + self._has_run_once_timeout = True + + obj = bytearray(b'spam') + + with self.subTest('non-blocking with timeout'): + cid = channels.create() + with self.assertRaises(ValueError): + channels.send_buffer(cid, obj, blocking=False, timeout=0.1) + + with self.subTest('timeout hit'): + cid = channels.create() + with self.assertRaises(TimeoutError): + channels.send_buffer(cid, obj, blocking=True, timeout=0.1) + with self.assertRaises(channels.ChannelEmptyError): + received = channels.recv(cid) + print(repr(received)) + + with self.subTest('timeout not hit'): + cid = channels.create() + def f(): + recv_wait(cid) + t = threading.Thread(target=f) + t.start() + channels.send_buffer(cid, obj, blocking=True, timeout=10) + t.join() + def test_send_closed_while_waiting(self): obj = b'spam' wait = self.build_send_waiter(obj) - cid = channels.create() - def f(): - wait() - channels.close(cid, force=True) - t = threading.Thread(target=f) - t.start() - with self.assertRaises(channels.ChannelClosedError): - channels.send(cid, obj, blocking=True) - t.join() + + with self.subTest('without timeout'): + cid = channels.create() + def f(): + wait() + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send(cid, obj, blocking=True) + t.join() + + with self.subTest('with timeout'): + cid = channels.create() + def f(): + wait() + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send(cid, obj, blocking=True, timeout=30) + t.join() def test_send_buffer_closed_while_waiting(self): try: - self._has_run_once + self._has_run_once_closed except AttributeError: # At the moment, this test leaks a few references. # It looks like the leak originates with the addition @@ -888,19 +963,32 @@ class ChannelTests(TestBase): # if the refleak isn't fixed yet, so we skip here. raise unittest.SkipTest('temporarily skipped due to refleaks') else: - self._has_run_once = True + self._has_run_once_closed = True obj = bytearray(b'spam') wait = self.build_send_waiter(obj, buffer=True) - cid = channels.create() - def f(): - wait() - channels.close(cid, force=True) - t = threading.Thread(target=f) - t.start() - with self.assertRaises(channels.ChannelClosedError): - channels.send_buffer(cid, obj, blocking=True) - t.join() + + with self.subTest('without timeout'): + cid = channels.create() + def f(): + wait() + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send_buffer(cid, obj, blocking=True) + t.join() + + with self.subTest('with timeout'): + cid = channels.create() + def f(): + wait() + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send_buffer(cid, obj, blocking=True, timeout=30) + t.join() #------------------- # close diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py index 0910b51..d2d52ec 100644 --- a/Lib/test/test_interpreters.py +++ b/Lib/test/test_interpreters.py @@ -1022,6 +1022,11 @@ class TestSendRecv(TestBase): self.assertEqual(obj2, b'eggs') self.assertNotEqual(id(obj2), int(out)) + def test_recv_timeout(self): + r, _ = interpreters.create_channel() + with self.assertRaises(TimeoutError): + r.recv(timeout=1) + def test_recv_channel_does_not_exist(self): ch = interpreters.RecvChannel(1_000_000) with self.assertRaises(interpreters.ChannelNotFoundError): |