summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVinay Sajip <vinay_sajip@yahoo.co.uk>2010-09-23 08:15:54 (GMT)
committerVinay Sajip <vinay_sajip@yahoo.co.uk>2010-09-23 08:15:54 (GMT)
commit0637d493e9a6cee190186a24b98d21b3a6e217f8 (patch)
tree9b4e088447024331984bbaa029c971581aa91067
parentd12a420acc4e056ec3a4deec1d730647a845ee11 (diff)
downloadcpython-0637d493e9a6cee190186a24b98d21b3a6e217f8.zip
cpython-0637d493e9a6cee190186a24b98d21b3a6e217f8.tar.gz
cpython-0637d493e9a6cee190186a24b98d21b3a6e217f8.tar.bz2
logging: added QueueListener and documentation.
-rw-r--r--Doc/library/logging.rst98
-rw-r--r--Lib/logging/handlers.py110
-rw-r--r--Misc/NEWS3
3 files changed, 207 insertions, 4 deletions
diff --git a/Doc/library/logging.rst b/Doc/library/logging.rst
index 04a1e5b..9aa4dbb 100644
--- a/Doc/library/logging.rst
+++ b/Doc/library/logging.rst
@@ -2651,12 +2651,18 @@ The :class:`QueueHandler` class, located in the :mod:`logging.handlers` module,
supports sending logging messages to a queue, such as those implemented in the
:mod:`queue` or :mod:`multiprocessing` modules.
+Along with the :class:`QueueListener` class, :class:`QueueHandler` can be used
+to let handlers do their work on a separate thread from the one which does the
+logging. This is important in Web applications and also other service
+applications where threads servicing clients need to respond as quickly as
+possible, while any potentially slow operations (such as sending an email via
+:class:`SMTPHandler`) are done on a separate thread.
.. class:: QueueHandler(queue)
Returns a new instance of the :class:`QueueHandler` class. The instance is
initialized with the queue to send messages to. The queue can be any queue-
- like object; it's passed as-is to the :meth:`enqueue` method, which needs
+ like object; it's used as-is by the :meth:`enqueue` method, which needs
to know how to send messages to it.
@@ -2688,8 +2694,80 @@ supports sending logging messages to a queue, such as those implemented in the
The :class:`QueueHandler` class was not present in previous versions.
+.. queue-listener:
+
+QueueListener
+^^^^^^^^^^^^^
+
+The :class:`QueueListener` class, located in the :mod:`logging.handlers`
+module, supports receiving logging messages from a queue, such as those
+implemented in the :mod:`queue` or :mod:`multiprocessing` modules. The
+messages are received from a queue in an internal thread and passed, on
+the same thread, to one or more handlers for processing.
+
+Along with the :class:`QueueHandler` class, :class:`QueueListener` can be used
+to let handlers do their work on a separate thread from the one which does the
+logging. This is important in Web applications and also other service
+applications where threads servicing clients need to respond as quickly as
+possible, while any potentially slow operations (such as sending an email via
+:class:`SMTPHandler`) are done on a separate thread.
+
+.. class:: QueueListener(queue, *handlers)
+
+ Returns a new instance of the :class:`QueueListener` class. The instance is
+ initialized with the queue to send messages to and a list of handlers which
+ will handle entries placed on the queue. The queue can be any queue-
+ like object; it's passed as-is to the :meth:`dequeue` method, which needs
+ to know how to get messages from it.
+
+ .. method:: dequeue(block)
+
+ Dequeues a record and return it, optionally blocking.
+
+ The base implementation uses ``get()``. You may want to override this
+ method if you want to use timeouts or work with custom queue
+ implementations.
+
+ .. method:: prepare(record)
+
+ Prepare a record for handling.
+
+ This implementation just returns the passed-in record. You may want to
+ override this method if you need to do any custom marshalling or
+ manipulation of the record before passing it to the handlers.
+
+ .. method:: handle(record)
+
+ Handle a record.
+
+ This just loops through the handlers offering them the record
+ to handle. The actual object passed to the handlers is that which
+ is returned from :meth:`prepare`.
+
+ .. method:: start()
+
+ Starts the listener.
+
+ This starts up a background thread to monitor the queue for
+ LogRecords to process.
+
+ .. method:: stop()
+
+ Stops the listener.
+
+ This asks the thread to terminate, and then waits for it to do so.
+ Note that if you don't call this before your application exits, there
+ may be some records still left on the queue, which won't be processed.
+
+.. versionadded:: 3.2
+
+The :class:`QueueListener` class was not present in previous versions.
+
.. _zeromq-handlers:
+Subclassing QueueHandler
+^^^^^^^^^^^^^^^^^^^^^^^^
+
You can use a :class:`QueueHandler` subclass to send messages to other kinds
of queues, for example a ZeroMQ "publish" socket. In the example below,the
socket is created separately and passed to the handler (as its 'queue')::
@@ -2716,6 +2794,7 @@ data needed by the handler to create the socket::
def __init__(self, uri, socktype=zmq.PUB, ctx=None):
self.ctx = ctx or zmq.Context()
socket = zmq.Socket(self.ctx, socktype)
+ socket.bind(uri)
QueueHandler.__init__(self, socket)
def enqueue(self, record):
@@ -2725,6 +2804,23 @@ data needed by the handler to create the socket::
def close(self):
self.queue.close()
+Subclassing QueueListener
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can also subclass :class:`QueueListener` to get messages from other kinds
+of queues, for example a ZeroMQ "subscribe" socket. Here's an example::
+
+ class ZeroMQSocketListener(QueueListener):
+ def __init__(self, uri, *handlers, **kwargs):
+ self.ctx = kwargs.get('ctx') or zmq.Context()
+ socket = zmq.Socket(self.ctx, zmq.SUB)
+ socket.setsockopt(zmq.SUBSCRIBE, '') # subscribe to everything
+ socket.connect(uri)
+
+ def dequeue(self):
+ msg = self.queue.recv()
+ return logging.makeLogRecord(json.loads(msg))
+
.. _formatter-objects:
Formatter Objects
diff --git a/Lib/logging/handlers.py b/Lib/logging/handlers.py
index 3d7a678..96cdcc6 100644
--- a/Lib/logging/handlers.py
+++ b/Lib/logging/handlers.py
@@ -1178,8 +1178,8 @@ class QueueHandler(logging.Handler):
def prepare(self, record):
"""
- Prepares a record for queuing. The object returned by this
- method is enqueued.
+ Prepares a record for queuing. The object returned by this method is
+ enqueued.
The base implementation formats the record to merge the message
and arguments, and removes unpickleable items from the record
@@ -1205,7 +1205,7 @@ class QueueHandler(logging.Handler):
"""
Emit a record.
- Writes the LogRecord to the queue, preparing it first.
+ Writes the LogRecord to the queue, preparing it for pickling first.
"""
try:
self.enqueue(self.prepare(record))
@@ -1213,3 +1213,107 @@ class QueueHandler(logging.Handler):
raise
except:
self.handleError(record)
+
+class QueueListener(object):
+ """
+ This class implements an internal threaded listener which watches for
+ LogRecords being added to a queue, removes them and passes them to a
+ list of handlers for processing.
+ """
+ _sentinel = None
+
+ def __init__(self, queue, *handlers):
+ """
+ Initialise an instance with the specified queue and
+ handlers.
+ """
+ self.queue = queue
+ self.handlers = handlers
+ self._stop = threading.Event()
+ self._thread = None
+
+ def dequeue(self, block):
+ """
+ Dequeue a record and return it, optionally blocking.
+
+ The base implementation uses get. You may want to override this method
+ if you want to use timeouts or work with custom queue implementations.
+ """
+ return self.queue.get(block)
+
+ def start(self):
+ """
+ Start the listener.
+
+ This starts up a background thread to monitor the queue for
+ LogRecords to process.
+ """
+ self._thread = t = threading.Thread(target=self._monitor)
+ t.setDaemon(True)
+ t.start()
+
+ def prepare(self , record):
+ """
+ Prepare a record for handling.
+
+ This method just returns the passed-in record. You may want to
+ override this method if you need to do any custom marshalling or
+ manipulation of the record before passing it to the handlers.
+ """
+ return record
+
+ def handle(self, record):
+ """
+ Handle a record.
+
+ This just loops through the handlers offering them the record
+ to handle.
+ """
+ record = self.prepare(record)
+ for handler in self.handlers:
+ handler.handle(record)
+
+ def _monitor(self):
+ """
+ Monitor the queue for records, and ask the handler
+ to deal with them.
+
+ This method runs on a separate, internal thread.
+ The thread will terminate if it sees a sentinel object in the queue.
+ """
+ q = self.queue
+ has_task_done = hasattr(q, 'task_done')
+ while not self._stop.isSet():
+ try:
+ record = self.dequeue(True)
+ if record is self._sentinel:
+ break
+ self.handle(record)
+ if has_task_done:
+ q.task_done()
+ except queue.Empty:
+ pass
+ # There might still be records in the queue.
+ while True:
+ try:
+ record = self.dequeue(False)
+ if record is self._sentinel:
+ break
+ self.handle(record)
+ if has_task_done:
+ q.task_done()
+ except queue.Empty:
+ break
+
+ def stop(self):
+ """
+ Stop the listener.
+
+ This asks the thread to terminate, and then waits for it to do so.
+ Note that if you don't call this before your application exits, there
+ may be some records still left on the queue, which won't be processed.
+ """
+ self._stop.set()
+ self.queue.put_nowait(self._sentinel)
+ self._thread.join()
+ self._thread = None
diff --git a/Misc/NEWS b/Misc/NEWS
index b4f830b..15b4da2 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -62,6 +62,9 @@ Core and Builtins
Library
-------
+- Logging: Added QueueListener class to facilitate logging usage for
+ performance-critical threads.
+
- Issue #9916: Add some missing errno symbols.
- Issue #9877: Expose sysconfig.get_makefile_filename()