path: root/Modules
diff options
authorAntoine Pitrou <>2018-01-15 23:27:16 (GMT)
committerGitHub <>2018-01-15 23:27:16 (GMT)
commit94e1696d04c65e19ea52e5c8664079c9d9aa0e54 (patch)
tree2e68e71052365395b8fc843f30c9e430c0788ae6 /Modules
parent5ec0feeeecc1617987ec6cdc6d62b916e718a5cf (diff)
bpo-14976: Reentrant simple queue (#3346)
Add a queue.SimpleQueue class, an unbounded FIFO queue with a reentrant C implementation of put().
Diffstat (limited to 'Modules')
2 files changed, 618 insertions, 0 deletions
diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c
new file mode 100644
index 0000000..8715337
--- /dev/null
+++ b/Modules/_queuemodule.c
@@ -0,0 +1,400 @@
+#include "Python.h"
+#include "structmember.h" /* offsetof */
+#include "pythread.h"
+/*[clinic input]
+module _queue
+class _queue.SimpleQueue "simplequeueobject *" "&PySimpleQueueType"
+[clinic start generated code]*/
+/*[clinic end generated code: output=da39a3ee5e6b4b0d input=cf49af81bcbbbea6]*/
+extern PyTypeObject PySimpleQueueType; /* forward decl */
+static PyObject *EmptyError;
+typedef struct {
+ PyObject_HEAD
+ PyThread_type_lock lock;
+ int locked;
+ PyObject *lst;
+ Py_ssize_t lst_pos;
+ PyObject *weakreflist;
+} simplequeueobject;
+static void
+simplequeue_dealloc(simplequeueobject *self)
+ _PyObject_GC_UNTRACK(self);
+ if (self->lock != NULL) {
+ /* Unlock the lock so it's safe to free it */
+ if (self->locked > 0)
+ PyThread_release_lock(self->lock);
+ PyThread_free_lock(self->lock);
+ }
+ Py_XDECREF(self->lst);
+ if (self->weakreflist != NULL)
+ PyObject_ClearWeakRefs((PyObject *) self);
+ Py_TYPE(self)->tp_free(self);
+static int
+simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg)
+ Py_VISIT(self->lst);
+ return 0;
+/*[clinic input]
+_queue.SimpleQueue.__new__ as simplequeue_new
+Simple, unbounded, reentrant FIFO queue.
+[clinic start generated code]*/
+static PyObject *
+simplequeue_new_impl(PyTypeObject *type)
+/*[clinic end generated code: output=ba97740608ba31cd input=a0674a1643e3e2fb]*/
+ simplequeueobject *self;
+ self = (simplequeueobject *) type->tp_alloc(type, 0);
+ if (self != NULL) {
+ self->weakreflist = NULL;
+ self->lst = PyList_New(0);
+ self->lock = PyThread_allocate_lock();
+ self->lst_pos = 0;
+ if (self->lock == NULL) {
+ Py_DECREF(self);
+ PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
+ return NULL;
+ }
+ if (self->lst == NULL) {
+ Py_DECREF(self);
+ return NULL;
+ }
+ }
+ return (PyObject *) self;
+/*[clinic input]
+ item: object
+ block: bool = True
+ timeout: object = None
+Put the item on the queue.
+The optional 'block' and 'timeout' arguments are ignored, as this method
+never blocks. They are provided for compatibility with the Queue class.
+[clinic start generated code]*/
+static PyObject *
+_queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
+ int block, PyObject *timeout)
+/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
+ /* BEGIN GIL-protected critical section */
+ if (PyList_Append(self->lst, item) < 0)
+ return NULL;
+ if (self->locked) {
+ /* A get() may be waiting, wake it up */
+ self->locked = 0;
+ PyThread_release_lock(self->lock);
+ }
+ /* END GIL-protected critical section */
+/*[clinic input]
+ item: object
+Put an item into the queue without blocking.
+This is exactly equivalent to `put(item)` and is only provided
+for compatibility with the Queue class.
+[clinic start generated code]*/
+static PyObject *
+_queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
+/*[clinic end generated code: output=0990536715efb1f1 input=36b1ea96756b2ece]*/
+ return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
+static PyObject *
+simplequeue_pop_item(simplequeueobject *self)
+ Py_ssize_t count, n;
+ PyObject *item;
+ n = PyList_GET_SIZE(self->lst);
+ assert(self->lst_pos < n);
+ item = PyList_GET_ITEM(self->lst, self->lst_pos);
+ Py_INCREF(Py_None);
+ PyList_SET_ITEM(self->lst, self->lst_pos, Py_None);
+ self->lst_pos += 1;
+ count = n - self->lst_pos;
+ if (self->lst_pos > count) {
+ /* The list is more than 50% empty, reclaim space at the beginning */
+ if (PyList_SetSlice(self->lst, 0, self->lst_pos, NULL)) {
+ /* Undo pop */
+ self->lst_pos -= 1;
+ PyList_SET_ITEM(self->lst, self->lst_pos, item);
+ return NULL;
+ }
+ self->lst_pos = 0;
+ }
+ return item;
+/*[clinic input]
+ block: bool = True
+ timeout: object = None
+Remove and return an item from the queue.
+If optional args 'block' is true and 'timeout' is None (the default),
+block if necessary until an item is available. If 'timeout' is
+a non-negative number, it blocks at most 'timeout' seconds and raises
+the Empty exception if no item was available within that time.
+Otherwise ('block' is false), return an item if one is immediately
+available, else raise the Empty exception ('timeout' is ignored
+in that case).
+[clinic start generated code]*/
+static PyObject *
+_queue_SimpleQueue_get_impl(simplequeueobject *self, int block,
+ PyObject *timeout)
+/*[clinic end generated code: output=ec82a7157dcccd1a input=4bf691f9f01fa297]*/
+ _PyTime_t endtime = 0;
+ _PyTime_t timeout_val;
+ PyObject *item;
+ PyLockStatus r;
+ PY_TIMEOUT_T microseconds;
+ if (block == 0) {
+ /* Non-blocking */
+ microseconds = 0;
+ }
+ else if (timeout != Py_None) {
+ /* With timeout */
+ if (_PyTime_FromSecondsObject(&timeout_val,
+ timeout, _PyTime_ROUND_CEILING) < 0)
+ return NULL;
+ if (timeout_val < 0) {
+ PyErr_SetString(PyExc_ValueError,
+ "'timeout' must be a non-negative number");
+ return NULL;
+ }
+ microseconds = _PyTime_AsMicroseconds(timeout_val,
+ if (microseconds >= PY_TIMEOUT_MAX) {
+ PyErr_SetString(PyExc_OverflowError,
+ "timeout value is too large");
+ return NULL;
+ }
+ endtime = _PyTime_GetMonotonicClock() + timeout_val;
+ }
+ else {
+ /* Infinitely blocking */
+ microseconds = -1;
+ }
+ /* put() signals the queue to be non-empty by releasing the lock.
+ * So we simply try to acquire the lock in a loop, until the condition
+ * (queue non-empty) becomes true.
+ */
+ while (self->lst_pos == PyList_GET_SIZE(self->lst)) {
+ /* First a simple non-blocking try without releasing the GIL */
+ r = PyThread_acquire_lock_timed(self->lock, 0, 0);
+ if (r == PY_LOCK_FAILURE && microseconds != 0) {
+ r = PyThread_acquire_lock_timed(self->lock, microseconds, 1);
+ }
+ if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) {
+ return NULL;
+ }
+ if (r == PY_LOCK_FAILURE) {
+ /* Timed out */
+ PyErr_SetNone(EmptyError);
+ return NULL;
+ }
+ self->locked = 1;
+ /* Adjust timeout for next iteration (if any) */
+ if (endtime > 0) {
+ timeout_val = endtime - _PyTime_GetMonotonicClock();
+ microseconds = _PyTime_AsMicroseconds(timeout_val, _PyTime_ROUND_CEILING);
+ }
+ }
+ /* BEGIN GIL-protected critical section */
+ assert(self->lst_pos < PyList_GET_SIZE(self->lst));
+ item = simplequeue_pop_item(self);
+ if (self->locked) {
+ PyThread_release_lock(self->lock);
+ self->locked = 0;
+ }
+ /* END GIL-protected critical section */
+ return item;
+/*[clinic input]
+Remove and return an item from the queue without blocking.
+Only get an item if one is immediately available. Otherwise
+raise the Empty exception.
+[clinic start generated code]*/
+static PyObject *
+_queue_SimpleQueue_get_nowait_impl(simplequeueobject *self)
+/*[clinic end generated code: output=a89731a75dbe4937 input=6fe5102db540a1b9]*/
+ return _queue_SimpleQueue_get_impl(self, 0, Py_None);
+/*[clinic input]
+_queue.SimpleQueue.empty -> bool
+Return True if the queue is empty, False otherwise (not reliable!).
+[clinic start generated code]*/
+static int
+_queue_SimpleQueue_empty_impl(simplequeueobject *self)
+/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
+ return self->lst_pos == PyList_GET_SIZE(self->lst);
+/*[clinic input]
+_queue.SimpleQueue.qsize -> Py_ssize_t
+Return the approximate size of the queue (not reliable!).
+[clinic start generated code]*/
+static Py_ssize_t
+_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
+/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
+ return PyList_GET_SIZE(self->lst) - self->lst_pos;
+#include "clinic/_queuemodule.c.h"
+static PyMethodDef simplequeue_methods[] = {
+ {NULL, NULL} /* sentinel */
+PyTypeObject PySimpleQueueType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ "_queue.SimpleQueue", /*tp_name*/
+ sizeof(simplequeueobject), /*tp_size*/
+ 0, /*tp_itemsize*/
+ /* methods */
+ (destructor)simplequeue_dealloc, /*tp_dealloc*/
+ 0, /*tp_print*/
+ 0, /*tp_getattr*/
+ 0, /*tp_setattr*/
+ 0, /*tp_reserved*/
+ 0, /*tp_repr*/
+ 0, /*tp_as_number*/
+ 0, /*tp_as_sequence*/
+ 0, /*tp_as_mapping*/
+ 0, /*tp_hash*/
+ 0, /*tp_call*/
+ 0, /*tp_str*/
+ 0, /*tp_getattro*/
+ 0, /*tp_setattro*/
+ 0, /*tp_as_buffer*/
+ | Py_TPFLAGS_HAVE_GC, /* tp_flags */
+ simplequeue_new__doc__, /*tp_doc*/
+ (traverseproc)simplequeue_traverse, /*tp_traverse*/
+ 0, /*tp_clear*/
+ 0, /*tp_richcompare*/
+ offsetof(simplequeueobject, weakreflist), /*tp_weaklistoffset*/
+ 0, /*tp_iter*/
+ 0, /*tp_iternext*/
+ simplequeue_methods, /*tp_methods*/
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ 0, /* tp_init */
+ 0, /* tp_alloc */
+ simplequeue_new /* tp_new */
+/* Initialization function */
+"C implementation of the Python queue module.\n\
+This module is an implementation detail, please do not use it directly.");
+static struct PyModuleDef queuemodule = {
+ PyModuleDef_HEAD_INIT,
+ "_queue",
+ queue_module_doc,
+ -1,
+ PyObject *m;
+ /* Create the module */
+ m = PyModule_Create(&queuemodule);
+ if (m == NULL)
+ return NULL;
+ EmptyError = PyErr_NewExceptionWithDoc(
+ "_queue.Empty",
+ "Exception raised by Queue.get(block=0)/get_nowait().",
+ if (EmptyError == NULL)
+ return NULL;
+ Py_INCREF(EmptyError);
+ if (PyModule_AddObject(m, "Empty", EmptyError) < 0)
+ return NULL;
+ if (PyType_Ready(&PySimpleQueueType) < 0)
+ return NULL;
+ Py_INCREF(&PySimpleQueueType);
+ if (PyModule_AddObject(m, "SimpleQueue", (PyObject *)&PySimpleQueueType) < 0)
+ return NULL;
+ return m;
diff --git a/Modules/clinic/_queuemodule.c.h b/Modules/clinic/_queuemodule.c.h
new file mode 100644
index 0000000..97247fd
--- /dev/null
+++ b/Modules/clinic/_queuemodule.c.h
@@ -0,0 +1,218 @@
+/*[clinic input]
+[clinic start generated code]*/
+"Simple, unbounded, reentrant FIFO queue.");
+static PyObject *
+simplequeue_new_impl(PyTypeObject *type);
+static PyObject *
+simplequeue_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
+ PyObject *return_value = NULL;
+ if ((type == &PySimpleQueueType) &&
+ !_PyArg_NoPositional("SimpleQueue", args)) {
+ goto exit;
+ }
+ if ((type == &PySimpleQueueType) &&
+ !_PyArg_NoKeywords("SimpleQueue", kwargs)) {
+ goto exit;
+ }
+ return_value = simplequeue_new_impl(type);
+ return return_value;
+"put($self, /, item, block=True, timeout=None)\n"
+"Put the item on the queue.\n"
+"The optional \'block\' and \'timeout\' arguments are ignored, as this method\n"
+"never blocks. They are provided for compatibility with the Queue class.");
+ {"put", (PyCFunction)_queue_SimpleQueue_put, METH_FASTCALL|METH_KEYWORDS, _queue_SimpleQueue_put__doc__},
+static PyObject *
+_queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
+ int block, PyObject *timeout);
+static PyObject *
+_queue_SimpleQueue_put(simplequeueobject *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+ PyObject *return_value = NULL;
+ static const char * const _keywords[] = {"item", "block", "timeout", NULL};
+ static _PyArg_Parser _parser = {"O|pO:put", _keywords, 0};
+ PyObject *item;
+ int block = 1;
+ PyObject *timeout = Py_None;
+ if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
+ &item, &block, &timeout)) {
+ goto exit;
+ }
+ return_value = _queue_SimpleQueue_put_impl(self, item, block, timeout);
+ return return_value;
+"put_nowait($self, /, item)\n"
+"Put an item into the queue without blocking.\n"
+"This is exactly equivalent to `put(item)` and is only provided\n"
+"for compatibility with the Queue class.");
+ {"put_nowait", (PyCFunction)_queue_SimpleQueue_put_nowait, METH_FASTCALL|METH_KEYWORDS, _queue_SimpleQueue_put_nowait__doc__},
+static PyObject *
+_queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item);
+static PyObject *
+_queue_SimpleQueue_put_nowait(simplequeueobject *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+ PyObject *return_value = NULL;
+ static const char * const _keywords[] = {"item", NULL};
+ static _PyArg_Parser _parser = {"O:put_nowait", _keywords, 0};
+ PyObject *item;
+ if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
+ &item)) {
+ goto exit;
+ }
+ return_value = _queue_SimpleQueue_put_nowait_impl(self, item);
+ return return_value;
+"get($self, /, block=True, timeout=None)\n"
+"Remove and return an item from the queue.\n"
+"If optional args \'block\' is true and \'timeout\' is None (the default),\n"
+"block if necessary until an item is available. If \'timeout\' is\n"
+"a non-negative number, it blocks at most \'timeout\' seconds and raises\n"
+"the Empty exception if no item was available within that time.\n"
+"Otherwise (\'block\' is false), return an item if one is immediately\n"
+"available, else raise the Empty exception (\'timeout\' is ignored\n"
+"in that case).");
+ {"get", (PyCFunction)_queue_SimpleQueue_get, METH_FASTCALL|METH_KEYWORDS, _queue_SimpleQueue_get__doc__},
+static PyObject *
+_queue_SimpleQueue_get_impl(simplequeueobject *self, int block,
+ PyObject *timeout);
+static PyObject *
+_queue_SimpleQueue_get(simplequeueobject *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+ PyObject *return_value = NULL;
+ static const char * const _keywords[] = {"block", "timeout", NULL};
+ static _PyArg_Parser _parser = {"|pO:get", _keywords, 0};
+ int block = 1;
+ PyObject *timeout = Py_None;
+ if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
+ &block, &timeout)) {
+ goto exit;
+ }
+ return_value = _queue_SimpleQueue_get_impl(self, block, timeout);
+ return return_value;
+"get_nowait($self, /)\n"
+"Remove and return an item from the queue without blocking.\n"
+"Only get an item if one is immediately available. Otherwise\n"
+"raise the Empty exception.");
+ {"get_nowait", (PyCFunction)_queue_SimpleQueue_get_nowait, METH_NOARGS, _queue_SimpleQueue_get_nowait__doc__},
+static PyObject *
+_queue_SimpleQueue_get_nowait_impl(simplequeueobject *self);
+static PyObject *
+_queue_SimpleQueue_get_nowait(simplequeueobject *self, PyObject *Py_UNUSED(ignored))
+ return _queue_SimpleQueue_get_nowait_impl(self);
+"empty($self, /)\n"
+"Return True if the queue is empty, False otherwise (not reliable!).");
+ {"empty", (PyCFunction)_queue_SimpleQueue_empty, METH_NOARGS, _queue_SimpleQueue_empty__doc__},
+static int
+_queue_SimpleQueue_empty_impl(simplequeueobject *self);
+static PyObject *
+_queue_SimpleQueue_empty(simplequeueobject *self, PyObject *Py_UNUSED(ignored))
+ PyObject *return_value = NULL;
+ int _return_value;
+ _return_value = _queue_SimpleQueue_empty_impl(self);
+ if ((_return_value == -1) && PyErr_Occurred()) {
+ goto exit;
+ }
+ return_value = PyBool_FromLong((long)_return_value);
+ return return_value;
+"qsize($self, /)\n"
+"Return the approximate size of the queue (not reliable!).");
+ {"qsize", (PyCFunction)_queue_SimpleQueue_qsize, METH_NOARGS, _queue_SimpleQueue_qsize__doc__},
+static Py_ssize_t
+_queue_SimpleQueue_qsize_impl(simplequeueobject *self);
+static PyObject *
+_queue_SimpleQueue_qsize(simplequeueobject *self, PyObject *Py_UNUSED(ignored))
+ PyObject *return_value = NULL;
+ Py_ssize_t _return_value;
+ _return_value = _queue_SimpleQueue_qsize_impl(self);
+ if ((_return_value == -1) && PyErr_Occurred()) {
+ goto exit;
+ }
+ return_value = PyLong_FromSsize_t(_return_value);
+ return return_value;
+/*[clinic end generated code: output=8badc3bb85263689 input=a9049054013a1b77]*/