summaryrefslogtreecommitdiffstats
path: root/Lib/test
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2023-10-17 23:05:49 (GMT)
committerGitHub <noreply@github.com>2023-10-17 23:05:49 (GMT)
commitc58c63fdf615a1c2bfc995dd0b938d82e32b6cde (patch)
treec55bd273d26fdcaf30013a44a7bf973940771a23 /Lib/test
parent7029c1a1c5b864056aa00298b1d0e0269f073f99 (diff)
downloadcpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.zip
cpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.tar.gz
cpython-c58c63fdf615a1c2bfc995dd0b938d82e32b6cde.tar.bz2
gh-84570: Add Timeouts to SendChannel.send() and RecvChannel.recv() (gh-110567)
Diffstat (limited to 'Lib/test')
-rw-r--r--Lib/test/support/interpreters.py20
-rw-r--r--Lib/test/test__xxinterpchannels.py128
-rw-r--r--Lib/test/test_interpreters.py5
3 files changed, 128 insertions, 25 deletions
diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py
index 9ba6862..f8f42c0 100644
--- a/Lib/test/support/interpreters.py
+++ b/Lib/test/support/interpreters.py
@@ -170,15 +170,25 @@ class RecvChannel(_ChannelEnd):
_end = 'recv'
- def recv(self, *, _sentinel=object(), _delay=10 / 1000): # 10 milliseconds
+ def recv(self, timeout=None, *,
+ _sentinel=object(),
+ _delay=10 / 1000, # 10 milliseconds
+ ):
"""Return the next object from the channel.
This blocks until an object has been sent, if none have been
sent already.
"""
+ if timeout is not None:
+ timeout = int(timeout)
+ if timeout < 0:
+ raise ValueError(f'timeout value must be non-negative')
+ end = time.time() + timeout
obj = _channels.recv(self._id, _sentinel)
while obj is _sentinel:
time.sleep(_delay)
+ if timeout is not None and time.time() >= end:
+ raise TimeoutError
obj = _channels.recv(self._id, _sentinel)
return obj
@@ -203,12 +213,12 @@ class SendChannel(_ChannelEnd):
_end = 'send'
- def send(self, obj):
+ def send(self, obj, timeout=None):
"""Send the object (i.e. its data) to the channel's receiving end.
This blocks until the object is received.
"""
- _channels.send(self._id, obj, blocking=True)
+ _channels.send(self._id, obj, timeout=timeout, blocking=True)
def send_nowait(self, obj):
"""Send the object to the channel's receiving end.
@@ -221,12 +231,12 @@ class SendChannel(_ChannelEnd):
# See bpo-32604 and gh-19829.
return _channels.send(self._id, obj, blocking=False)
- def send_buffer(self, obj):
+ def send_buffer(self, obj, timeout=None):
"""Send the object's buffer to the channel's receiving end.
This blocks until the object is received.
"""
- _channels.send_buffer(self._id, obj, blocking=True)
+ _channels.send_buffer(self._id, obj, timeout=timeout, blocking=True)
def send_buffer_nowait(self, obj):
"""Send the object's buffer to the channel's receiving end.
diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py
index 90a1224..1c1ef3f 100644
--- a/Lib/test/test__xxinterpchannels.py
+++ b/Lib/test/test__xxinterpchannels.py
@@ -864,22 +864,97 @@ class ChannelTests(TestBase):
self.assertEqual(received, obj)
+ def test_send_timeout(self):
+ obj = b'spam'
+
+ with self.subTest('non-blocking with timeout'):
+ cid = channels.create()
+ with self.assertRaises(ValueError):
+ channels.send(cid, obj, blocking=False, timeout=0.1)
+
+ with self.subTest('timeout hit'):
+ cid = channels.create()
+ with self.assertRaises(TimeoutError):
+ channels.send(cid, obj, blocking=True, timeout=0.1)
+ with self.assertRaises(channels.ChannelEmptyError):
+ received = channels.recv(cid)
+ print(repr(received))
+
+ with self.subTest('timeout not hit'):
+ cid = channels.create()
+ def f():
+ recv_wait(cid)
+ t = threading.Thread(target=f)
+ t.start()
+ channels.send(cid, obj, blocking=True, timeout=10)
+ t.join()
+
+ def test_send_buffer_timeout(self):
+ try:
+ self._has_run_once_timeout
+ except AttributeError:
+ # At the moment, this test leaks a few references.
+ # It looks like the leak originates with the addition
+ # of _channels.send_buffer() (gh-110246), whereas the
+ # tests were added afterward. We want this test even
+ # if the refleak isn't fixed yet, so we skip here.
+ raise unittest.SkipTest('temporarily skipped due to refleaks')
+ else:
+ self._has_run_once_timeout = True
+
+ obj = bytearray(b'spam')
+
+ with self.subTest('non-blocking with timeout'):
+ cid = channels.create()
+ with self.assertRaises(ValueError):
+ channels.send_buffer(cid, obj, blocking=False, timeout=0.1)
+
+ with self.subTest('timeout hit'):
+ cid = channels.create()
+ with self.assertRaises(TimeoutError):
+ channels.send_buffer(cid, obj, blocking=True, timeout=0.1)
+ with self.assertRaises(channels.ChannelEmptyError):
+ received = channels.recv(cid)
+ print(repr(received))
+
+ with self.subTest('timeout not hit'):
+ cid = channels.create()
+ def f():
+ recv_wait(cid)
+ t = threading.Thread(target=f)
+ t.start()
+ channels.send_buffer(cid, obj, blocking=True, timeout=10)
+ t.join()
+
def test_send_closed_while_waiting(self):
obj = b'spam'
wait = self.build_send_waiter(obj)
- cid = channels.create()
- def f():
- wait()
- channels.close(cid, force=True)
- t = threading.Thread(target=f)
- t.start()
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, obj, blocking=True)
- t.join()
+
+ with self.subTest('without timeout'):
+ cid = channels.create()
+ def f():
+ wait()
+ channels.close(cid, force=True)
+ t = threading.Thread(target=f)
+ t.start()
+ with self.assertRaises(channels.ChannelClosedError):
+ channels.send(cid, obj, blocking=True)
+ t.join()
+
+ with self.subTest('with timeout'):
+ cid = channels.create()
+ def f():
+ wait()
+ channels.close(cid, force=True)
+ t = threading.Thread(target=f)
+ t.start()
+ with self.assertRaises(channels.ChannelClosedError):
+ channels.send(cid, obj, blocking=True, timeout=30)
+ t.join()
def test_send_buffer_closed_while_waiting(self):
try:
- self._has_run_once
+ self._has_run_once_closed
except AttributeError:
# At the moment, this test leaks a few references.
# It looks like the leak originates with the addition
@@ -888,19 +963,32 @@ class ChannelTests(TestBase):
# if the refleak isn't fixed yet, so we skip here.
raise unittest.SkipTest('temporarily skipped due to refleaks')
else:
- self._has_run_once = True
+ self._has_run_once_closed = True
obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True)
- cid = channels.create()
- def f():
- wait()
- channels.close(cid, force=True)
- t = threading.Thread(target=f)
- t.start()
- with self.assertRaises(channels.ChannelClosedError):
- channels.send_buffer(cid, obj, blocking=True)
- t.join()
+
+ with self.subTest('without timeout'):
+ cid = channels.create()
+ def f():
+ wait()
+ channels.close(cid, force=True)
+ t = threading.Thread(target=f)
+ t.start()
+ with self.assertRaises(channels.ChannelClosedError):
+ channels.send_buffer(cid, obj, blocking=True)
+ t.join()
+
+ with self.subTest('with timeout'):
+ cid = channels.create()
+ def f():
+ wait()
+ channels.close(cid, force=True)
+ t = threading.Thread(target=f)
+ t.start()
+ with self.assertRaises(channels.ChannelClosedError):
+ channels.send_buffer(cid, obj, blocking=True, timeout=30)
+ t.join()
#-------------------
# close
diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py
index 0910b51..d2d52ec 100644
--- a/Lib/test/test_interpreters.py
+++ b/Lib/test/test_interpreters.py
@@ -1022,6 +1022,11 @@ class TestSendRecv(TestBase):
self.assertEqual(obj2, b'eggs')
self.assertNotEqual(id(obj2), int(out))
+ def test_recv_timeout(self):
+ r, _ = interpreters.create_channel()
+ with self.assertRaises(TimeoutError):
+ r.recv(timeout=1)
+
def test_recv_channel_does_not_exist(self):
ch = interpreters.RecvChannel(1_000_000)
with self.assertRaises(interpreters.ChannelNotFoundError):