summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEric Snow <ericsnowcurrently@gmail.com>2024-10-16 22:50:46 (GMT)
committerGitHub <noreply@github.com>2024-10-16 22:50:46 (GMT)
commita5a7f5e16d8c3938d266703ea8fba8ffee3e3ae5 (patch)
tree8f35555767598b877e490814eca9363cdf5eea2d
parenta38fef4439139743e3334c1d69f24cafdf4d71da (diff)
downloadcpython-a5a7f5e16d8c3938d266703ea8fba8ffee3e3ae5.zip
cpython-a5a7f5e16d8c3938d266703ea8fba8ffee3e3ae5.tar.gz
cpython-a5a7f5e16d8c3938d266703ea8fba8ffee3e3ae5.tar.bz2
gh-124694: Add concurrent.futures.InterpreterPoolExecutor (gh-124548)
This is an implementation of InterpreterPoolExecutor that builds on ThreadPoolExecutor. (Note that this is not tied to PEP 734, which is strictly about adding a new stdlib module.) Possible future improvements: * support passing a script for the initializer or to submit() * support passing (most) arbitrary functions without pickling * support passing closures * optionally exec functions against __main__ instead of the their original module
-rw-r--r--Doc/library/asyncio-dev.rst6
-rw-r--r--Doc/library/asyncio-eventloop.rst9
-rw-r--r--Doc/library/asyncio-llapi-index.rst2
-rw-r--r--Doc/library/concurrent.futures.rst135
-rw-r--r--Doc/whatsnew/3.14.rst8
-rw-r--r--Lib/concurrent/futures/__init__.py12
-rw-r--r--Lib/concurrent/futures/interpreter.py241
-rw-r--r--Lib/concurrent/futures/thread.py90
-rw-r--r--Lib/test/test_concurrent_futures/executor.py4
-rw-r--r--Lib/test/test_concurrent_futures/test_interpreter_pool.py346
-rw-r--r--Lib/test/test_concurrent_futures/util.py5
-rw-r--r--Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst6
12 files changed, 826 insertions, 38 deletions
diff --git a/Doc/library/asyncio-dev.rst b/Doc/library/asyncio-dev.rst
index a9c3a01..44b507a 100644
--- a/Doc/library/asyncio-dev.rst
+++ b/Doc/library/asyncio-dev.rst
@@ -103,7 +103,8 @@ To handle signals the event loop must be
run in the main thread.
The :meth:`loop.run_in_executor` method can be used with a
-:class:`concurrent.futures.ThreadPoolExecutor` to execute
+:class:`concurrent.futures.ThreadPoolExecutor` or
+:class:`~concurrent.futures.InterpreterPoolExecutor` to execute
blocking code in a different OS thread without blocking the OS thread
that the event loop runs in.
@@ -128,7 +129,8 @@ if a function performs a CPU-intensive calculation for 1 second,
all concurrent asyncio Tasks and IO operations would be delayed
by 1 second.
-An executor can be used to run a task in a different thread or even in
+An executor can be used to run a task in a different thread,
+including in a different interpreter, or even in
a different process to avoid blocking the OS thread with the
event loop. See the :meth:`loop.run_in_executor` method for more
details.
diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst
index 943683f..14fd153 100644
--- a/Doc/library/asyncio-eventloop.rst
+++ b/Doc/library/asyncio-eventloop.rst
@@ -1305,6 +1305,12 @@ Executing code in thread or process pools
pool, cpu_bound)
print('custom process pool', result)
+ # 4. Run in a custom interpreter pool:
+ with concurrent.futures.InterpreterPoolExecutor() as pool:
+ result = await loop.run_in_executor(
+ pool, cpu_bound)
+ print('custom interpreter pool', result)
+
if __name__ == '__main__':
asyncio.run(main())
@@ -1329,7 +1335,8 @@ Executing code in thread or process pools
Set *executor* as the default executor used by :meth:`run_in_executor`.
*executor* must be an instance of
- :class:`~concurrent.futures.ThreadPoolExecutor`.
+ :class:`~concurrent.futures.ThreadPoolExecutor`, which includes
+ :class:`~concurrent.futures.InterpreterPoolExecutor`.
.. versionchanged:: 3.11
*executor* must be an instance of
diff --git a/Doc/library/asyncio-llapi-index.rst b/Doc/library/asyncio-llapi-index.rst
index 3e21054..f5af888 100644
--- a/Doc/library/asyncio-llapi-index.rst
+++ b/Doc/library/asyncio-llapi-index.rst
@@ -96,7 +96,7 @@ See also the main documentation section about the
- Invoke a callback *at* the given time.
-.. rubric:: Thread/Process Pool
+.. rubric:: Thread/Interpreter/Process Pool
.. list-table::
:widths: 50 50
:class: full-width-table
diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index ce72127..45a7370 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -15,9 +15,10 @@ The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.
The asynchronous execution can be performed with threads, using
-:class:`ThreadPoolExecutor`, or separate processes, using
-:class:`ProcessPoolExecutor`. Both implement the same interface, which is
-defined by the abstract :class:`Executor` class.
+:class:`ThreadPoolExecutor` or :class:`InterpreterPoolExecutor`,
+or separate processes, using :class:`ProcessPoolExecutor`.
+Each implements the same interface, which is defined
+by the abstract :class:`Executor` class.
.. include:: ../includes/wasm-notavail.rst
@@ -63,7 +64,8 @@ Executor Objects
setting *chunksize* to a positive integer. For very long iterables,
using a large value for *chunksize* can significantly improve
performance compared to the default size of 1. With
- :class:`ThreadPoolExecutor`, *chunksize* has no effect.
+ :class:`ThreadPoolExecutor` and :class:`InterpreterPoolExecutor`,
+ *chunksize* has no effect.
.. versionchanged:: 3.5
Added the *chunksize* argument.
@@ -227,6 +229,111 @@ ThreadPoolExecutor Example
print('%r page is %d bytes' % (url, len(data)))
+InterpreterPoolExecutor
+-----------------------
+
+The :class:`InterpreterPoolExecutor` class uses a pool of interpreters
+to execute calls asynchronously. It is a :class:`ThreadPoolExecutor`
+subclass, which means each worker is running in its own thread.
+The difference here is that each worker has its own interpreter,
+and runs each task using that interpreter.
+
+The biggest benefit to using interpreters instead of only threads
+is true multi-core parallelism. Each interpreter has its own
+:term:`Global Interpreter Lock <global interpreter lock>`, so code
+running in one interpreter can run on one CPU core, while code in
+another interpreter runs unblocked on a different core.
+
+The tradeoff is that writing concurrent code for use with multiple
+interpreters can take extra effort. However, this is because it
+forces you to be deliberate about how and when interpreters interact,
+and to be explicit about what data is shared between interpreters.
+This results in several benefits that help balance the extra effort,
+including true multi-core parallelism, For example, code written
+this way can make it easier to reason about concurrency. Another
+major benefit is that you don't have to deal with several of the
+big pain points of using threads, like nrace conditions.
+
+Each worker's interpreter is isolated from all the other interpreters.
+"Isolated" means each interpreter has its own runtime state and
+operates completely independently. For example, if you redirect
+:data:`sys.stdout` in one interpreter, it will not be automatically
+redirected any other interpreter. If you import a module in one
+interpreter, it is not automatically imported in any other. You
+would need to import the module separately in interpreter where
+you need it. In fact, each module imported in an interpreter is
+a completely separate object from the same module in a different
+interpreter, including :mod:`sys`, :mod:`builtins`,
+and even ``__main__``.
+
+Isolation means a mutable object, or other data, cannot be used
+by more than one interpreter at the same time. That effectively means
+interpreters cannot actually share such objects or data. Instead,
+each interpreter must have its own copy, and you will have to
+synchronize any changes between the copies manually. Immutable
+objects and data, like the builtin singletons, strings, and tuples
+of immutable objects, don't have these limitations.
+
+Communicating and synchronizing between interpreters is most effectively
+done using dedicated tools, like those proposed in :pep:`734`. One less
+efficient alternative is to serialize with :mod:`pickle` and then send
+the bytes over a shared :mod:`socket <socket>` or
+:func:`pipe <os.pipe>`.
+
+.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None)
+
+ A :class:`ThreadPoolExecutor` subclass that executes calls asynchronously
+ using a pool of at most *max_workers* threads. Each thread runs
+ tasks in its own interpreter. The worker interpreters are isolated
+ from each other, which means each has its own runtime state and that
+ they can't share any mutable objects or other data. Each interpreter
+ has its own :term:`Global Interpreter Lock <global interpreter lock>`,
+ which means code run with this executor has true multi-core parallelism.
+
+ The optional *initializer* and *initargs* arguments have the same
+ meaning as for :class:`!ThreadPoolExecutor`: the initializer is run
+ when each worker is created, though in this case it is run.in
+ the worker's interpreter. The executor serializes the *initializer*
+ and *initargs* using :mod:`pickle` when sending them to the worker's
+ interpreter.
+
+ .. note::
+ Functions defined in the ``__main__`` module cannot be pickled
+ and thus cannot be used.
+
+ .. note::
+ The executor may replace uncaught exceptions from *initializer*
+ with :class:`~concurrent.futures.interpreter.ExecutionFailed`.
+
+ The optional *shared* argument is a :class:`dict` of objects that all
+ interpreters in the pool share. The *shared* items are added to each
+ interpreter's ``__main__`` module. Not all objects are shareable.
+ Shareable objects include the builtin singletons, :class:`str`
+ and :class:`bytes`, and :class:`memoryview`. See :pep:`734`
+ for more info.
+
+ Other caveats from parent :class:`ThreadPoolExecutor` apply here.
+
+:meth:`~Executor.submit` and :meth:`~Executor.map` work like normal,
+except the worker serializes the callable and arguments using
+:mod:`pickle` when sending them to its interpreter. The worker
+likewise serializes the return value when sending it back.
+
+.. note::
+ Functions defined in the ``__main__`` module cannot be pickled
+ and thus cannot be used.
+
+When a worker's current task raises an uncaught exception, the worker
+always tries to preserve the exception as-is. If that is successful
+then it also sets the ``__cause__`` to a corresponding
+:class:`~concurrent.futures.interpreter.ExecutionFailed`
+instance, which contains a summary of the original exception.
+In the uncommon case that the worker is not able to preserve the
+original as-is then it directly preserves the corresponding
+:class:`~concurrent.futures.interpreter.ExecutionFailed`
+instance instead.
+
+
ProcessPoolExecutor
-------------------
@@ -574,6 +681,26 @@ Exception classes
.. versionadded:: 3.7
+.. currentmodule:: concurrent.futures.interpreter
+
+.. exception:: BrokenInterpreterPool
+
+ Derived from :exc:`~concurrent.futures.thread.BrokenThreadPool`,
+ this exception class is raised when one of the workers
+ of a :class:`~concurrent.futures.InterpreterPoolExecutor`
+ has failed initializing.
+
+ .. versionadded:: next
+
+.. exception:: ExecutionFailed
+
+ Raised from :class:`~concurrent.futures.InterpreterPoolExecutor` when
+ the given initializer fails or from
+ :meth:`~concurrent.futures.Executor.submit` when there's an uncaught
+ exception from the submitted task.
+
+ .. versionadded:: next
+
.. currentmodule:: concurrent.futures.process
.. exception:: BrokenProcessPool
diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst
index b106578..9543af3 100644
--- a/Doc/whatsnew/3.14.rst
+++ b/Doc/whatsnew/3.14.rst
@@ -225,6 +225,14 @@ ast
* The ``repr()`` output for AST nodes now includes more information.
(Contributed by Tomas R in :gh:`116022`.)
+concurrent.futures
+------------------
+
+* Add :class:`~concurrent.futures.InterpreterPoolExecutor`,
+ which exposes "subinterpreters (multiple Python interpreters in the
+ same process) to Python code. This is separate from the proposed API
+ in :pep:`734`.
+ (Contributed by Eric Snow in :gh:`124548`.)
ctypes
------
diff --git a/Lib/concurrent/futures/__init__.py b/Lib/concurrent/futures/__init__.py
index 72de617..7ada743 100644
--- a/Lib/concurrent/futures/__init__.py
+++ b/Lib/concurrent/futures/__init__.py
@@ -29,6 +29,7 @@ __all__ = (
'Executor',
'wait',
'as_completed',
+ 'InterpreterPoolExecutor',
'ProcessPoolExecutor',
'ThreadPoolExecutor',
)
@@ -39,7 +40,7 @@ def __dir__():
def __getattr__(name):
- global ProcessPoolExecutor, ThreadPoolExecutor
+ global ProcessPoolExecutor, ThreadPoolExecutor, InterpreterPoolExecutor
if name == 'ProcessPoolExecutor':
from .process import ProcessPoolExecutor as pe
@@ -51,4 +52,13 @@ def __getattr__(name):
ThreadPoolExecutor = te
return te
+ if name == 'InterpreterPoolExecutor':
+ try:
+ from .interpreter import InterpreterPoolExecutor as ie
+ except ModuleNotFoundError:
+ ie = InterpreterPoolExecutor = None
+ else:
+ InterpreterPoolExecutor = ie
+ return ie
+
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py
new file mode 100644
index 0000000..fd7941a
--- /dev/null
+++ b/Lib/concurrent/futures/interpreter.py
@@ -0,0 +1,241 @@
+"""Implements InterpreterPoolExecutor."""
+
+import contextlib
+import pickle
+import textwrap
+from . import thread as _thread
+import _interpreters
+import _interpqueues
+
+
+class ExecutionFailed(_interpreters.InterpreterError):
+ """An unhandled exception happened during execution."""
+
+ def __init__(self, excinfo):
+ msg = excinfo.formatted
+ if not msg:
+ if excinfo.type and excinfo.msg:
+ msg = f'{excinfo.type.__name__}: {excinfo.msg}'
+ else:
+ msg = excinfo.type.__name__ or excinfo.msg
+ super().__init__(msg)
+ self.excinfo = excinfo
+
+ def __str__(self):
+ try:
+ formatted = self.excinfo.errdisplay
+ except Exception:
+ return super().__str__()
+ else:
+ return textwrap.dedent(f"""
+{super().__str__()}
+
+Uncaught in the interpreter:
+
+{formatted}
+ """.strip())
+
+
+UNBOUND = 2 # error; this should not happen.
+
+
+class WorkerContext(_thread.WorkerContext):
+
+ @classmethod
+ def prepare(cls, initializer, initargs, shared):
+ def resolve_task(fn, args, kwargs):
+ if isinstance(fn, str):
+ # XXX Circle back to this later.
+ raise TypeError('scripts not supported')
+ if args or kwargs:
+ raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
+ data = textwrap.dedent(fn)
+ kind = 'script'
+ # Make sure the script compiles.
+ # Ideally we wouldn't throw away the resulting code
+ # object. However, there isn't much to be done until
+ # code objects are shareable and/or we do a better job
+ # of supporting code objects in _interpreters.exec().
+ compile(data, '<string>', 'exec')
+ else:
+ # Functions defined in the __main__ module can't be pickled,
+ # so they can't be used here. In the future, we could possibly
+ # borrow from multiprocessing to work around this.
+ data = pickle.dumps((fn, args, kwargs))
+ kind = 'function'
+ return (data, kind)
+
+ if initializer is not None:
+ try:
+ initdata = resolve_task(initializer, initargs, {})
+ except ValueError:
+ if isinstance(initializer, str) and initargs:
+ raise ValueError(f'an initializer script does not take args, got {initargs!r}')
+ raise # re-raise
+ else:
+ initdata = None
+ def create_context():
+ return cls(initdata, shared)
+ return create_context, resolve_task
+
+ @classmethod
+ @contextlib.contextmanager
+ def _capture_exc(cls, resultsid):
+ try:
+ yield
+ except BaseException as exc:
+ # Send the captured exception out on the results queue,
+ # but still leave it unhandled for the interpreter to handle.
+ err = pickle.dumps(exc)
+ _interpqueues.put(resultsid, (None, err), 1, UNBOUND)
+ raise # re-raise
+
+ @classmethod
+ def _send_script_result(cls, resultsid):
+ _interpqueues.put(resultsid, (None, None), 0, UNBOUND)
+
+ @classmethod
+ def _call(cls, func, args, kwargs, resultsid):
+ with cls._capture_exc(resultsid):
+ res = func(*args or (), **kwargs or {})
+ # Send the result back.
+ try:
+ _interpqueues.put(resultsid, (res, None), 0, UNBOUND)
+ except _interpreters.NotShareableError:
+ res = pickle.dumps(res)
+ _interpqueues.put(resultsid, (res, None), 1, UNBOUND)
+
+ @classmethod
+ def _call_pickled(cls, pickled, resultsid):
+ fn, args, kwargs = pickle.loads(pickled)
+ cls._call(fn, args, kwargs, resultsid)
+
+ def __init__(self, initdata, shared=None):
+ self.initdata = initdata
+ self.shared = dict(shared) if shared else None
+ self.interpid = None
+ self.resultsid = None
+
+ def __del__(self):
+ if self.interpid is not None:
+ self.finalize()
+
+ def _exec(self, script):
+ assert self.interpid is not None
+ excinfo = _interpreters.exec(self.interpid, script, restrict=True)
+ if excinfo is not None:
+ raise ExecutionFailed(excinfo)
+
+ def initialize(self):
+ assert self.interpid is None, self.interpid
+ self.interpid = _interpreters.create(reqrefs=True)
+ try:
+ _interpreters.incref(self.interpid)
+
+ maxsize = 0
+ fmt = 0
+ self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
+
+ self._exec(f'from {__name__} import WorkerContext')
+
+ if self.shared:
+ _interpreters.set___main___attrs(
+ self.interpid, self.shared, restrict=True)
+
+ if self.initdata:
+ self.run(self.initdata)
+ except BaseException:
+ self.finalize()
+ raise # re-raise
+
+ def finalize(self):
+ interpid = self.interpid
+ resultsid = self.resultsid
+ self.resultsid = None
+ self.interpid = None
+ if resultsid is not None:
+ try:
+ _interpqueues.destroy(resultsid)
+ except _interpqueues.QueueNotFoundError:
+ pass
+ if interpid is not None:
+ try:
+ _interpreters.decref(interpid)
+ except _interpreters.InterpreterNotFoundError:
+ pass
+
+ def run(self, task):
+ data, kind = task
+ if kind == 'script':
+ raise NotImplementedError('script kind disabled')
+ script = f"""
+with WorkerContext._capture_exc({self.resultsid}):
+{textwrap.indent(data, ' ')}
+WorkerContext._send_script_result({self.resultsid})"""
+ elif kind == 'function':
+ script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
+ else:
+ raise NotImplementedError(kind)
+
+ try:
+ self._exec(script)
+ except ExecutionFailed as exc:
+ exc_wrapper = exc
+ else:
+ exc_wrapper = None
+
+ # Return the result, or raise the exception.
+ while True:
+ try:
+ obj = _interpqueues.get(self.resultsid)
+ except _interpqueues.QueueNotFoundError:
+ raise # re-raise
+ except _interpqueues.QueueError:
+ continue
+ except ModuleNotFoundError:
+ # interpreters.queues doesn't exist, which means
+ # QueueEmpty doesn't. Act as though it does.
+ continue
+ else:
+ break
+ (res, excdata), pickled, unboundop = obj
+ assert unboundop is None, unboundop
+ if excdata is not None:
+ assert res is None, res
+ assert pickled
+ assert exc_wrapper is not None
+ exc = pickle.loads(excdata)
+ raise exc from exc_wrapper
+ return pickle.loads(res) if pickled else res
+
+
+class BrokenInterpreterPool(_thread.BrokenThreadPool):
+ """
+ Raised when a worker thread in an InterpreterPoolExecutor failed initializing.
+ """
+
+
+class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
+
+ BROKEN = BrokenInterpreterPool
+
+ @classmethod
+ def prepare_context(cls, initializer, initargs, shared):
+ return WorkerContext.prepare(initializer, initargs, shared)
+
+ def __init__(self, max_workers=None, thread_name_prefix='',
+ initializer=None, initargs=(), shared=None):
+ """Initializes a new InterpreterPoolExecutor instance.
+
+ Args:
+ max_workers: The maximum number of interpreters that can be used to
+ execute the given calls.
+ thread_name_prefix: An optional name prefix to give our threads.
+ initializer: A callable or script used to initialize
+ each worker interpreter.
+ initargs: A tuple of arguments to pass to the initializer.
+ shared: A mapping of shareabled objects to be inserted into
+ each worker interpreter.
+ """
+ super().__init__(max_workers, thread_name_prefix,
+ initializer, initargs, shared=shared)
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index a024033..16cc553 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -43,19 +43,46 @@ if hasattr(os, 'register_at_fork'):
after_in_parent=_global_shutdown_lock.release)
+class WorkerContext:
+
+ @classmethod
+ def prepare(cls, initializer, initargs):
+ if initializer is not None:
+ if not callable(initializer):
+ raise TypeError("initializer must be a callable")
+ def create_context():
+ return cls(initializer, initargs)
+ def resolve_task(fn, args, kwargs):
+ return (fn, args, kwargs)
+ return create_context, resolve_task
+
+ def __init__(self, initializer, initargs):
+ self.initializer = initializer
+ self.initargs = initargs
+
+ def initialize(self):
+ if self.initializer is not None:
+ self.initializer(*self.initargs)
+
+ def finalize(self):
+ pass
+
+ def run(self, task):
+ fn, args, kwargs = task
+ return fn(*args, **kwargs)
+
+
class _WorkItem:
- def __init__(self, future, fn, args, kwargs):
+ def __init__(self, future, task):
self.future = future
- self.fn = fn
- self.args = args
- self.kwargs = kwargs
+ self.task = task
- def run(self):
+ def run(self, ctx):
if not self.future.set_running_or_notify_cancel():
return
try:
- result = self.fn(*self.args, **self.kwargs)
+ result = ctx.run(self.task)
except BaseException as exc:
self.future.set_exception(exc)
# Break a reference cycle with the exception 'exc'
@@ -66,16 +93,15 @@ class _WorkItem:
__class_getitem__ = classmethod(types.GenericAlias)
-def _worker(executor_reference, work_queue, initializer, initargs):
- if initializer is not None:
- try:
- initializer(*initargs)
- except BaseException:
- _base.LOGGER.critical('Exception in initializer:', exc_info=True)
- executor = executor_reference()
- if executor is not None:
- executor._initializer_failed()
- return
+def _worker(executor_reference, ctx, work_queue):
+ try:
+ ctx.initialize()
+ except BaseException:
+ _base.LOGGER.critical('Exception in initializer:', exc_info=True)
+ executor = executor_reference()
+ if executor is not None:
+ executor._initializer_failed()
+ return
try:
while True:
try:
@@ -89,7 +115,7 @@ def _worker(executor_reference, work_queue, initializer, initargs):
work_item = work_queue.get(block=True)
if work_item is not None:
- work_item.run()
+ work_item.run(ctx)
# Delete references to object. See GH-60488
del work_item
continue
@@ -110,6 +136,8 @@ def _worker(executor_reference, work_queue, initializer, initargs):
del executor
except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True)
+ finally:
+ ctx.finalize()
class BrokenThreadPool(_base.BrokenExecutor):
@@ -120,11 +148,17 @@ class BrokenThreadPool(_base.BrokenExecutor):
class ThreadPoolExecutor(_base.Executor):
+ BROKEN = BrokenThreadPool
+
# Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().__next__
+ @classmethod
+ def prepare_context(cls, initializer, initargs):
+ return WorkerContext.prepare(initializer, initargs)
+
def __init__(self, max_workers=None, thread_name_prefix='',
- initializer=None, initargs=()):
+ initializer=None, initargs=(), **ctxkwargs):
"""Initializes a new ThreadPoolExecutor instance.
Args:
@@ -133,6 +167,7 @@ class ThreadPoolExecutor(_base.Executor):
thread_name_prefix: An optional name prefix to give our threads.
initializer: A callable used to initialize worker threads.
initargs: A tuple of arguments to pass to the initializer.
+ ctxkwargs: Additional arguments to cls.prepare_context().
"""
if max_workers is None:
# ThreadPoolExecutor is often used to:
@@ -146,8 +181,9 @@ class ThreadPoolExecutor(_base.Executor):
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
- if initializer is not None and not callable(initializer):
- raise TypeError("initializer must be a callable")
+ (self._create_worker_context,
+ self._resolve_work_item_task,
+ ) = type(self).prepare_context(initializer, initargs, **ctxkwargs)
self._max_workers = max_workers
self._work_queue = queue.SimpleQueue()
@@ -158,13 +194,11 @@ class ThreadPoolExecutor(_base.Executor):
self._shutdown_lock = threading.Lock()
self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter()))
- self._initializer = initializer
- self._initargs = initargs
def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock, _global_shutdown_lock:
if self._broken:
- raise BrokenThreadPool(self._broken)
+ raise self.BROKEN(self._broken)
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
@@ -173,7 +207,8 @@ class ThreadPoolExecutor(_base.Executor):
'interpreter shutdown')
f = _base.Future()
- w = _WorkItem(f, fn, args, kwargs)
+ task = self._resolve_work_item_task(fn, args, kwargs)
+ w = _WorkItem(f, task)
self._work_queue.put(w)
self._adjust_thread_count()
@@ -196,9 +231,8 @@ class ThreadPoolExecutor(_base.Executor):
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
- self._work_queue,
- self._initializer,
- self._initargs))
+ self._create_worker_context(),
+ self._work_queue))
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
@@ -214,7 +248,7 @@ class ThreadPoolExecutor(_base.Executor):
except queue.Empty:
break
if work_item is not None:
- work_item.future.set_exception(BrokenThreadPool(self._broken))
+ work_item.future.set_exception(self.BROKEN(self._broken))
def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py
index 4160656..b97d9ff 100644
--- a/Lib/test/test_concurrent_futures/executor.py
+++ b/Lib/test/test_concurrent_futures/executor.py
@@ -23,6 +23,7 @@ def make_dummy_object(_):
class ExecutorTest:
+
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
@@ -52,7 +53,8 @@ class ExecutorTest:
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
self.assertEqual(i.__next__(), (0, 1))
- self.assertRaises(ZeroDivisionError, i.__next__)
+ with self.assertRaises(ZeroDivisionError):
+ i.__next__()
@support.requires_resource('walltime')
def test_map_timeout(self):
diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py
new file mode 100644
index 0000000..0de03c0
--- /dev/null
+++ b/Lib/test/test_concurrent_futures/test_interpreter_pool.py
@@ -0,0 +1,346 @@
+import asyncio
+import contextlib
+import io
+import os
+import pickle
+import sys
+import time
+import unittest
+from concurrent.futures.interpreter import (
+ ExecutionFailed, BrokenInterpreterPool,
+)
+import _interpreters
+from test import support
+import test.test_asyncio.utils as testasyncio_utils
+from test.support.interpreters import queues
+
+from .executor import ExecutorTest, mul
+from .util import BaseTestCase, InterpreterPoolMixin, setup_module
+
+
+def noop():
+ pass
+
+
+def write_msg(fd, msg):
+ os.write(fd, msg + b'\0')
+
+
+def read_msg(fd):
+ msg = b''
+ while ch := os.read(fd, 1):
+ if ch == b'\0':
+ return msg
+ msg += ch
+
+
+def get_current_name():
+ return __name__
+
+
+def fail(exctype, msg=None):
+ raise exctype(msg)
+
+
+def get_current_interpid(*extra):
+ interpid, _ = _interpreters.get_current()
+ return (interpid, *extra)
+
+
+class InterpretersMixin(InterpreterPoolMixin):
+
+ def pipe(self):
+ r, w = os.pipe()
+ self.addCleanup(lambda: os.close(r))
+ self.addCleanup(lambda: os.close(w))
+ return r, w
+
+
+class InterpreterPoolExecutorTest(
+ InterpretersMixin, ExecutorTest, BaseTestCase):
+
+ @unittest.expectedFailure
+ def test_init_script(self):
+ msg1 = b'step: init'
+ msg2 = b'step: run'
+ r, w = self.pipe()
+ initscript = f"""
+ import os
+ msg = {msg2!r}
+ os.write({w}, {msg1!r} + b'\\0')
+ """
+ script = f"""
+ os.write({w}, msg + b'\\0')
+ """
+ os.write(w, b'\0')
+
+ executor = self.executor_type(initializer=initscript)
+ before_init = os.read(r, 100)
+ fut = executor.submit(script)
+ after_init = read_msg(r)
+ fut.result()
+ after_run = read_msg(r)
+
+ self.assertEqual(before_init, b'\0')
+ self.assertEqual(after_init, msg1)
+ self.assertEqual(after_run, msg2)
+
+ @unittest.expectedFailure
+ def test_init_script_args(self):
+ with self.assertRaises(ValueError):
+ self.executor_type(initializer='pass', initargs=('spam',))
+
+ def test_init_func(self):
+ msg = b'step: init'
+ r, w = self.pipe()
+ os.write(w, b'\0')
+
+ executor = self.executor_type(
+ initializer=write_msg, initargs=(w, msg))
+ before = os.read(r, 100)
+ executor.submit(mul, 10, 10)
+ after = read_msg(r)
+
+ self.assertEqual(before, b'\0')
+ self.assertEqual(after, msg)
+
+ def test_init_closure(self):
+ count = 0
+ def init1():
+ assert count == 0, count
+ def init2():
+ nonlocal count
+ count += 1
+
+ with self.assertRaises(pickle.PicklingError):
+ self.executor_type(initializer=init1)
+ with self.assertRaises(pickle.PicklingError):
+ self.executor_type(initializer=init2)
+
+ def test_init_instance_method(self):
+ class Spam:
+ def initializer(self):
+ raise NotImplementedError
+ spam = Spam()
+
+ with self.assertRaises(pickle.PicklingError):
+ self.executor_type(initializer=spam.initializer)
+
+ def test_init_shared(self):
+ msg = b'eggs'
+ r, w = self.pipe()
+ script = f"""if True:
+ import os
+ if __name__ != '__main__':
+ import __main__
+ spam = __main__.spam
+ os.write({w}, spam + b'\\0')
+ """
+
+ executor = self.executor_type(shared={'spam': msg})
+ fut = executor.submit(exec, script)
+ fut.result()
+ after = read_msg(r)
+
+ self.assertEqual(after, msg)
+
+ @unittest.expectedFailure
+ def test_init_exception_in_script(self):
+ executor = self.executor_type(initializer='raise Exception("spam")')
+ with executor:
+ with contextlib.redirect_stderr(io.StringIO()) as stderr:
+ fut = executor.submit('pass')
+ with self.assertRaises(BrokenInterpreterPool):
+ fut.result()
+ stderr = stderr.getvalue()
+ self.assertIn('ExecutionFailed: Exception: spam', stderr)
+ self.assertIn('Uncaught in the interpreter:', stderr)
+ self.assertIn('The above exception was the direct cause of the following exception:',
+ stderr)
+
+ def test_init_exception_in_func(self):
+ executor = self.executor_type(initializer=fail,
+ initargs=(Exception, 'spam'))
+ with executor:
+ with contextlib.redirect_stderr(io.StringIO()) as stderr:
+ fut = executor.submit(noop)
+ with self.assertRaises(BrokenInterpreterPool):
+ fut.result()
+ stderr = stderr.getvalue()
+ self.assertIn('ExecutionFailed: Exception: spam', stderr)
+ self.assertIn('Uncaught in the interpreter:', stderr)
+ self.assertIn('The above exception was the direct cause of the following exception:',
+ stderr)
+
+ @unittest.expectedFailure
+ def test_submit_script(self):
+ msg = b'spam'
+ r, w = self.pipe()
+ script = f"""
+ import os
+ os.write({w}, __name__.encode('utf-8') + b'\\0')
+ """
+ executor = self.executor_type()
+
+ fut = executor.submit(script)
+ res = fut.result()
+ after = read_msg(r)
+
+ self.assertEqual(after, b'__main__')
+ self.assertIs(res, None)
+
+ def test_submit_closure(self):
+ spam = True
+ def task1():
+ return spam
+ def task2():
+ nonlocal spam
+ spam += 1
+ return spam
+
+ executor = self.executor_type()
+ with self.assertRaises(pickle.PicklingError):
+ executor.submit(task1)
+ with self.assertRaises(pickle.PicklingError):
+ executor.submit(task2)
+
+ def test_submit_local_instance(self):
+ class Spam:
+ def __init__(self):
+ self.value = True
+
+ executor = self.executor_type()
+ with self.assertRaises(pickle.PicklingError):
+ executor.submit(Spam)
+
+ def test_submit_instance_method(self):
+ class Spam:
+ def run(self):
+ return True
+ spam = Spam()
+
+ executor = self.executor_type()
+ with self.assertRaises(pickle.PicklingError):
+ executor.submit(spam.run)
+
+ def test_submit_func_globals(self):
+ executor = self.executor_type()
+ fut = executor.submit(get_current_name)
+ name = fut.result()
+
+ self.assertEqual(name, __name__)
+ self.assertNotEqual(name, '__main__')
+
+ @unittest.expectedFailure
+ def test_submit_exception_in_script(self):
+ fut = self.executor.submit('raise Exception("spam")')
+ with self.assertRaises(Exception) as captured:
+ fut.result()
+ self.assertIs(type(captured.exception), Exception)
+ self.assertEqual(str(captured.exception), 'spam')
+ cause = captured.exception.__cause__
+ self.assertIs(type(cause), ExecutionFailed)
+ for attr in ('__name__', '__qualname__', '__module__'):
+ self.assertEqual(getattr(cause.excinfo.type, attr),
+ getattr(Exception, attr))
+ self.assertEqual(cause.excinfo.msg, 'spam')
+
+ def test_submit_exception_in_func(self):
+ fut = self.executor.submit(fail, Exception, 'spam')
+ with self.assertRaises(Exception) as captured:
+ fut.result()
+ self.assertIs(type(captured.exception), Exception)
+ self.assertEqual(str(captured.exception), 'spam')
+ cause = captured.exception.__cause__
+ self.assertIs(type(cause), ExecutionFailed)
+ for attr in ('__name__', '__qualname__', '__module__'):
+ self.assertEqual(getattr(cause.excinfo.type, attr),
+ getattr(Exception, attr))
+ self.assertEqual(cause.excinfo.msg, 'spam')
+
+ def test_saturation(self):
+ blocker = queues.create()
+ executor = self.executor_type(4, shared=dict(blocker=blocker))
+
+ for i in range(15 * executor._max_workers):
+ executor.submit(exec, 'import __main__; __main__.blocker.get()')
+ #executor.submit('blocker.get()')
+ self.assertEqual(len(executor._threads), executor._max_workers)
+ for i in range(15 * executor._max_workers):
+ blocker.put_nowait(None)
+ executor.shutdown(wait=True)
+
+ @support.requires_gil_enabled("gh-117344: test is flaky without the GIL")
+ def test_idle_thread_reuse(self):
+ executor = self.executor_type()
+ executor.submit(mul, 21, 2).result()
+ executor.submit(mul, 6, 7).result()
+ executor.submit(mul, 3, 14).result()
+ self.assertEqual(len(executor._threads), 1)
+ executor.shutdown(wait=True)
+
+
+class AsyncioTest(InterpretersMixin, testasyncio_utils.TestCase):
+
+ def setUp(self):
+ super().setUp()
+ self.loop = asyncio.new_event_loop()
+ self.set_event_loop(self.loop)
+
+ self.executor = self.executor_type()
+ self.addCleanup(lambda: self.executor.shutdown())
+
+ def tearDown(self):
+ if not self.loop.is_closed():
+ testasyncio_utils.run_briefly(self.loop)
+
+ self.doCleanups()
+ support.gc_collect()
+ super().tearDown()
+
+ def test_run_in_executor(self):
+ unexpected, _ = _interpreters.get_current()
+
+ func = get_current_interpid
+ fut = self.loop.run_in_executor(self.executor, func, 'yo')
+ interpid, res = self.loop.run_until_complete(fut)
+
+ self.assertEqual(res, 'yo')
+ self.assertNotEqual(interpid, unexpected)
+
+ def test_run_in_executor_cancel(self):
+ executor = self.executor_type()
+
+ called = False
+
+ def patched_call_soon(*args):
+ nonlocal called
+ called = True
+
+ func = time.sleep
+ fut = self.loop.run_in_executor(self.executor, func, 0.05)
+ fut.cancel()
+ self.loop.run_until_complete(
+ self.loop.shutdown_default_executor())
+ self.loop.close()
+ self.loop.call_soon = patched_call_soon
+ self.loop.call_soon_threadsafe = patched_call_soon
+ time.sleep(0.4)
+ self.assertFalse(called)
+
+ def test_default_executor(self):
+ unexpected, _ = _interpreters.get_current()
+
+ self.loop.set_default_executor(self.executor)
+ fut = self.loop.run_in_executor(None, get_current_interpid)
+ interpid, = self.loop.run_until_complete(fut)
+
+ self.assertNotEqual(interpid, unexpected)
+
+
+def setUpModule():
+ setup_module()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py
index 3b8ec3e..52baab5 100644
--- a/Lib/test/test_concurrent_futures/util.py
+++ b/Lib/test/test_concurrent_futures/util.py
@@ -74,6 +74,10 @@ class ThreadPoolMixin(ExecutorMixin):
executor_type = futures.ThreadPoolExecutor
+class InterpreterPoolMixin(ExecutorMixin):
+ executor_type = futures.InterpreterPoolExecutor
+
+
class ProcessPoolForkMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
ctx = "fork"
@@ -120,6 +124,7 @@ class ProcessPoolForkserverMixin(ExecutorMixin):
def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
executor_mixins=(ThreadPoolMixin,
+ InterpreterPoolMixin,
ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin)):
diff --git a/Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst b/Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst
new file mode 100644
index 0000000..1aa1a46
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst
@@ -0,0 +1,6 @@
+We've added :class:`concurrent.futures.InterpreterPoolExecutor`, which
+allows you to run code in multiple isolated interpreters. This allows you
+to circumvent the limitations of CPU-bound threads (due to the GIL). Patch
+by Eric Snow.
+
+This addition is unrelated to :pep:`734`.