summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_queue.py
diff options
context:
space:
mode:
authorDuprat <yduprat@gmail.com>2024-03-18 16:15:29 (GMT)
committerGitHub <noreply@github.com>2024-03-18 16:15:29 (GMT)
commit7707b14489644073ab0153f5751c6ddbf3fc6f91 (patch)
tree2b1c9acfdd829aea32e80dbd7378e6bd9446644b /Lib/test/test_queue.py
parentf139d840fb543f8357ce9fc8f845c4e945a0ce85 (diff)
downloadcpython-7707b14489644073ab0153f5751c6ddbf3fc6f91.zip
cpython-7707b14489644073ab0153f5751c6ddbf3fc6f91.tar.gz
cpython-7707b14489644073ab0153f5751c6ddbf3fc6f91.tar.bz2
gh-115258: Fix hanging tests for threading queue shutdown (#115940)
This reinstates `test_shutdown_immediate_all_methods_in_many_threads` and improves `test_shutdown_all_methods_in_many_threads`.
Diffstat (limited to 'Lib/test/test_queue.py')
-rw-r--r--Lib/test/test_queue.py132
1 files changed, 71 insertions, 61 deletions
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py
index ad31ba1..c4d1011 100644
--- a/Lib/test/test_queue.py
+++ b/Lib/test/test_queue.py
@@ -317,97 +317,107 @@ class BaseQueueTestMixin(BlockingTestMixin):
def test_shutdown_immediate_all_methods_in_one_thread(self):
return self._shutdown_all_methods_in_one_thread(True)
- def _write_msg_thread(self, q, n, results, delay,
- i_when_exec_shutdown,
- event_start, event_end):
- event_start.wait()
- for i in range(1, n+1):
+ def _write_msg_thread(self, q, n, results,
+ i_when_exec_shutdown, event_shutdown,
+ barrier_start):
+ # All `write_msg_threads`
+ # put several items into the queue.
+ for i in range(0, i_when_exec_shutdown//2):
+ q.put((i, 'LOYD'))
+ # Wait for the barrier to be complete.
+ barrier_start.wait()
+
+ for i in range(i_when_exec_shutdown//2, n):
try:
q.put((i, "YDLO"))
- results.append(True)
except self.queue.ShutDown:
results.append(False)
- # triggers shutdown of queue
- if i == i_when_exec_shutdown:
- event_end.set()
- time.sleep(delay)
- # end of all puts
- q.join()
+ break
- def _read_msg_thread(self, q, nb, results, delay, event_start):
- event_start.wait()
- block = True
- while nb:
- time.sleep(delay)
+ # Trigger queue shutdown.
+ if i == i_when_exec_shutdown:
+ # Only one thread should call shutdown().
+ if not event_shutdown.is_set():
+ event_shutdown.set()
+ results.append(True)
+
+ def _read_msg_thread(self, q, results, barrier_start):
+ # Get at least one item.
+ q.get(True)
+ q.task_done()
+ # Wait for the barrier to be complete.
+ barrier_start.wait()
+ while True:
try:
- # Get at least one message
- q.get(block)
- block = False
+ q.get(False)
q.task_done()
- results.append(True)
- nb -= 1
except self.queue.ShutDown:
- results.append(False)
- nb -= 1
+ results.append(True)
+ break
except self.queue.Empty:
pass
- q.join()
- def _shutdown_thread(self, q, event_end, immediate):
+ def _shutdown_thread(self, q, results, event_end, immediate):
event_end.wait()
q.shutdown(immediate)
- q.join()
+ results.append(q.qsize() == 0)
- def _join_thread(self, q, delay, event_start):
- event_start.wait()
- time.sleep(delay)
+ def _join_thread(self, q, barrier_start):
+ # Wait for the barrier to be complete.
+ barrier_start.wait()
q.join()
def _shutdown_all_methods_in_many_threads(self, immediate):
+ # Run a 'multi-producers/consumers queue' use case,
+ # with enough items into the queue.
+ # When shutdown, all running threads will be joined.
q = self.type2test()
ps = []
- ev_start = threading.Event()
- ev_exec_shutdown = threading.Event()
res_puts = []
res_gets = []
- delay = 1e-4
- read_process = 4
- nb_msgs = read_process * 16
- nb_msgs_r = nb_msgs // read_process
- when_exec_shutdown = nb_msgs // 2
- lprocs = (
- (self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay,
- when_exec_shutdown,
- ev_start, ev_exec_shutdown)),
- (self._read_msg_thread, read_process, (q, nb_msgs_r,
- res_gets, delay*2,
- ev_start)),
- (self._join_thread, 2, (q, delay*2, ev_start)),
- (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)),
- )
- # start all threds
+ res_shutdown = []
+ write_threads = 4
+ read_threads = 6
+ join_threads = 2
+ nb_msgs = 1024*64
+ nb_msgs_w = nb_msgs // write_threads
+ when_exec_shutdown = nb_msgs_w // 2
+ # Use of a Barrier to ensure that
+ # - all write threads put all their items into the queue,
+ # - all read thread get at least one item from the queue,
+ # and keep on running until shutdown.
+ # The join thread is started only when shutdown is immediate.
+ nparties = write_threads + read_threads
+ if immediate:
+ nparties += join_threads
+ barrier_start = threading.Barrier(nparties)
+ ev_exec_shutdown = threading.Event()
+ lprocs = [
+ (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,
+ when_exec_shutdown, ev_exec_shutdown,
+ barrier_start)),
+ (self._read_msg_thread, read_threads, (q, res_gets, barrier_start)),
+ (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)),
+ ]
+ if immediate:
+ lprocs.append((self._join_thread, join_threads, (q, barrier_start)))
+ # start all threads.
for func, n, args in lprocs:
for i in range(n):
ps.append(threading.Thread(target=func, args=args))
ps[-1].start()
- # set event in order to run q.shutdown()
- ev_start.set()
-
- if not immediate:
- assert(len(res_gets) == len(res_puts))
- assert(res_gets.count(True) == res_puts.count(True))
- else:
- assert(len(res_gets) <= len(res_puts))
- assert(res_gets.count(True) <= res_puts.count(True))
-
- for thread in ps[1:]:
+ for thread in ps:
thread.join()
- @unittest.skip("test times out (gh-115258)")
+ self.assertTrue(True in res_puts)
+ self.assertEqual(res_gets.count(True), read_threads)
+ if immediate:
+ self.assertListEqual(res_shutdown, [True])
+ self.assertTrue(q.empty())
+
def test_shutdown_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(False)
- @unittest.skip("test times out (gh-115258)")
def test_shutdown_immediate_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(True)