summaryrefslogtreecommitdiffstats
path: root/Modules
diff options
context:
space:
mode:
authormpage <mpage@meta.com>2024-01-23 19:25:41 (GMT)
committerGitHub <noreply@github.com>2024-01-23 19:25:41 (GMT)
commit925907ea362c4c014086be48625ac7dd67645cfc (patch)
tree2c6ad103f5296532d8cd8e0bcdb66623a3ad636d /Modules
parent441affc9e7f419ef0b68f734505fa2f79fe653c7 (diff)
downloadcpython-925907ea362c4c014086be48625ac7dd67645cfc.zip
cpython-925907ea362c4c014086be48625ac7dd67645cfc.tar.gz
cpython-925907ea362c4c014086be48625ac7dd67645cfc.tar.bz2
gh-113884: Make queue.SimpleQueue thread-safe when the GIL is disabled (#114161)
* use the ParkingLot API to manage waiting threads * use Argument Clinic's critical section directive to protect queue methods * remove unnecessary overflow check Co-authored-by: Erlend E. Aasland <erlend.aasland@protonmail.com>
Diffstat (limited to 'Modules')
-rw-r--r--Modules/_queuemodule.c202
-rw-r--r--Modules/clinic/_queuemodule.c.h24
2 files changed, 136 insertions, 90 deletions
diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c
index 8fca3cd..18b2485 100644
--- a/Modules/_queuemodule.c
+++ b/Modules/_queuemodule.c
@@ -3,8 +3,9 @@
#endif
#include "Python.h"
-#include "pycore_ceval.h" // _PyEval_MakePendingCalls()
+#include "pycore_ceval.h" // Py_MakePendingCalls()
#include "pycore_moduleobject.h" // _PyModule_GetState()
+#include "pycore_parking_lot.h"
#include "pycore_time.h" // _PyTime_t
#include <stdbool.h>
@@ -151,7 +152,9 @@ RingBuf_Get(RingBuf *buf)
return item;
}
-// Returns 0 on success or -1 if the buffer failed to grow
+// Returns 0 on success or -1 if the buffer failed to grow.
+//
+// Steals a reference to item.
static int
RingBuf_Put(RingBuf *buf, PyObject *item)
{
@@ -164,7 +167,7 @@ RingBuf_Put(RingBuf *buf, PyObject *item)
return -1;
}
}
- buf->items[buf->put_idx] = Py_NewRef(item);
+ buf->items[buf->put_idx] = item;
buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
buf->num_items++;
return 0;
@@ -184,9 +187,13 @@ RingBuf_IsEmpty(RingBuf *buf)
typedef struct {
PyObject_HEAD
- PyThread_type_lock lock;
- int locked;
+
+ // Are there threads waiting for items
+ bool has_threads_waiting;
+
+ // Items in the queue
RingBuf buf;
+
PyObject *weakreflist;
} simplequeueobject;
@@ -209,12 +216,6 @@ simplequeue_dealloc(simplequeueobject *self)
PyTypeObject *tp = Py_TYPE(self);
PyObject_GC_UnTrack(self);
- if (self->lock != NULL) {
- /* Unlock the lock so it's safe to free it */
- if (self->locked > 0)
- PyThread_release_lock(self->lock);
- PyThread_free_lock(self->lock);
- }
(void)simplequeue_clear(self);
if (self->weakreflist != NULL)
PyObject_ClearWeakRefs((PyObject *) self);
@@ -249,12 +250,6 @@ simplequeue_new_impl(PyTypeObject *type)
self = (simplequeueobject *) type->tp_alloc(type, 0);
if (self != NULL) {
self->weakreflist = NULL;
- self->lock = PyThread_allocate_lock();
- if (self->lock == NULL) {
- Py_DECREF(self);
- PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
- return NULL;
- }
if (RingBuf_Init(&self->buf) < 0) {
Py_DECREF(self);
return NULL;
@@ -264,7 +259,29 @@ simplequeue_new_impl(PyTypeObject *type)
return (PyObject *) self;
}
+typedef struct {
+ bool handed_off;
+ simplequeueobject *queue;
+ PyObject *item;
+} HandoffData;
+
+static void
+maybe_handoff_item(HandoffData *data, PyObject **item, int has_more_waiters)
+{
+ if (item == NULL) {
+ // No threads were waiting
+ data->handed_off = false;
+ }
+ else {
+ // There was at least one waiting thread, hand off the item
+ *item = data->item;
+ data->handed_off = true;
+ }
+ data->queue->has_threads_waiting = has_more_waiters;
+}
+
/*[clinic input]
+@critical_section
_queue.SimpleQueue.put
item: object
block: bool = True
@@ -280,21 +297,28 @@ never blocks. They are provided for compatibility with the Queue class.
static PyObject *
_queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
int block, PyObject *timeout)
-/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
+/*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/
{
- /* BEGIN GIL-protected critical section */
- if (RingBuf_Put(&self->buf, item) < 0)
- return NULL;
- if (self->locked) {
- /* A get() may be waiting, wake it up */
- self->locked = 0;
- PyThread_release_lock(self->lock);
+ HandoffData data = {
+ .handed_off = 0,
+ .item = Py_NewRef(item),
+ .queue = self,
+ };
+ if (self->has_threads_waiting) {
+ // Try to hand the item off directly if there are threads waiting
+ _PyParkingLot_Unpark(&self->has_threads_waiting,
+ (_Py_unpark_fn_t *)maybe_handoff_item, &data);
+ }
+ if (!data.handed_off) {
+ if (RingBuf_Put(&self->buf, item) < 0) {
+ return NULL;
+ }
}
- /* END GIL-protected critical section */
Py_RETURN_NONE;
}
/*[clinic input]
+@critical_section
_queue.SimpleQueue.put_nowait
item: object
@@ -307,12 +331,23 @@ for compatibility with the Queue class.
static PyObject *
_queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
-/*[clinic end generated code: output=0990536715efb1f1 input=36b1ea96756b2ece]*/
+/*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/
{
return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
}
+static PyObject *
+empty_error(PyTypeObject *cls)
+{
+ PyObject *module = PyType_GetModule(cls);
+ assert(module != NULL);
+ simplequeue_state *state = simplequeue_get_state(module);
+ PyErr_SetNone(state->EmptyError);
+ return NULL;
+}
+
/*[clinic input]
+@critical_section
_queue.SimpleQueue.get
cls: defining_class
@@ -335,23 +370,15 @@ in that case).
static PyObject *
_queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
int block, PyObject *timeout_obj)
-/*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/
+/*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/
{
_PyTime_t endtime = 0;
- _PyTime_t timeout;
- PyObject *item;
- PyLockStatus r;
- PY_TIMEOUT_T microseconds;
- PyThreadState *tstate = PyThreadState_Get();
// XXX Use PyThread_ParseTimeoutArg().
- if (block == 0) {
- /* Non-blocking */
- microseconds = 0;
- }
- else if (timeout_obj != Py_None) {
+ if (block != 0 && !Py_IsNone(timeout_obj)) {
/* With timeout */
+ _PyTime_t timeout;
if (_PyTime_FromSecondsObject(&timeout,
timeout_obj, _PyTime_ROUND_CEILING) < 0) {
return NULL;
@@ -361,65 +388,64 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
"'timeout' must be a non-negative number");
return NULL;
}
- microseconds = _PyTime_AsMicroseconds(timeout,
- _PyTime_ROUND_CEILING);
- if (microseconds > PY_TIMEOUT_MAX) {
- PyErr_SetString(PyExc_OverflowError,
- "timeout value is too large");
- return NULL;
- }
endtime = _PyDeadline_Init(timeout);
}
- else {
- /* Infinitely blocking */
- microseconds = -1;
- }
- /* put() signals the queue to be non-empty by releasing the lock.
- * So we simply try to acquire the lock in a loop, until the condition
- * (queue non-empty) becomes true.
- */
- while (RingBuf_IsEmpty(&self->buf)) {
- /* First a simple non-blocking try without releasing the GIL */
- r = PyThread_acquire_lock_timed(self->lock, 0, 0);
- if (r == PY_LOCK_FAILURE && microseconds != 0) {
- Py_BEGIN_ALLOW_THREADS
- r = PyThread_acquire_lock_timed(self->lock, microseconds, 1);
- Py_END_ALLOW_THREADS
+ for (;;) {
+ if (!RingBuf_IsEmpty(&self->buf)) {
+ return RingBuf_Get(&self->buf);
}
- if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) {
- return NULL;
- }
- if (r == PY_LOCK_FAILURE) {
- PyObject *module = PyType_GetModule(cls);
- simplequeue_state *state = simplequeue_get_state(module);
- /* Timed out */
- PyErr_SetNone(state->EmptyError);
- return NULL;
+ if (!block) {
+ return empty_error(cls);
}
- self->locked = 1;
- /* Adjust timeout for next iteration (if any) */
- if (microseconds > 0) {
- timeout = _PyDeadline_Get(endtime);
- microseconds = _PyTime_AsMicroseconds(timeout,
- _PyTime_ROUND_CEILING);
+ int64_t timeout_ns = -1;
+ if (endtime != 0) {
+ timeout_ns = _PyDeadline_Get(endtime);
+ if (timeout_ns < 0) {
+ return empty_error(cls);
+ }
}
- }
- /* BEGIN GIL-protected critical section */
- item = RingBuf_Get(&self->buf);
- if (self->locked) {
- PyThread_release_lock(self->lock);
- self->locked = 0;
+ bool waiting = 1;
+ self->has_threads_waiting = waiting;
+
+ PyObject *item = NULL;
+ int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting,
+ sizeof(bool), timeout_ns, &item,
+ /* detach */ 1);
+ switch (st) {
+ case Py_PARK_OK: {
+ assert(item != NULL);
+ return item;
+ }
+ case Py_PARK_TIMEOUT: {
+ return empty_error(cls);
+ }
+ case Py_PARK_INTR: {
+ // Interrupted
+ if (Py_MakePendingCalls() < 0) {
+ return NULL;
+ }
+ break;
+ }
+ case Py_PARK_AGAIN: {
+ // This should be impossible with the current implementation of
+ // PyParkingLot, but would be possible if critical sections /
+ // the GIL were released before the thread was added to the
+ // internal thread queue in the parking lot.
+ break;
+ }
+ default: {
+ Py_UNREACHABLE();
+ }
+ }
}
- /* END GIL-protected critical section */
-
- return item;
}
/*[clinic input]
+@critical_section
_queue.SimpleQueue.get_nowait
cls: defining_class
@@ -434,12 +460,13 @@ raise the Empty exception.
static PyObject *
_queue_SimpleQueue_get_nowait_impl(simplequeueobject *self,
PyTypeObject *cls)
-/*[clinic end generated code: output=620c58e2750f8b8a input=842f732bf04216d3]*/
+/*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/
{
return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None);
}
/*[clinic input]
+@critical_section
_queue.SimpleQueue.empty -> bool
Return True if the queue is empty, False otherwise (not reliable!).
@@ -447,12 +474,13 @@ Return True if the queue is empty, False otherwise (not reliable!).
static int
_queue_SimpleQueue_empty_impl(simplequeueobject *self)
-/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
+/*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/
{
return RingBuf_IsEmpty(&self->buf);
}
/*[clinic input]
+@critical_section
_queue.SimpleQueue.qsize -> Py_ssize_t
Return the approximate size of the queue (not reliable!).
@@ -460,7 +488,7 @@ Return the approximate size of the queue (not reliable!).
static Py_ssize_t
_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
-/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
+/*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/
{
return RingBuf_Len(&self->buf);
}
diff --git a/Modules/clinic/_queuemodule.c.h b/Modules/clinic/_queuemodule.c.h
index 8e2a430..b3b6b8e 100644
--- a/Modules/clinic/_queuemodule.c.h
+++ b/Modules/clinic/_queuemodule.c.h
@@ -6,6 +6,7 @@ preserve
# include "pycore_gc.h" // PyGC_Head
# include "pycore_runtime.h" // _Py_ID()
#endif
+#include "pycore_critical_section.h"// Py_BEGIN_CRITICAL_SECTION()
#include "pycore_modsupport.h" // _PyArg_NoKeywords()
PyDoc_STRVAR(simplequeue_new__doc__,
@@ -107,7 +108,9 @@ _queue_SimpleQueue_put(simplequeueobject *self, PyObject *const *args, Py_ssize_
}
timeout = args[2];
skip_optional_pos:
+ Py_BEGIN_CRITICAL_SECTION(self);
return_value = _queue_SimpleQueue_put_impl(self, item, block, timeout);
+ Py_END_CRITICAL_SECTION();
exit:
return return_value;
@@ -165,7 +168,9 @@ _queue_SimpleQueue_put_nowait(simplequeueobject *self, PyObject *const *args, Py
goto exit;
}
item = args[0];
+ Py_BEGIN_CRITICAL_SECTION(self);
return_value = _queue_SimpleQueue_put_nowait_impl(self, item);
+ Py_END_CRITICAL_SECTION();
exit:
return return_value;
@@ -244,7 +249,9 @@ _queue_SimpleQueue_get(simplequeueobject *self, PyTypeObject *cls, PyObject *con
}
timeout_obj = args[1];
skip_optional_pos:
+ Py_BEGIN_CRITICAL_SECTION(self);
return_value = _queue_SimpleQueue_get_impl(self, cls, block, timeout_obj);
+ Py_END_CRITICAL_SECTION();
exit:
return return_value;
@@ -269,11 +276,18 @@ _queue_SimpleQueue_get_nowait_impl(simplequeueobject *self,
static PyObject *
_queue_SimpleQueue_get_nowait(simplequeueobject *self, PyTypeObject *cls, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
+ PyObject *return_value = NULL;
+
if (nargs) {
PyErr_SetString(PyExc_TypeError, "get_nowait() takes no arguments");
- return NULL;
+ goto exit;
}
- return _queue_SimpleQueue_get_nowait_impl(self, cls);
+ Py_BEGIN_CRITICAL_SECTION(self);
+ return_value = _queue_SimpleQueue_get_nowait_impl(self, cls);
+ Py_END_CRITICAL_SECTION();
+
+exit:
+ return return_value;
}
PyDoc_STRVAR(_queue_SimpleQueue_empty__doc__,
@@ -294,7 +308,9 @@ _queue_SimpleQueue_empty(simplequeueobject *self, PyObject *Py_UNUSED(ignored))
PyObject *return_value = NULL;
int _return_value;
+ Py_BEGIN_CRITICAL_SECTION(self);
_return_value = _queue_SimpleQueue_empty_impl(self);
+ Py_END_CRITICAL_SECTION();
if ((_return_value == -1) && PyErr_Occurred()) {
goto exit;
}
@@ -322,7 +338,9 @@ _queue_SimpleQueue_qsize(simplequeueobject *self, PyObject *Py_UNUSED(ignored))
PyObject *return_value = NULL;
Py_ssize_t _return_value;
+ Py_BEGIN_CRITICAL_SECTION(self);
_return_value = _queue_SimpleQueue_qsize_impl(self);
+ Py_END_CRITICAL_SECTION();
if ((_return_value == -1) && PyErr_Occurred()) {
goto exit;
}
@@ -331,4 +349,4 @@ _queue_SimpleQueue_qsize(simplequeueobject *self, PyObject *Py_UNUSED(ignored))
exit:
return return_value;
}
-/*[clinic end generated code: output=457310b20cb61cf8 input=a9049054013a1b77]*/
+/*[clinic end generated code: output=242950edc8f7dfd7 input=a9049054013a1b77]*/