From bf2e7e55d7306b1e2fce7dce767e8df5ff42cf1c Mon Sep 17 00:00:00 2001 From: Asheesh Laroia Date: Sun, 7 Feb 2021 19:15:51 -0800 Subject: bpo-40692: Run more test_concurrent_futures tests (GH-20239) In the case of multiprocessing.synchronize() being missing, the test_concurrent_futures test suite now skips only the tests that require multiprocessing.synchronize(). Validate that multiprocessing.synchronize exists as part of _check_system_limits(), allowing ProcessPoolExecutor to raise NotImplementedError during __init__, rather than crashing with ImportError during __init__ when creating a lock imported from multiprocessing.synchronize. Use _check_system_limits() to disable tests of ProcessPoolExecutor on systems without multiprocessing.synchronize. Running the test suite without multiprocessing.synchronize reveals that Lib/compileall.py crashes when it uses a ProcessPoolExecutor. Therefore, change Lib/compileall.py to call _check_system_limits() before creating the ProcessPoolExecutor. Note that both Lib/compileall.py and Lib/test/test_compileall.py were attempting to sanity-check ProcessPoolExecutor by expecting ImportError. In multiprocessing.resource_tracker, sem_unlink() is also absent on platforms where POSIX semaphores aren't available. Avoid using sem_unlink() if it, too, does not exist. Co-authored-by: Pablo Galindo --- Lib/compileall.py | 10 ++++++---- Lib/concurrent/futures/process.py | 8 ++++++++ Lib/multiprocessing/resource_tracker.py | 10 +++++++++- Lib/test/test_compileall.py | 11 +++++++++-- Lib/test/test_concurrent_futures.py | 19 ++++++++++++++++--- .../2020-05-19-22-10-05.bpo-40692.ajEhrR.rst | 1 + 6 files changed, 49 insertions(+), 10 deletions(-) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst diff --git a/Lib/compileall.py b/Lib/compileall.py index fe7f450..672cb43 100644 --- a/Lib/compileall.py +++ b/Lib/compileall.py @@ -84,12 +84,14 @@ def compile_dir(dir, maxlevels=None, ddir=None, force=False, if workers < 0: raise ValueError('workers must be greater or equal to 0') if workers != 1: + # Check if this is a system where ProcessPoolExecutor can function. + from concurrent.futures.process import _check_system_limits try: - # Only import when needed, as low resource platforms may - # fail to import it - from concurrent.futures import ProcessPoolExecutor - except ImportError: + _check_system_limits() + except NotImplementedError: workers = 1 + else: + from concurrent.futures import ProcessPoolExecutor if maxlevels is None: maxlevels = sys.getrecursionlimit() files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 90bc98b..7647198 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -533,6 +533,14 @@ def _check_system_limits(): raise NotImplementedError(_system_limited) _system_limits_checked = True try: + import multiprocessing.synchronize + except ImportError: + _system_limited = ( + "This Python build lacks multiprocessing.synchronize, usually due " + "to named semaphores being unavailable on this platform." + ) + raise NotImplementedError(_system_limited) + try: nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") except (AttributeError, ValueError): # sysconf not available or setting not available diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index c9bfa9b..cc42dbd 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -37,8 +37,16 @@ if os.name == 'posix': import _multiprocessing import _posixshmem + # Use sem_unlink() to clean up named semaphores. + # + # sem_unlink() may be missing if the Python build process detected the + # absence of POSIX named semaphores. In that case, no named semaphores were + # ever opened, so no cleanup would be necessary. + if hasattr(_multiprocessing, 'sem_unlink'): + _CLEANUP_FUNCS.update({ + 'semaphore': _multiprocessing.sem_unlink, + }) _CLEANUP_FUNCS.update({ - 'semaphore': _multiprocessing.sem_unlink, 'shared_memory': _posixshmem.shm_unlink, }) diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py index be1149a..fa24b3c 100644 --- a/Lib/test/test_compileall.py +++ b/Lib/test/test_compileall.py @@ -16,10 +16,14 @@ import time import unittest from unittest import mock, skipUnless +from concurrent.futures import ProcessPoolExecutor try: - from concurrent.futures import ProcessPoolExecutor + # compileall relies on ProcessPoolExecutor if ProcessPoolExecutor exists + # and it can function. + from concurrent.futures.process import _check_system_limits + _check_system_limits() _have_multiprocessing = True -except ImportError: +except NotImplementedError: _have_multiprocessing = False from test import support @@ -188,6 +192,7 @@ class CompileallTestsBase: self.assertRegex(line, r'Listing ([^WindowsPath|PosixPath].*)') self.assertTrue(os.path.isfile(self.bc_path)) + @skipUnless(_have_multiprocessing, "requires multiprocessing") @mock.patch('concurrent.futures.ProcessPoolExecutor') def test_compile_pool_called(self, pool_mock): compileall.compile_dir(self.directory, quiet=True, workers=5) @@ -198,11 +203,13 @@ class CompileallTestsBase: "workers must be greater or equal to 0"): compileall.compile_dir(self.directory, workers=-1) + @skipUnless(_have_multiprocessing, "requires multiprocessing") @mock.patch('concurrent.futures.ProcessPoolExecutor') def test_compile_workers_cpu_count(self, pool_mock): compileall.compile_dir(self.directory, quiet=True, workers=0) self.assertEqual(pool_mock.call_args[1]['max_workers'], None) + @skipUnless(_have_multiprocessing, "requires multiprocessing") @mock.patch('concurrent.futures.ProcessPoolExecutor') @mock.patch('compileall.compile_file') def test_compile_one_worker(self, compile_file_mock, pool_mock): diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index a182b14..99651f5 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -4,8 +4,6 @@ from test.support import threading_helper # Skip tests if _multiprocessing wasn't built. import_helper.import_module('_multiprocessing') -# Skip tests if sem_open implementation is broken. -support.skip_if_broken_multiprocessing_synchronize() from test.support import hashlib_helper from test.support.script_helper import assert_python_ok @@ -27,7 +25,7 @@ from concurrent import futures from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, BrokenExecutor) -from concurrent.futures.process import BrokenProcessPool +from concurrent.futures.process import BrokenProcessPool, _check_system_limits from multiprocessing import get_context import multiprocessing.process @@ -161,6 +159,10 @@ class ProcessPoolForkMixin(ExecutorMixin): ctx = "fork" def get_context(self): + try: + _check_system_limits() + except NotImplementedError: + self.skipTest("ProcessPoolExecutor unavailable on this system") if sys.platform == "win32": self.skipTest("require unix system") return super().get_context() @@ -170,12 +172,23 @@ class ProcessPoolSpawnMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "spawn" + def get_context(self): + try: + _check_system_limits() + except NotImplementedError: + self.skipTest("ProcessPoolExecutor unavailable on this system") + return super().get_context() + class ProcessPoolForkserverMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor ctx = "forkserver" def get_context(self): + try: + _check_system_limits() + except NotImplementedError: + self.skipTest("ProcessPoolExecutor unavailable on this system") if sys.platform == "win32": self.skipTest("require unix system") return super().get_context() diff --git a/Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst b/Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst new file mode 100644 index 0000000..b92dcdd --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst @@ -0,0 +1 @@ +In the :class:`concurrent.futures.ProcessPoolExecutor`, validate that :func:`multiprocess.synchronize` is available on a given platform and rely on that check in the :mod:`concurrent.futures` test suite so we can run tests that are unrelated to :class:`ProcessPoolExecutor` on those platforms. -- cgit v0.12