diff options
Diffstat (limited to 'Lib')
-rw-r--r-- | Lib/queue.py | 86 | ||||
-rw-r--r-- | Lib/test/test_queue.py | 240 |
2 files changed, 318 insertions, 8 deletions
diff --git a/Lib/queue.py b/Lib/queue.py index c803b96..ef07957 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -4,17 +4,26 @@ import threading from collections import deque from heapq import heappush, heappop from time import monotonic as time +try: + from _queue import SimpleQueue +except ImportError: + SimpleQueue = None -__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] +__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue'] -class Empty(Exception): - 'Exception raised by Queue.get(block=0)/get_nowait().' - pass + +try: + from _queue import Empty +except AttributeError: + class Empty(Exception): + 'Exception raised by Queue.get(block=0)/get_nowait().' + pass class Full(Exception): 'Exception raised by Queue.put(block=0)/put_nowait().' pass + class Queue: '''Create a queue object with a given maximum size. @@ -241,3 +250,72 @@ class LifoQueue(Queue): def _get(self): return self.queue.pop() + + +class _PySimpleQueue: + '''Simple, unbounded FIFO queue. + + This pure Python implementation is not reentrant. + ''' + # Note: while this pure Python version provides fairness + # (by using a threading.Semaphore which is itself fair, being based + # on threading.Condition), fairness is not part of the API contract. + # This allows the C version to use a different implementation. + + def __init__(self): + self._queue = deque() + self._count = threading.Semaphore(0) + + def put(self, item, block=True, timeout=None): + '''Put the item on the queue. + + The optional 'block' and 'timeout' arguments are ignored, as this method + never blocks. They are provided for compatibility with the Queue class. + ''' + self._queue.append(item) + self._count.release() + + def get(self, block=True, timeout=None): + '''Remove and return an item from the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). + ''' + if timeout is not None and timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + if not self._count.acquire(block, timeout): + raise Empty + return self._queue.popleft() + + def put_nowait(self, item): + '''Put an item into the queue without blocking. + + This is exactly equivalent to `put(item)` and is only provided + for compatibility with the Queue class. + ''' + return self.put(item, block=False) + + def get_nowait(self): + '''Remove and return an item from the queue without blocking. + + Only get an item if one is immediately available. Otherwise + raise the Empty exception. + ''' + return self.get(block=False) + + def empty(self): + '''Return True if the queue is empty, False otherwise (not reliable!).''' + return len(self._queue) == 0 + + def qsize(self): + '''Return the approximate size of the queue (not reliable!).''' + return len(self._queue) + + +if SimpleQueue is None: + SimpleQueue = _PySimpleQueue diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 6ee906c..1a8d5f8 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -1,12 +1,22 @@ # Some simple queue module tests, plus some failure conditions # to ensure the Queue locks remain stable. +import collections +import itertools import queue +import random +import sys import threading import time import unittest +import weakref from test import support +try: + import _queue +except ImportError: + _queue = None + QUEUE_SIZE = 5 def qfull(q): @@ -84,7 +94,7 @@ class BaseQueueTestMixin(BlockingTestMixin): self.cum = 0 self.cumlock = threading.Lock() - def simple_queue_test(self, q): + def basic_queue_test(self, q): if q.qsize(): raise RuntimeError("Call this function with an empty queue") self.assertTrue(q.empty()) @@ -192,12 +202,12 @@ class BaseQueueTestMixin(BlockingTestMixin): else: self.fail("Did not detect task count going negative") - def test_simple_queue(self): + def test_basic(self): # Do it a couple of times on the same queue. # Done twice to make sure works with same instance reused. q = self.type2test(QUEUE_SIZE) - self.simple_queue_test(q) - self.simple_queue_test(q) + self.basic_queue_test(q) + self.basic_queue_test(q) def test_negative_timeout_raises_exception(self): q = self.type2test(QUEUE_SIZE) @@ -353,5 +363,227 @@ class FailingQueueTest(BlockingTestMixin, unittest.TestCase): self.failing_queue_test(q) +class BaseSimpleQueueTest: + + def setUp(self): + self.q = self.type2test() + + def feed(self, q, seq, rnd): + while True: + try: + val = seq.pop() + except IndexError: + return + q.put(val) + if rnd.random() > 0.5: + time.sleep(rnd.random() * 1e-3) + + def consume(self, q, results, sentinel): + while True: + val = q.get() + if val == sentinel: + return + results.append(val) + + def consume_nonblock(self, q, results, sentinel): + while True: + while True: + try: + val = q.get(block=False) + except queue.Empty: + time.sleep(1e-5) + else: + break + if val == sentinel: + return + results.append(val) + + def consume_timeout(self, q, results, sentinel): + while True: + while True: + try: + val = q.get(timeout=1e-5) + except queue.Empty: + pass + else: + break + if val == sentinel: + return + results.append(val) + + def run_threads(self, n_feeders, n_consumers, q, inputs, + feed_func, consume_func): + results = [] + sentinel = None + seq = inputs + [sentinel] * n_consumers + seq.reverse() + rnd = random.Random(42) + + exceptions = [] + def log_exceptions(f): + def wrapper(*args, **kwargs): + try: + f(*args, **kwargs) + except BaseException as e: + exceptions.append(e) + return wrapper + + feeders = [threading.Thread(target=log_exceptions(feed_func), + args=(q, seq, rnd)) + for i in range(n_feeders)] + consumers = [threading.Thread(target=log_exceptions(consume_func), + args=(q, results, sentinel)) + for i in range(n_consumers)] + + with support.start_threads(feeders + consumers): + pass + + self.assertFalse(exceptions) + self.assertTrue(q.empty()) + self.assertEqual(q.qsize(), 0) + + return results + + def test_basic(self): + # Basic tests for get(), put() etc. + q = self.q + self.assertTrue(q.empty()) + self.assertEqual(q.qsize(), 0) + q.put(1) + self.assertFalse(q.empty()) + self.assertEqual(q.qsize(), 1) + q.put(2) + q.put_nowait(3) + q.put(4) + self.assertFalse(q.empty()) + self.assertEqual(q.qsize(), 4) + + self.assertEqual(q.get(), 1) + self.assertEqual(q.qsize(), 3) + + self.assertEqual(q.get_nowait(), 2) + self.assertEqual(q.qsize(), 2) + + self.assertEqual(q.get(block=False), 3) + self.assertFalse(q.empty()) + self.assertEqual(q.qsize(), 1) + + self.assertEqual(q.get(timeout=0.1), 4) + self.assertTrue(q.empty()) + self.assertEqual(q.qsize(), 0) + + with self.assertRaises(queue.Empty): + q.get(block=False) + with self.assertRaises(queue.Empty): + q.get(timeout=1e-3) + with self.assertRaises(queue.Empty): + q.get_nowait() + self.assertTrue(q.empty()) + self.assertEqual(q.qsize(), 0) + + def test_negative_timeout_raises_exception(self): + q = self.q + q.put(1) + with self.assertRaises(ValueError): + q.get(timeout=-1) + + def test_order(self): + # Test a pair of concurrent put() and get() + q = self.q + inputs = list(range(100)) + results = self.run_threads(1, 1, q, inputs, self.feed, self.consume) + + # One producer, one consumer => results appended in well-defined order + self.assertEqual(results, inputs) + + def test_many_threads(self): + # Test multiple concurrent put() and get() + N = 50 + q = self.q + inputs = list(range(10000)) + results = self.run_threads(N, N, q, inputs, self.feed, self.consume) + + # Multiple consumers without synchronization append the + # results in random order + self.assertEqual(sorted(results), inputs) + + def test_many_threads_nonblock(self): + # Test multiple concurrent put() and get(block=False) + N = 50 + q = self.q + inputs = list(range(10000)) + results = self.run_threads(N, N, q, inputs, + self.feed, self.consume_nonblock) + + self.assertEqual(sorted(results), inputs) + + def test_many_threads_timeout(self): + # Test multiple concurrent put() and get(timeout=...) + N = 50 + q = self.q + inputs = list(range(1000)) + results = self.run_threads(N, N, q, inputs, + self.feed, self.consume_timeout) + + self.assertEqual(sorted(results), inputs) + + def test_references(self): + # The queue should lose references to each item as soon as + # it leaves the queue. + class C: + pass + + N = 20 + q = self.q + for i in range(N): + q.put(C()) + for i in range(N): + wr = weakref.ref(q.get()) + self.assertIsNone(wr()) + + +class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase): + type2test = queue._PySimpleQueue + + +@unittest.skipIf(_queue is None, "No _queue module found") +class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase): + + def setUp(self): + self.type2test = _queue.SimpleQueue + super().setUp() + + def test_is_default(self): + self.assertIs(self.type2test, queue.SimpleQueue) + + def test_reentrancy(self): + # bpo-14976: put() may be called reentrantly in an asynchronous + # callback. + q = self.q + gen = itertools.count() + N = 10000 + results = [] + + # This test exploits the fact that __del__ in a reference cycle + # can be called any time the GC may run. + + class Circular(object): + def __init__(self): + self.circular = self + + def __del__(self): + q.put(next(gen)) + + while True: + o = Circular() + q.put(next(gen)) + del o + results.append(q.get()) + if results[-1] >= N: + break + + self.assertEqual(results, list(range(N + 1))) + + if __name__ == "__main__": unittest.main() |