summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPieter Eendebak <pieter.eendebak@gmail.com>2025-03-12 10:00:33 (GMT)
committerGitHub <noreply@github.com>2025-03-12 10:00:33 (GMT)
commit405a2d74cbdef5a899c900b6897ec85fe465abd2 (patch)
tree667200f25c9f3e501fb24fbbb5c52927e1fafff8
parent155c44b9015089eacc6e2ace449391c12bfb8b8d (diff)
downloadcpython-405a2d74cbdef5a899c900b6897ec85fe465abd2.zip
cpython-405a2d74cbdef5a899c900b6897ec85fe465abd2.tar.gz
cpython-405a2d74cbdef5a899c900b6897ec85fe465abd2.tar.bz2
gh-123471: make `itertools.batched` thread-safe (#129416)
-rw-r--r--Lib/test/test_free_threading/test_itertools_batched.py39
-rw-r--r--Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst1
-rw-r--r--Modules/itertoolsmodule.c13
3 files changed, 51 insertions, 2 deletions
diff --git a/Lib/test/test_free_threading/test_itertools_batched.py b/Lib/test/test_free_threading/test_itertools_batched.py
new file mode 100644
index 0000000..fa9e06b
--- /dev/null
+++ b/Lib/test/test_free_threading/test_itertools_batched.py
@@ -0,0 +1,39 @@
+import unittest
+import sys
+from threading import Thread, Barrier
+from itertools import batched
+from test.support import threading_helper
+
+
+threading_helper.requires_working_threading(module=True)
+
+class EnumerateThreading(unittest.TestCase):
+
+ @threading_helper.reap_threads
+ def test_threading(self):
+ number_of_threads = 10
+ number_of_iterations = 20
+ barrier = Barrier(number_of_threads)
+ def work(it):
+ barrier.wait()
+ while True:
+ try:
+ _ = next(it)
+ except StopIteration:
+ break
+
+ data = tuple(range(1000))
+ for it in range(number_of_iterations):
+ batch_iterator = batched(data, 2)
+ worker_threads = []
+ for ii in range(number_of_threads):
+ worker_threads.append(
+ Thread(target=work, args=[batch_iterator]))
+
+ with threading_helper.start_threads(worker_threads):
+ pass
+
+ barrier.reset()
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst b/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst
new file mode 100644
index 0000000..f34d0bc
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst
@@ -0,0 +1 @@
+Make concurrent iterations over :class:`itertools.batched` safe under free-threading.
diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c
index 3e425ee..40e436a6 100644
--- a/Modules/itertoolsmodule.c
+++ b/Modules/itertoolsmodule.c
@@ -191,12 +191,12 @@ batched_next(PyObject *op)
{
batchedobject *bo = batchedobject_CAST(op);
Py_ssize_t i;
- Py_ssize_t n = bo->batch_size;
+ Py_ssize_t n = FT_ATOMIC_LOAD_SSIZE_RELAXED(bo->batch_size);
PyObject *it = bo->it;
PyObject *item;
PyObject *result;
- if (it == NULL) {
+ if (n < 0) {
return NULL;
}
result = PyTuple_New(n);
@@ -218,19 +218,28 @@ batched_next(PyObject *op)
if (PyErr_Occurred()) {
if (!PyErr_ExceptionMatches(PyExc_StopIteration)) {
/* Input raised an exception other than StopIteration */
+ FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1);
+#ifndef Py_GIL_DISABLED
Py_CLEAR(bo->it);
+#endif
Py_DECREF(result);
return NULL;
}
PyErr_Clear();
}
if (i == 0) {
+ FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1);
+#ifndef Py_GIL_DISABLED
Py_CLEAR(bo->it);
+#endif
Py_DECREF(result);
return NULL;
}
if (bo->strict) {
+ FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1);
+#ifndef Py_GIL_DISABLED
Py_CLEAR(bo->it);
+#endif
Py_DECREF(result);
PyErr_SetString(PyExc_ValueError, "batched(): incomplete batch");
return NULL;