diff options
author | Vinay Sajip <vinay_sajip@yahoo.co.uk> | 2010-09-23 08:15:54 (GMT) |
---|---|---|
committer | Vinay Sajip <vinay_sajip@yahoo.co.uk> | 2010-09-23 08:15:54 (GMT) |
commit | 0637d493e9a6cee190186a24b98d21b3a6e217f8 (patch) | |
tree | 9b4e088447024331984bbaa029c971581aa91067 | |
parent | d12a420acc4e056ec3a4deec1d730647a845ee11 (diff) | |
download | cpython-0637d493e9a6cee190186a24b98d21b3a6e217f8.zip cpython-0637d493e9a6cee190186a24b98d21b3a6e217f8.tar.gz cpython-0637d493e9a6cee190186a24b98d21b3a6e217f8.tar.bz2 |
logging: added QueueListener and documentation.
-rw-r--r-- | Doc/library/logging.rst | 98 | ||||
-rw-r--r-- | Lib/logging/handlers.py | 110 | ||||
-rw-r--r-- | Misc/NEWS | 3 |
3 files changed, 207 insertions, 4 deletions
diff --git a/Doc/library/logging.rst b/Doc/library/logging.rst index 04a1e5b..9aa4dbb 100644 --- a/Doc/library/logging.rst +++ b/Doc/library/logging.rst @@ -2651,12 +2651,18 @@ The :class:`QueueHandler` class, located in the :mod:`logging.handlers` module, supports sending logging messages to a queue, such as those implemented in the :mod:`queue` or :mod:`multiprocessing` modules. +Along with the :class:`QueueListener` class, :class:`QueueHandler` can be used +to let handlers do their work on a separate thread from the one which does the +logging. This is important in Web applications and also other service +applications where threads servicing clients need to respond as quickly as +possible, while any potentially slow operations (such as sending an email via +:class:`SMTPHandler`) are done on a separate thread. .. class:: QueueHandler(queue) Returns a new instance of the :class:`QueueHandler` class. The instance is initialized with the queue to send messages to. The queue can be any queue- - like object; it's passed as-is to the :meth:`enqueue` method, which needs + like object; it's used as-is by the :meth:`enqueue` method, which needs to know how to send messages to it. @@ -2688,8 +2694,80 @@ supports sending logging messages to a queue, such as those implemented in the The :class:`QueueHandler` class was not present in previous versions. +.. queue-listener: + +QueueListener +^^^^^^^^^^^^^ + +The :class:`QueueListener` class, located in the :mod:`logging.handlers` +module, supports receiving logging messages from a queue, such as those +implemented in the :mod:`queue` or :mod:`multiprocessing` modules. The +messages are received from a queue in an internal thread and passed, on +the same thread, to one or more handlers for processing. + +Along with the :class:`QueueHandler` class, :class:`QueueListener` can be used +to let handlers do their work on a separate thread from the one which does the +logging. This is important in Web applications and also other service +applications where threads servicing clients need to respond as quickly as +possible, while any potentially slow operations (such as sending an email via +:class:`SMTPHandler`) are done on a separate thread. + +.. class:: QueueListener(queue, *handlers) + + Returns a new instance of the :class:`QueueListener` class. The instance is + initialized with the queue to send messages to and a list of handlers which + will handle entries placed on the queue. The queue can be any queue- + like object; it's passed as-is to the :meth:`dequeue` method, which needs + to know how to get messages from it. + + .. method:: dequeue(block) + + Dequeues a record and return it, optionally blocking. + + The base implementation uses ``get()``. You may want to override this + method if you want to use timeouts or work with custom queue + implementations. + + .. method:: prepare(record) + + Prepare a record for handling. + + This implementation just returns the passed-in record. You may want to + override this method if you need to do any custom marshalling or + manipulation of the record before passing it to the handlers. + + .. method:: handle(record) + + Handle a record. + + This just loops through the handlers offering them the record + to handle. The actual object passed to the handlers is that which + is returned from :meth:`prepare`. + + .. method:: start() + + Starts the listener. + + This starts up a background thread to monitor the queue for + LogRecords to process. + + .. method:: stop() + + Stops the listener. + + This asks the thread to terminate, and then waits for it to do so. + Note that if you don't call this before your application exits, there + may be some records still left on the queue, which won't be processed. + +.. versionadded:: 3.2 + +The :class:`QueueListener` class was not present in previous versions. + .. _zeromq-handlers: +Subclassing QueueHandler +^^^^^^^^^^^^^^^^^^^^^^^^ + You can use a :class:`QueueHandler` subclass to send messages to other kinds of queues, for example a ZeroMQ "publish" socket. In the example below,the socket is created separately and passed to the handler (as its 'queue'):: @@ -2716,6 +2794,7 @@ data needed by the handler to create the socket:: def __init__(self, uri, socktype=zmq.PUB, ctx=None): self.ctx = ctx or zmq.Context() socket = zmq.Socket(self.ctx, socktype) + socket.bind(uri) QueueHandler.__init__(self, socket) def enqueue(self, record): @@ -2725,6 +2804,23 @@ data needed by the handler to create the socket:: def close(self): self.queue.close() +Subclassing QueueListener +^^^^^^^^^^^^^^^^^^^^^^^^^ + +You can also subclass :class:`QueueListener` to get messages from other kinds +of queues, for example a ZeroMQ "subscribe" socket. Here's an example:: + + class ZeroMQSocketListener(QueueListener): + def __init__(self, uri, *handlers, **kwargs): + self.ctx = kwargs.get('ctx') or zmq.Context() + socket = zmq.Socket(self.ctx, zmq.SUB) + socket.setsockopt(zmq.SUBSCRIBE, '') # subscribe to everything + socket.connect(uri) + + def dequeue(self): + msg = self.queue.recv() + return logging.makeLogRecord(json.loads(msg)) + .. _formatter-objects: Formatter Objects diff --git a/Lib/logging/handlers.py b/Lib/logging/handlers.py index 3d7a678..96cdcc6 100644 --- a/Lib/logging/handlers.py +++ b/Lib/logging/handlers.py @@ -1178,8 +1178,8 @@ class QueueHandler(logging.Handler): def prepare(self, record): """ - Prepares a record for queuing. The object returned by this - method is enqueued. + Prepares a record for queuing. The object returned by this method is + enqueued. The base implementation formats the record to merge the message and arguments, and removes unpickleable items from the record @@ -1205,7 +1205,7 @@ class QueueHandler(logging.Handler): """ Emit a record. - Writes the LogRecord to the queue, preparing it first. + Writes the LogRecord to the queue, preparing it for pickling first. """ try: self.enqueue(self.prepare(record)) @@ -1213,3 +1213,107 @@ class QueueHandler(logging.Handler): raise except: self.handleError(record) + +class QueueListener(object): + """ + This class implements an internal threaded listener which watches for + LogRecords being added to a queue, removes them and passes them to a + list of handlers for processing. + """ + _sentinel = None + + def __init__(self, queue, *handlers): + """ + Initialise an instance with the specified queue and + handlers. + """ + self.queue = queue + self.handlers = handlers + self._stop = threading.Event() + self._thread = None + + def dequeue(self, block): + """ + Dequeue a record and return it, optionally blocking. + + The base implementation uses get. You may want to override this method + if you want to use timeouts or work with custom queue implementations. + """ + return self.queue.get(block) + + def start(self): + """ + Start the listener. + + This starts up a background thread to monitor the queue for + LogRecords to process. + """ + self._thread = t = threading.Thread(target=self._monitor) + t.setDaemon(True) + t.start() + + def prepare(self , record): + """ + Prepare a record for handling. + + This method just returns the passed-in record. You may want to + override this method if you need to do any custom marshalling or + manipulation of the record before passing it to the handlers. + """ + return record + + def handle(self, record): + """ + Handle a record. + + This just loops through the handlers offering them the record + to handle. + """ + record = self.prepare(record) + for handler in self.handlers: + handler.handle(record) + + def _monitor(self): + """ + Monitor the queue for records, and ask the handler + to deal with them. + + This method runs on a separate, internal thread. + The thread will terminate if it sees a sentinel object in the queue. + """ + q = self.queue + has_task_done = hasattr(q, 'task_done') + while not self._stop.isSet(): + try: + record = self.dequeue(True) + if record is self._sentinel: + break + self.handle(record) + if has_task_done: + q.task_done() + except queue.Empty: + pass + # There might still be records in the queue. + while True: + try: + record = self.dequeue(False) + if record is self._sentinel: + break + self.handle(record) + if has_task_done: + q.task_done() + except queue.Empty: + break + + def stop(self): + """ + Stop the listener. + + This asks the thread to terminate, and then waits for it to do so. + Note that if you don't call this before your application exits, there + may be some records still left on the queue, which won't be processed. + """ + self._stop.set() + self.queue.put_nowait(self._sentinel) + self._thread.join() + self._thread = None @@ -62,6 +62,9 @@ Core and Builtins Library ------- +- Logging: Added QueueListener class to facilitate logging usage for + performance-critical threads. + - Issue #9916: Add some missing errno symbols. - Issue #9877: Expose sysconfig.get_makefile_filename() |