diff options
author | Pieter Eendebak <pieter.eendebak@gmail.com> | 2025-03-12 10:00:33 (GMT) |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-12 10:00:33 (GMT) |
commit | 405a2d74cbdef5a899c900b6897ec85fe465abd2 (patch) | |
tree | 667200f25c9f3e501fb24fbbb5c52927e1fafff8 | |
parent | 155c44b9015089eacc6e2ace449391c12bfb8b8d (diff) | |
download | cpython-405a2d74cbdef5a899c900b6897ec85fe465abd2.zip cpython-405a2d74cbdef5a899c900b6897ec85fe465abd2.tar.gz cpython-405a2d74cbdef5a899c900b6897ec85fe465abd2.tar.bz2 |
gh-123471: make `itertools.batched` thread-safe (#129416)
-rw-r--r-- | Lib/test/test_free_threading/test_itertools_batched.py | 39 | ||||
-rw-r--r-- | Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst | 1 | ||||
-rw-r--r-- | Modules/itertoolsmodule.c | 13 |
3 files changed, 51 insertions, 2 deletions
diff --git a/Lib/test/test_free_threading/test_itertools_batched.py b/Lib/test/test_free_threading/test_itertools_batched.py new file mode 100644 index 0000000..fa9e06b --- /dev/null +++ b/Lib/test/test_free_threading/test_itertools_batched.py @@ -0,0 +1,39 @@ +import unittest +import sys +from threading import Thread, Barrier +from itertools import batched +from test.support import threading_helper + + +threading_helper.requires_working_threading(module=True) + +class EnumerateThreading(unittest.TestCase): + + @threading_helper.reap_threads + def test_threading(self): + number_of_threads = 10 + number_of_iterations = 20 + barrier = Barrier(number_of_threads) + def work(it): + barrier.wait() + while True: + try: + _ = next(it) + except StopIteration: + break + + data = tuple(range(1000)) + for it in range(number_of_iterations): + batch_iterator = batched(data, 2) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[batch_iterator])) + + with threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + +if __name__ == "__main__": + unittest.main() diff --git a/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst b/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst new file mode 100644 index 0000000..f34d0bc --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst @@ -0,0 +1 @@ +Make concurrent iterations over :class:`itertools.batched` safe under free-threading. diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c index 3e425ee..40e436a6 100644 --- a/Modules/itertoolsmodule.c +++ b/Modules/itertoolsmodule.c @@ -191,12 +191,12 @@ batched_next(PyObject *op) { batchedobject *bo = batchedobject_CAST(op); Py_ssize_t i; - Py_ssize_t n = bo->batch_size; + Py_ssize_t n = FT_ATOMIC_LOAD_SSIZE_RELAXED(bo->batch_size); PyObject *it = bo->it; PyObject *item; PyObject *result; - if (it == NULL) { + if (n < 0) { return NULL; } result = PyTuple_New(n); @@ -218,19 +218,28 @@ batched_next(PyObject *op) if (PyErr_Occurred()) { if (!PyErr_ExceptionMatches(PyExc_StopIteration)) { /* Input raised an exception other than StopIteration */ + FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); +#ifndef Py_GIL_DISABLED Py_CLEAR(bo->it); +#endif Py_DECREF(result); return NULL; } PyErr_Clear(); } if (i == 0) { + FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); +#ifndef Py_GIL_DISABLED Py_CLEAR(bo->it); +#endif Py_DECREF(result); return NULL; } if (bo->strict) { + FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); +#ifndef Py_GIL_DISABLED Py_CLEAR(bo->it); +#endif Py_DECREF(result); PyErr_SetString(PyExc_ValueError, "batched(): incomplete batch"); return NULL; |