summaryrefslogtreecommitdiffstats
path: root/Lib/test/test_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_queue.py')
-rw-r--r--Lib/test/test_queue.py378
1 files changed, 378 insertions, 0 deletions
diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py
index 33113a7..e3d4d56 100644
--- a/Lib/test/test_queue.py
+++ b/Lib/test/test_queue.py
@@ -241,6 +241,384 @@ class BaseQueueTestMixin(BlockingTestMixin):
with self.assertRaises(self.queue.Full):
q.put_nowait(4)
+ def test_shutdown_empty(self):
+ q = self.type2test()
+ q.shutdown()
+ with self.assertRaises(self.queue.ShutDown):
+ q.put("data")
+ with self.assertRaises(self.queue.ShutDown):
+ q.get()
+
+ def test_shutdown_nonempty(self):
+ q = self.type2test()
+ q.put("data")
+ q.shutdown()
+ q.get()
+ with self.assertRaises(self.queue.ShutDown):
+ q.get()
+
+ def test_shutdown_immediate(self):
+ q = self.type2test()
+ q.put("data")
+ q.shutdown(immediate=True)
+ with self.assertRaises(self.queue.ShutDown):
+ q.get()
+
+ def test_shutdown_allowed_transitions(self):
+ # allowed transitions would be from alive via shutdown to immediate
+ q = self.type2test()
+ self.assertFalse(q.is_shutdown)
+
+ q.shutdown()
+ self.assertTrue(q.is_shutdown)
+
+ q.shutdown(immediate=True)
+ self.assertTrue(q.is_shutdown)
+
+ q.shutdown(immediate=False)
+
+ def _shutdown_all_methods_in_one_thread(self, immediate):
+ q = self.type2test(2)
+ q.put("L")
+ q.put_nowait("O")
+ q.shutdown(immediate)
+
+ with self.assertRaises(self.queue.ShutDown):
+ q.put("E")
+ with self.assertRaises(self.queue.ShutDown):
+ q.put_nowait("W")
+ if immediate:
+ with self.assertRaises(self.queue.ShutDown):
+ q.get()
+ with self.assertRaises(self.queue.ShutDown):
+ q.get_nowait()
+ with self.assertRaises(ValueError):
+ q.task_done()
+ q.join()
+ else:
+ self.assertIn(q.get(), "LO")
+ q.task_done()
+ self.assertIn(q.get(), "LO")
+ q.task_done()
+ q.join()
+ # on shutdown(immediate=False)
+ # when queue is empty, should raise ShutDown Exception
+ with self.assertRaises(self.queue.ShutDown):
+ q.get() # p.get(True)
+ with self.assertRaises(self.queue.ShutDown):
+ q.get_nowait() # p.get(False)
+ with self.assertRaises(self.queue.ShutDown):
+ q.get(True, 1.0)
+
+ def test_shutdown_all_methods_in_one_thread(self):
+ return self._shutdown_all_methods_in_one_thread(False)
+
+ def test_shutdown_immediate_all_methods_in_one_thread(self):
+ return self._shutdown_all_methods_in_one_thread(True)
+
+ def _write_msg_thread(self, q, n, results, delay,
+ i_when_exec_shutdown,
+ event_start, event_end):
+ event_start.wait()
+ for i in range(1, n+1):
+ try:
+ q.put((i, "YDLO"))
+ results.append(True)
+ except self.queue.ShutDown:
+ results.append(False)
+ # triggers shutdown of queue
+ if i == i_when_exec_shutdown:
+ event_end.set()
+ time.sleep(delay)
+ # end of all puts
+ q.join()
+
+ def _read_msg_thread(self, q, nb, results, delay, event_start):
+ event_start.wait()
+ block = True
+ while nb:
+ time.sleep(delay)
+ try:
+ # Get at least one message
+ q.get(block)
+ block = False
+ q.task_done()
+ results.append(True)
+ nb -= 1
+ except self.queue.ShutDown:
+ results.append(False)
+ nb -= 1
+ except self.queue.Empty:
+ pass
+ q.join()
+
+ def _shutdown_thread(self, q, event_end, immediate):
+ event_end.wait()
+ q.shutdown(immediate)
+ q.join()
+
+ def _join_thread(self, q, delay, event_start):
+ event_start.wait()
+ time.sleep(delay)
+ q.join()
+
+ def _shutdown_all_methods_in_many_threads(self, immediate):
+ q = self.type2test()
+ ps = []
+ ev_start = threading.Event()
+ ev_exec_shutdown = threading.Event()
+ res_puts = []
+ res_gets = []
+ delay = 1e-4
+ read_process = 4
+ nb_msgs = read_process * 16
+ nb_msgs_r = nb_msgs // read_process
+ when_exec_shutdown = nb_msgs // 2
+ lprocs = (
+ (self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay,
+ when_exec_shutdown,
+ ev_start, ev_exec_shutdown)),
+ (self._read_msg_thread, read_process, (q, nb_msgs_r,
+ res_gets, delay*2,
+ ev_start)),
+ (self._join_thread, 2, (q, delay*2, ev_start)),
+ (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)),
+ )
+ # start all threds
+ for func, n, args in lprocs:
+ for i in range(n):
+ ps.append(threading.Thread(target=func, args=args))
+ ps[-1].start()
+ # set event in order to run q.shutdown()
+ ev_start.set()
+
+ if not immediate:
+ assert(len(res_gets) == len(res_puts))
+ assert(res_gets.count(True) == res_puts.count(True))
+ else:
+ assert(len(res_gets) <= len(res_puts))
+ assert(res_gets.count(True) <= res_puts.count(True))
+
+ for thread in ps[1:]:
+ thread.join()
+
+ def test_shutdown_all_methods_in_many_threads(self):
+ return self._shutdown_all_methods_in_many_threads(False)
+
+ def test_shutdown_immediate_all_methods_in_many_threads(self):
+ return self._shutdown_all_methods_in_many_threads(True)
+
+ def _get(self, q, go, results, shutdown=False):
+ go.wait()
+ try:
+ msg = q.get()
+ results.append(not shutdown)
+ return not shutdown
+ except self.queue.ShutDown:
+ results.append(shutdown)
+ return shutdown
+
+ def _get_shutdown(self, q, go, results):
+ return self._get(q, go, results, True)
+
+ def _get_task_done(self, q, go, results):
+ go.wait()
+ try:
+ msg = q.get()
+ q.task_done()
+ results.append(True)
+ return msg
+ except self.queue.ShutDown:
+ results.append(False)
+ return False
+
+ def _put(self, q, msg, go, results, shutdown=False):
+ go.wait()
+ try:
+ q.put(msg)
+ results.append(not shutdown)
+ return not shutdown
+ except self.queue.ShutDown:
+ results.append(shutdown)
+ return shutdown
+
+ def _put_shutdown(self, q, msg, go, results):
+ return self._put(q, msg, go, results, True)
+
+ def _join(self, q, results, shutdown=False):
+ try:
+ q.join()
+ results.append(not shutdown)
+ return not shutdown
+ except self.queue.ShutDown:
+ results.append(shutdown)
+ return shutdown
+
+ def _join_shutdown(self, q, results):
+ return self._join(q, results, True)
+
+ def _shutdown_get(self, immediate):
+ q = self.type2test(2)
+ results = []
+ go = threading.Event()
+ q.put("Y")
+ q.put("D")
+ # queue full
+
+ if immediate:
+ thrds = (
+ (self._get_shutdown, (q, go, results)),
+ (self._get_shutdown, (q, go, results)),
+ )
+ else:
+ thrds = (
+ # on shutdown(immediate=False)
+ # one of these threads shoud raise Shutdown
+ (self._get, (q, go, results)),
+ (self._get, (q, go, results)),
+ (self._get, (q, go, results)),
+ )
+ threads = []
+ for func, params in thrds:
+ threads.append(threading.Thread(target=func, args=params))
+ threads[-1].start()
+ q.shutdown(immediate)
+ go.set()
+ for t in threads:
+ t.join()
+ if immediate:
+ self.assertListEqual(results, [True, True])
+ else:
+ self.assertListEqual(sorted(results), [False] + [True]*(len(thrds)-1))
+
+ def test_shutdown_get(self):
+ return self._shutdown_get(False)
+
+ def test_shutdown_immediate_get(self):
+ return self._shutdown_get(True)
+
+ def _shutdown_put(self, immediate):
+ q = self.type2test(2)
+ results = []
+ go = threading.Event()
+ q.put("Y")
+ q.put("D")
+ # queue fulled
+
+ thrds = (
+ (self._put_shutdown, (q, "E", go, results)),
+ (self._put_shutdown, (q, "W", go, results)),
+ )
+ threads = []
+ for func, params in thrds:
+ threads.append(threading.Thread(target=func, args=params))
+ threads[-1].start()
+ q.shutdown()
+ go.set()
+ for t in threads:
+ t.join()
+
+ self.assertEqual(results, [True]*len(thrds))
+
+ def test_shutdown_put(self):
+ return self._shutdown_put(False)
+
+ def test_shutdown_immediate_put(self):
+ return self._shutdown_put(True)
+
+ def _shutdown_join(self, immediate):
+ q = self.type2test()
+ results = []
+ q.put("Y")
+ go = threading.Event()
+ nb = q.qsize()
+
+ thrds = (
+ (self._join, (q, results)),
+ (self._join, (q, results)),
+ )
+ threads = []
+ for func, params in thrds:
+ threads.append(threading.Thread(target=func, args=params))
+ threads[-1].start()
+ if not immediate:
+ res = []
+ for i in range(nb):
+ threads.append(threading.Thread(target=self._get_task_done, args=(q, go, res)))
+ threads[-1].start()
+ q.shutdown(immediate)
+ go.set()
+ for t in threads:
+ t.join()
+
+ self.assertEqual(results, [True]*len(thrds))
+
+ def test_shutdown_immediate_join(self):
+ return self._shutdown_join(True)
+
+ def test_shutdown_join(self):
+ return self._shutdown_join(False)
+
+ def _shutdown_put_join(self, immediate):
+ q = self.type2test(2)
+ results = []
+ go = threading.Event()
+ q.put("Y")
+ nb = q.qsize()
+ # queue not fulled
+
+ thrds = (
+ (self._put_shutdown, (q, "E", go, results)),
+ (self._join, (q, results)),
+ )
+ threads = []
+ for func, params in thrds:
+ threads.append(threading.Thread(target=func, args=params))
+ threads[-1].start()
+ self.assertEqual(q.unfinished_tasks, nb)
+ for i in range(nb):
+ t = threading.Thread(target=q.task_done)
+ t.start()
+ threads.append(t)
+ q.shutdown(immediate)
+ go.set()
+ for t in threads:
+ t.join()
+
+ self.assertEqual(results, [True]*len(thrds))
+
+ def test_shutdown_immediate_put_join(self):
+ return self._shutdown_put_join(True)
+
+ def test_shutdown_put_join(self):
+ return self._shutdown_put_join(False)
+
+ def test_shutdown_get_task_done_join(self):
+ q = self.type2test(2)
+ results = []
+ go = threading.Event()
+ q.put("Y")
+ q.put("D")
+ self.assertEqual(q.unfinished_tasks, q.qsize())
+
+ thrds = (
+ (self._get_task_done, (q, go, results)),
+ (self._get_task_done, (q, go, results)),
+ (self._join, (q, results)),
+ (self._join, (q, results)),
+ )
+ threads = []
+ for func, params in thrds:
+ threads.append(threading.Thread(target=func, args=params))
+ threads[-1].start()
+ go.set()
+ q.shutdown(False)
+ for t in threads:
+ t.join()
+
+ self.assertEqual(results, [True]*len(thrds))
+
+
class QueueTest(BaseQueueTestMixin):
def setUp(self):