summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorVinay Sajip <vinay_sajip@yahoo.co.uk>2010-09-23 08:15:54 (GMT)
committerVinay Sajip <vinay_sajip@yahoo.co.uk>2010-09-23 08:15:54 (GMT)
commit0637d493e9a6cee190186a24b98d21b3a6e217f8 (patch)
tree9b4e088447024331984bbaa029c971581aa91067 /Lib
parentd12a420acc4e056ec3a4deec1d730647a845ee11 (diff)
downloadcpython-0637d493e9a6cee190186a24b98d21b3a6e217f8.zip
cpython-0637d493e9a6cee190186a24b98d21b3a6e217f8.tar.gz
cpython-0637d493e9a6cee190186a24b98d21b3a6e217f8.tar.bz2
logging: added QueueListener and documentation.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/logging/handlers.py110
1 files changed, 107 insertions, 3 deletions
diff --git a/Lib/logging/handlers.py b/Lib/logging/handlers.py
index 3d7a678..96cdcc6 100644
--- a/Lib/logging/handlers.py
+++ b/Lib/logging/handlers.py
@@ -1178,8 +1178,8 @@ class QueueHandler(logging.Handler):
def prepare(self, record):
"""
- Prepares a record for queuing. The object returned by this
- method is enqueued.
+ Prepares a record for queuing. The object returned by this method is
+ enqueued.
The base implementation formats the record to merge the message
and arguments, and removes unpickleable items from the record
@@ -1205,7 +1205,7 @@ class QueueHandler(logging.Handler):
"""
Emit a record.
- Writes the LogRecord to the queue, preparing it first.
+ Writes the LogRecord to the queue, preparing it for pickling first.
"""
try:
self.enqueue(self.prepare(record))
@@ -1213,3 +1213,107 @@ class QueueHandler(logging.Handler):
raise
except:
self.handleError(record)
+
+class QueueListener(object):
+ """
+ This class implements an internal threaded listener which watches for
+ LogRecords being added to a queue, removes them and passes them to a
+ list of handlers for processing.
+ """
+ _sentinel = None
+
+ def __init__(self, queue, *handlers):
+ """
+ Initialise an instance with the specified queue and
+ handlers.
+ """
+ self.queue = queue
+ self.handlers = handlers
+ self._stop = threading.Event()
+ self._thread = None
+
+ def dequeue(self, block):
+ """
+ Dequeue a record and return it, optionally blocking.
+
+ The base implementation uses get. You may want to override this method
+ if you want to use timeouts or work with custom queue implementations.
+ """
+ return self.queue.get(block)
+
+ def start(self):
+ """
+ Start the listener.
+
+ This starts up a background thread to monitor the queue for
+ LogRecords to process.
+ """
+ self._thread = t = threading.Thread(target=self._monitor)
+ t.setDaemon(True)
+ t.start()
+
+ def prepare(self , record):
+ """
+ Prepare a record for handling.
+
+ This method just returns the passed-in record. You may want to
+ override this method if you need to do any custom marshalling or
+ manipulation of the record before passing it to the handlers.
+ """
+ return record
+
+ def handle(self, record):
+ """
+ Handle a record.
+
+ This just loops through the handlers offering them the record
+ to handle.
+ """
+ record = self.prepare(record)
+ for handler in self.handlers:
+ handler.handle(record)
+
+ def _monitor(self):
+ """
+ Monitor the queue for records, and ask the handler
+ to deal with them.
+
+ This method runs on a separate, internal thread.
+ The thread will terminate if it sees a sentinel object in the queue.
+ """
+ q = self.queue
+ has_task_done = hasattr(q, 'task_done')
+ while not self._stop.isSet():
+ try:
+ record = self.dequeue(True)
+ if record is self._sentinel:
+ break
+ self.handle(record)
+ if has_task_done:
+ q.task_done()
+ except queue.Empty:
+ pass
+ # There might still be records in the queue.
+ while True:
+ try:
+ record = self.dequeue(False)
+ if record is self._sentinel:
+ break
+ self.handle(record)
+ if has_task_done:
+ q.task_done()
+ except queue.Empty:
+ break
+
+ def stop(self):
+ """
+ Stop the listener.
+
+ This asks the thread to terminate, and then waits for it to do so.
+ Note that if you don't call this before your application exits, there
+ may be some records still left on the queue, which won't be processed.
+ """
+ self._stop.set()
+ self.queue.put_nowait(self._sentinel)
+ self._thread.join()
+ self._thread = None