summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorBénédikt Tran <10796600+picnixz@users.noreply.github.com>2024-08-02 11:16:32 (GMT)
committerGitHub <noreply@github.com>2024-08-02 11:16:32 (GMT)
commitfb864c76cd5e450e789a7b4095832e118cc49a39 (patch)
treedf02db9e2ebdae479fb26f4f728e4110504d2d2a /Lib
parentaddbb73927f55855dfcc62fd47b0018de8a814ed (diff)
downloadcpython-fb864c76cd5e450e789a7b4095832e118cc49a39.zip
cpython-fb864c76cd5e450e789a7b4095832e118cc49a39.tar.gz
cpython-fb864c76cd5e450e789a7b4095832e118cc49a39.tar.bz2
gh-121723: Relax constraints on queue objects for `logging.handlers.QueueHandler`. (GH-122154)
Diffstat (limited to 'Lib')
-rw-r--r--Lib/logging/config.py55
-rw-r--r--Lib/test/test_logging.py107
2 files changed, 115 insertions, 47 deletions
diff --git a/Lib/logging/config.py b/Lib/logging/config.py
index 95e129a..3781cb1 100644
--- a/Lib/logging/config.py
+++ b/Lib/logging/config.py
@@ -497,6 +497,33 @@ class BaseConfigurator(object):
value = tuple(value)
return value
+def _is_queue_like_object(obj):
+ """Check that *obj* implements the Queue API."""
+ if isinstance(obj, queue.Queue):
+ return True
+ # defer importing multiprocessing as much as possible
+ from multiprocessing.queues import Queue as MPQueue
+ if isinstance(obj, MPQueue):
+ return True
+ # Depending on the multiprocessing start context, we cannot create
+ # a multiprocessing.managers.BaseManager instance 'mm' to get the
+ # runtime type of mm.Queue() or mm.JoinableQueue() (see gh-119819).
+ #
+ # Since we only need an object implementing the Queue API, we only
+ # do a protocol check, but we do not use typing.runtime_checkable()
+ # and typing.Protocol to reduce import time (see gh-121723).
+ #
+ # Ideally, we would have wanted to simply use strict type checking
+ # instead of a protocol-based type checking since the latter does
+ # not check the method signatures.
+ queue_interface = [
+ 'empty', 'full', 'get', 'get_nowait',
+ 'put', 'put_nowait', 'join', 'qsize',
+ 'task_done',
+ ]
+ return all(callable(getattr(obj, method, None))
+ for method in queue_interface)
+
class DictConfigurator(BaseConfigurator):
"""
Configure logging using a dictionary-like object to describe the
@@ -791,32 +818,8 @@ class DictConfigurator(BaseConfigurator):
if '()' not in qspec:
raise TypeError('Invalid queue specifier %r' % qspec)
config['queue'] = self.configure_custom(dict(qspec))
- else:
- from multiprocessing.queues import Queue as MPQueue
-
- if not isinstance(qspec, (queue.Queue, MPQueue)):
- # Safely check if 'qspec' is an instance of Manager.Queue
- # / Manager.JoinableQueue
-
- from multiprocessing import Manager as MM
- from multiprocessing.managers import BaseProxy
-
- # if it's not an instance of BaseProxy, it also can't be
- # an instance of Manager.Queue / Manager.JoinableQueue
- if isinstance(qspec, BaseProxy):
- # Sometimes manager or queue creation might fail
- # (e.g. see issue gh-120868). In that case, any
- # exception during the creation of these queues will
- # propagate up to the caller and be wrapped in a
- # `ValueError`, whose cause will indicate the details of
- # the failure.
- mm = MM()
- proxy_queue = mm.Queue()
- proxy_joinable_queue = mm.JoinableQueue()
- if not isinstance(qspec, (type(proxy_queue), type(proxy_joinable_queue))):
- raise TypeError('Invalid queue specifier %r' % qspec)
- else:
- raise TypeError('Invalid queue specifier %r' % qspec)
+ elif not _is_queue_like_object(qspec):
+ raise TypeError('Invalid queue specifier %r' % qspec)
if 'listener' in config:
lspec = config['listener']
diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py
index 6d688d4..4952375 100644
--- a/Lib/test/test_logging.py
+++ b/Lib/test/test_logging.py
@@ -2368,6 +2368,26 @@ class CustomListener(logging.handlers.QueueListener):
class CustomQueue(queue.Queue):
pass
+class CustomQueueProtocol:
+ def __init__(self, maxsize=0):
+ self.queue = queue.Queue(maxsize)
+
+ def __getattr__(self, attribute):
+ queue = object.__getattribute__(self, 'queue')
+ return getattr(queue, attribute)
+
+class CustomQueueFakeProtocol(CustomQueueProtocol):
+ # An object implementing the Queue API (incorrect signatures).
+ # The object will be considered a valid queue class since we
+ # do not check the signatures (only callability of methods)
+ # but will NOT be usable in production since a TypeError will
+ # be raised due to a missing argument.
+ def empty(self, x):
+ pass
+
+class CustomQueueWrongProtocol(CustomQueueProtocol):
+ empty = None
+
def queueMaker():
return queue.Queue()
@@ -3901,18 +3921,16 @@ class ConfigDictTest(BaseTest):
@threading_helper.requires_working_threading()
@support.requires_subprocess()
def test_config_queue_handler(self):
- q = CustomQueue()
- dq = {
- '()': __name__ + '.CustomQueue',
- 'maxsize': 10
- }
+ qs = [CustomQueue(), CustomQueueProtocol()]
+ dqs = [{'()': f'{__name__}.{cls}', 'maxsize': 10}
+ for cls in ['CustomQueue', 'CustomQueueProtocol']]
dl = {
'()': __name__ + '.listenerMaker',
'arg1': None,
'arg2': None,
'respect_handler_level': True
}
- qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', dq, q)
+ qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', *dqs, *qs)
lvalues = (None, __name__ + '.CustomListener', dl, CustomListener)
for qspec, lspec in itertools.product(qvalues, lvalues):
self.do_queuehandler_configuration(qspec, lspec)
@@ -3932,15 +3950,21 @@ class ConfigDictTest(BaseTest):
@support.requires_subprocess()
@patch("multiprocessing.Manager")
def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager):
- # gh-120868
+ # gh-120868, gh-121723
from multiprocessing import Queue as MQ
q1 = {"()": "queue.Queue", "maxsize": -1}
q2 = MQ()
q3 = queue.Queue()
-
- for qspec in (q1, q2, q3):
+ # CustomQueueFakeProtocol passes the checks but will not be usable
+ # since the signatures are incompatible. Checking the Queue API
+ # without testing the type of the actual queue is a trade-off
+ # between usability and the work we need to do in order to safely
+ # check that the queue object correctly implements the API.
+ q4 = CustomQueueFakeProtocol()
+
+ for qspec in (q1, q2, q3, q4):
self.apply_config(
{
"version": 1,
@@ -3956,21 +3980,62 @@ class ConfigDictTest(BaseTest):
@patch("multiprocessing.Manager")
def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager):
- # gh-120868
+ # gh-120868, gh-121723
- with self.assertRaises(ValueError):
- self.apply_config(
- {
- "version": 1,
- "handlers": {
- "queue_listener": {
- "class": "logging.handlers.QueueHandler",
- "queue": object(),
+ for qspec in [object(), CustomQueueWrongProtocol()]:
+ with self.assertRaises(ValueError):
+ self.apply_config(
+ {
+ "version": 1,
+ "handlers": {
+ "queue_listener": {
+ "class": "logging.handlers.QueueHandler",
+ "queue": qspec,
+ },
},
- },
+ }
+ )
+ manager.assert_not_called()
+
+ @skip_if_tsan_fork
+ @support.requires_subprocess()
+ @unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
+ "assertions in multiprocessing")
+ def test_config_queue_handler_multiprocessing_context(self):
+ # regression test for gh-121723
+ if support.MS_WINDOWS:
+ start_methods = ['spawn']
+ else:
+ start_methods = ['spawn', 'fork', 'forkserver']
+ for start_method in start_methods:
+ with self.subTest(start_method=start_method):
+ ctx = multiprocessing.get_context(start_method)
+ with ctx.Manager() as manager:
+ q = manager.Queue()
+ records = []
+ # use 1 process and 1 task per child to put 1 record
+ with ctx.Pool(1, initializer=self._mpinit_issue121723,
+ initargs=(q, "text"), maxtasksperchild=1):
+ records.append(q.get(timeout=60))
+ self.assertTrue(q.empty())
+ self.assertEqual(len(records), 1)
+
+ @staticmethod
+ def _mpinit_issue121723(qspec, message_to_log):
+ # static method for pickling support
+ logging.config.dictConfig({
+ 'version': 1,
+ 'disable_existing_loggers': True,
+ 'handlers': {
+ 'log_to_parent': {
+ 'class': 'logging.handlers.QueueHandler',
+ 'queue': qspec
}
- )
- manager.assert_not_called()
+ },
+ 'root': {'handlers': ['log_to_parent'], 'level': 'DEBUG'}
+ })
+ # log a message (this creates a record put in the queue)
+ logging.getLogger().info(message_to_log)
@skip_if_tsan_fork
@support.requires_subprocess()