import threading from textwrap import dedent import unittest import time from test.support import import_helper # Raise SkipTest if subinterpreters not supported. _queues = import_helper.import_module('_xxinterpqueues') from test.support import interpreters from test.support.interpreters import queues from .utils import _run_output, TestBase class TestBase(TestBase): def tearDown(self): for qid in _queues.list_all(): try: _queues.destroy(qid) except Exception: pass class QueueTests(TestBase): def test_create(self): with self.subTest('vanilla'): queue = queues.create() self.assertEqual(queue.maxsize, 0) with self.subTest('small maxsize'): queue = queues.create(3) self.assertEqual(queue.maxsize, 3) with self.subTest('big maxsize'): queue = queues.create(100) self.assertEqual(queue.maxsize, 100) with self.subTest('no maxsize'): queue = queues.create(0) self.assertEqual(queue.maxsize, 0) with self.subTest('negative maxsize'): queue = queues.create(-10) self.assertEqual(queue.maxsize, -10) with self.subTest('bad maxsize'): with self.assertRaises(TypeError): queues.create('1') def test_shareable(self): queue1 = queues.create() interp = interpreters.create() interp.exec_sync(dedent(f""" from test.support.interpreters import queues queue1 = queues.Queue({queue1.id}) """)); with self.subTest('same interpreter'): queue2 = queues.create() queue1.put(queue2) queue3 = queue1.get() self.assertIs(queue3, queue2) with self.subTest('from current interpreter'): queue4 = queues.create() queue1.put(queue4) out = _run_output(interp, dedent(""" queue4 = queue1.get() print(queue4.id) """)) qid = int(out) self.assertEqual(qid, queue4.id) with self.subTest('from subinterpreter'): out = _run_output(interp, dedent(""" queue5 = queues.create() queue1.put(queue5) print(queue5.id) """)) qid = int(out) queue5 = queue1.get() self.assertEqual(queue5.id, qid) def test_id_type(self): queue = queues.create() self.assertIsInstance(queue.id, int) def test_custom_id(self): with self.assertRaises(queues.QueueNotFoundError): queues.Queue(1_000_000) def test_id_readonly(self): queue = queues.create() with self.assertRaises(AttributeError): queue.id = 1_000_000 def test_maxsize_readonly(self): queue = queues.create(10) with self.assertRaises(AttributeError): queue.maxsize = 1_000_000 def test_hashable(self): queue = queues.create() expected = hash(queue.id) actual = hash(queue) self.assertEqual(actual, expected) def test_equality(self): queue1 = queues.create() queue2 = queues.create() self.assertEqual(queue1, queue1) self.assertNotEqual(queue1, queue2) class TestQueueOps(TestBase): def test_empty(self): queue = queues.create() before = queue.empty() queue.put(None) during = queue.empty() queue.get() after = queue.empty() self.assertIs(before, True) self.assertIs(during, False) self.assertIs(after, True) def test_full(self): expected = [False, False, False, True, False, False, False] actual = [] queue = queues.create(3) for _ in range(3): actual.append(queue.full()) queue.put(None) actual.append(queue.full()) for _ in range(3): queue.get() actual.append(queue.full()) self.assertEqual(actual, expected) def test_qsize(self): expected = [0, 1, 2, 3, 2, 3, 2, 1, 0, 1, 0] actual = [] queue = queues.create() for _ in range(3): actual.append(queue.qsize()) queue.put(None) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) queue.put(None) actual.append(queue.qsize()) for _ in range(3): queue.get() actual.append(queue.qsize()) queue.put(None) actual.append(queue.qsize()) queue.get() actual.append(queue.qsize()) self.assertEqual(actual, expected) def test_put_get_main(self): expected = list(range(20)) queue = queues.create() for i in range(20): queue.put(i) actual = [queue.get() for _ in range(20)] self.assertEqual(actual, expected) def test_put_timeout(self): queue = queues.create(2) queue.put(None) queue.put(None) with self.assertRaises(queues.QueueFull): queue.put(None, timeout=0.1) queue.get() queue.put(None) def test_put_nowait(self): queue = queues.create(2) queue.put_nowait(None) queue.put_nowait(None) with self.assertRaises(queues.QueueFull): queue.put_nowait(None) queue.get() queue.put_nowait(None) def test_get_timeout(self): queue = queues.create() with self.assertRaises(queues.QueueEmpty): queue.get(timeout=0.1) def test_get_nowait(self): queue = queues.create() with self.assertRaises(queues.QueueEmpty): queue.get_nowait() def test_put_get_same_interpreter(self): interp = interpreters.create() interp.exec_sync(dedent(""" from test.support.interpreters import queues queue = queues.create() orig = b'spam' queue.put(orig) obj = queue.get() assert obj == orig, 'expected: obj == orig' assert obj is not orig, 'expected: obj is not orig' """)) def test_put_get_different_interpreters(self): interp = interpreters.create() queue1 = queues.create() queue2 = queues.create() self.assertEqual(len(queues.list_all()), 2) obj1 = b'spam' queue1.put(obj1) out = _run_output( interp, dedent(f""" from test.support.interpreters import queues queue1 = queues.Queue({queue1.id}) queue2 = queues.Queue({queue2.id}) assert queue1.qsize() == 1, 'expected: queue1.qsize() == 1' obj = queue1.get() assert queue1.qsize() == 0, 'expected: queue1.qsize() == 0' assert obj == b'spam', 'expected: obj == obj1' # When going to another interpreter we get a copy. assert id(obj) != {id(obj1)}, 'expected: obj is not obj1' obj2 = b'eggs' print(id(obj2)) assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0' queue2.put(obj2) assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1' """)) self.assertEqual(len(queues.list_all()), 2) self.assertEqual(queue1.qsize(), 0) self.assertEqual(queue2.qsize(), 1) obj2 = queue2.get() self.assertEqual(obj2, b'eggs') self.assertNotEqual(id(obj2), int(out)) def test_put_cleared_with_subinterpreter(self): interp = interpreters.create() queue = queues.create() out = _run_output( interp, dedent(f""" from test.support.interpreters import queues queue = queues.Queue({queue.id}) obj1 = b'spam' obj2 = b'eggs' queue.put(obj1) queue.put(obj2) """)) self.assertEqual(queue.qsize(), 2) obj1 = queue.get() self.assertEqual(obj1, b'spam') self.assertEqual(queue.qsize(), 1) del interp self.assertEqual(queue.qsize(), 0) def test_put_get_different_threads(self): queue1 = queues.create() queue2 = queues.create() def f(): while True: try: obj = queue1.get(timeout=0.1) break except queues.QueueEmpty: continue queue2.put(obj) t = threading.Thread(target=f) t.start() orig = b'spam' queue1.put(orig) obj = queue2.get() t.join() self.assertEqual(obj, orig) self.assertIsNot(obj, orig) if __name__ == '__main__': # Test needs to be a package, so we can do relative imports. unittest.main()