Queue Support

The queue support module makes it possible to add log records to a queue system. This is useful for distributed setups where you want multiple processes to log to the same backend. Supported transports are ZeroMQ, message queues accessed through kombu (AMQP, Redis, MongoDB and others), and the multiprocessing Queue class.

ZeroMQ

class logbook.queues.ZeroMQHandler(uri=None, level=0, filter=None, bubble=False, context=None, multi=False)

A handler that acts as a ZeroMQ publisher and publishes each record as a JSON dump. Requires the pyzmq library.

The queue will be filled with JSON-exported log records. To receive such log records from a queue, you can use the ZeroMQSubscriber.

If multi is set to True, the handler will use a PUSH socket to publish the records. This allows multiple handlers to use the same uri. The records can be received by using the ZeroMQSubscriber with multi set to True.

Example setup:

handler = ZeroMQHandler('tcp://127.0.0.1:5000')

context = None

The ZeroMQ context.

export_record(record)

Exports the record into a dictionary ready for JSON dumping.
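As a rough illustration of what "ready for JSON dumping" implies, the sketch below uses a plain dictionary as a stand-in for an exported record. The field names and values are assumptions made up for this example, not the exact keys Logbook emits; the point is only that every value has to be a JSON-serializable type so the payload survives the trip from publisher to subscriber.

```python
import json

# Hypothetical stand-in for an exported record; the keys are illustrative only.
exported = {
    "channel": "worker-1",
    "level": 13,
    "message": "cache miss for key {}",
    "args": ["user:42"],
}

# What a publisher would put on the wire: a UTF-8 encoded JSON document.
payload = json.dumps(exported).encode("utf-8")

# What a subscriber would do on receipt: decode and rebuild the dictionary.
received = json.loads(payload.decode("utf-8"))
```

Anything that JSON cannot represent directly (datetime objects, exception instances and so on) would have to be converted to strings or numbers before this step.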

socket = None

The ZeroMQ socket.

class logbook.queues.ZeroMQSubscriber(uri=None, context=None, multi=False)

A helper that acts as a ZeroMQ subscriber and will dispatch received log records to the active handler setup. There are multiple ways to use this class.

It can be used to receive log records from a queue:

subscriber = ZeroMQSubscriber('tcp://127.0.0.1:5000')
record = subscriber.recv()

But it can also be used to receive and dispatch these in one go:

with target_handler:
    subscriber = ZeroMQSubscriber('tcp://127.0.0.1:5000')
    subscriber.dispatch_forever()

This will take all the log records from that queue and dispatch them over to target_handler. If you want you can also do that in the background:

subscriber = ZeroMQSubscriber('tcp://127.0.0.1:5000')
controller = subscriber.dispatch_in_background(target_handler)

The controller returned can be used to shut down the background thread:

controller.stop()

If multi is set to True, the subscriber will use a PULL socket and listen to records published by a PUSH socket (usually via a ZeroMQHandler with multi set to True). This allows a single subscriber to receive and dispatch records from multiple handlers.

close()

Closes the ZeroMQ socket.

context = None

The ZeroMQ context.

dispatch_forever()

Starts a loop that dispatches log records forever.

dispatch_in_background(setup=None)

Starts a new daemonized thread that dispatches in the background. An optional handler setup can be provided that is pushed to the new thread (can be any logbook.base.StackedObject).

Returns a ThreadController object for shutting down the background thread. The background thread will already be running when this function returns.

dispatch_once(timeout=None)

Receives one record from the socket, loads it and dispatches it. Returns True if something was dispatched or False if it timed out.

recv(timeout=None)

Receives a single record from the socket. Timeout of 0 means nonblocking, None means blocking and otherwise it’s a timeout in seconds after which the function just returns with None.
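The three timeout cases can be made concrete with a small standard-library sketch. It is not Logbook's implementation: a queue.Queue stands in for the socket, and the free function recv() below is a hypothetical helper written only to show how 0, None and a positive number map onto non-blocking, blocking and timed waits.

```python
import queue


def recv(q, timeout=None):
    """Return one item from q, or None if nothing arrived in time."""
    try:
        if timeout == 0:
            # Non-blocking: return immediately whether or not data is ready.
            return q.get_nowait()
        # timeout=None blocks until an item arrives; a positive number
        # waits at most that many seconds.
        return q.get(timeout=timeout)
    except queue.Empty:
        return None


q = queue.Queue()
empty_nonblocking = recv(q, timeout=0)   # nothing queued yet
empty_timed = recv(q, timeout=0.05)      # waits briefly, then gives up
q.put({"message": "hello"})
got = recv(q, timeout=None)              # an item is ready, so no waiting
```

Because a timed-out call returns None rather than raising, callers that poll in a loop need to check the result before using it.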

socket = None

The ZeroMQ socket.

AMQP Message Queues

class logbook.queues.MessageQueueHandler(uri=None, queue='logging', level=0, filter=None, bubble=False)

A handler that acts as a message queue publisher and publishes each record as a JSON dump. Requires the kombu module.

The queue will be filled with JSON-exported log records. To receive such log records from a queue, you can use the MessageQueueSubscriber.

For an AMQP backend such as RabbitMQ:

handler = MessageQueueHandler('amqp://guest:guest@localhost//')

This requires the py-amqp or the librabbitmq client library.

For Redis (requires redis client library):

handler = MessageQueueHandler('redis://localhost:8889/0')

For MongoDB (requires pymongo):

handler = MessageQueueHandler('mongodb://localhost:27017/logging')

Several other backends are also supported. Refer to the kombu documentation for details.

export_record(record)

Exports the record into a dictionary ready for JSON dumping.

class logbook.queues.MessageQueueSubscriber(uri=None, queue='logging')

A helper that acts as a message queue subscriber and will dispatch received log records to the active handler setup. There are multiple ways to use this class.

It can be used to receive log records from a queue:

subscriber = MessageQueueSubscriber('mongodb://localhost:27017/logging')
record = subscriber.recv()

But it can also be used to receive and dispatch these in one go:

with target_handler:
    subscriber = MessageQueueSubscriber('mongodb://localhost:27017/logging')
    subscriber.dispatch_forever()

This will take all the log records from that queue and dispatch them over to target_handler. If you want you can also do that in the background:

subscriber = MessageQueueSubscriber('mongodb://localhost:27017/logging')
controller = subscriber.dispatch_in_background(target_handler)

The controller returned can be used to shut down the background thread:

controller.stop()

recv(timeout=None)

Receives a single record from the socket. Timeout of 0 means nonblocking, None means blocking and otherwise it’s a timeout in seconds after which the function just returns with None.

MultiProcessing

class logbook.queues.MultiProcessingHandler(queue, level=0, filter=None, bubble=False)

Implements a handler that dispatches over a queue to a different process. It is connected to a subscriber with a multiprocessing.Queue:

from multiprocessing import Queue
from logbook.queues import MultiProcessingHandler
queue = Queue(-1)
handler = MultiProcessingHandler(queue)

class logbook.queues.MultiProcessingSubscriber(queue=None)

Receives log records from the given multiprocessing queue and dispatches them to the active handler setup. Make sure to use the same queue for both handler and subscriber. Ideally the queue is set up without a size limit (-1):

from multiprocessing import Queue
queue = Queue(-1)

It can be used to receive log records from a queue:

subscriber = MultiProcessingSubscriber(queue)
record = subscriber.recv()

But it can also be used to receive and dispatch these in one go:

with target_handler:
    subscriber = MultiProcessingSubscriber(queue)
    subscriber.dispatch_forever()

This will take all the log records from that queue and dispatch them over to target_handler. If you want you can also do that in the background:

subscriber = MultiProcessingSubscriber(queue)
controller = subscriber.dispatch_in_background(target_handler)

The controller returned can be used to shut down the background thread:

controller.stop()

If no queue is provided, the subscriber will create one. This queue can then be used by handlers:

subscriber = MultiProcessingSubscriber()
handler = MultiProcessingHandler(subscriber.queue)

dispatch_forever()

Starts a loop that dispatches log records forever.

dispatch_in_background(setup=None)

Starts a new daemonized thread that dispatches in the background. An optional handler setup can be provided that is pushed to the new thread (can be any logbook.base.StackedObject).

Returns a ThreadController object for shutting down the background thread. The background thread will already be running when this function returns.

dispatch_once(timeout=None)

Receives one record from the socket, loads it and dispatches it. Returns True if something was dispatched or False if it timed out.

Other

class logbook.queues.ThreadedWrapperHandler(handler, maxsize=0)

This handler uses a single background thread to dispatch log records to a specific other handler using an internal queue. The idea is that if you are using a handler that requires some time to hand off the log records (such as the mail handler) and would block your request, you can let Logbook do that in a background thread.
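The general pattern behind this kind of wrapper can be sketched with the standard library alone. The class below is not Logbook code: BackgroundForwarder, emit() and close() are hypothetical names, and a plain callable stands in for the slow wrapped handler. It only illustrates how an internal queue plus one worker thread keeps the caller from waiting on the slow hand-off.

```python
import queue
import threading


class BackgroundForwarder:
    """Illustrative only: forwards items to a slow callable on a worker thread."""

    _STOP = object()  # sentinel telling the worker to exit

    def __init__(self, slow_handle, maxsize=0):
        self._handle = slow_handle
        self._queue = queue.Queue(maxsize)
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            self._handle(item)

    def emit(self, item):
        # With the default unbounded queue this returns as soon as the item
        # is queued; the caller never waits for the slow handler itself.
        self._queue.put(item)

    def close(self):
        # Items queued before the sentinel are still processed first.
        self._queue.put(self._STOP)
        self._thread.join()


delivered = []
forwarder = BackgroundForwarder(delivered.append)
for n in range(3):
    forwarder.emit(f"record {n}")
forwarder.close()
```

A single worker thread preserves the order in which records were queued, which is usually what you want for log output.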

The threaded wrapper handler will automatically adopt the methods and properties of the wrapped handler. All the values will be reflected:

>>> from logbook import TestHandler
>>> from logbook.queues import ThreadedWrapperHandler
>>> twh = ThreadedWrapperHandler(TestHandler())
>>> twh.level_name = 'WARNING'
>>> twh.handler.level_name
'WARNING'

class logbook.queues.SubscriberGroup(subscribers=None, queue_limit=10)

This is a subscriber which represents a group of subscribers.

This is helpful if you are writing a server-like application which has “slaves”. This way a user is easily able to view every log record which happened somewhere in the entire system without having to check every single slave:

subscribers = SubscriberGroup([
    MultiProcessingSubscriber(queue),
    ZeroMQSubscriber('tcp://127.0.0.1:5000')
])
with target_handler:
    subscribers.dispatch_forever()

add(subscriber)

Adds the given subscriber to the group.

stop()

Stops the group from internally receiving any more messages. Once the internal queue is exhausted, recv() will always return None.
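One plausible way to picture a group of subscribers is as a fan-in: each source gets its own pump thread, and all pumps feed a single bounded queue that the consumer reads from. The sketch below uses only the standard library and is not Logbook's implementation; fan_in(), the queue.Queue sources and the stop event are hypothetical stand-ins, and the bounded merged queue merely echoes the spirit of queue_limit.

```python
import queue
import threading


def fan_in(sources, merged, stop_event, poll_interval=0.05):
    """Start one pump thread per source queue; each copies items into merged."""

    def pump(source):
        while not stop_event.is_set():
            try:
                item = source.get(timeout=poll_interval)
            except queue.Empty:
                continue  # nothing arrived; re-check the stop flag
            merged.put(item)  # blocks while merged is full (back-pressure)

    threads = [
        threading.Thread(target=pump, args=(s,), daemon=True) for s in sources
    ]
    for t in threads:
        t.start()
    return threads


a, b = queue.Queue(), queue.Queue()
merged = queue.Queue(maxsize=10)
stop_event = threading.Event()
threads = fan_in([a, b], merged, stop_event)

a.put("from a")
b.put("from b")
# Arrival order across sources is not guaranteed, so sort for comparison.
collected = sorted(merged.get(timeout=2) for _ in range(2))

stop_event.set()
for t in threads:
    t.join()
```

The consumer only ever deals with one queue, regardless of how many sources feed it.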

Base Interface

class logbook.queues.SubscriberBase

Base class for all subscribers.

dispatch_forever()

Starts a loop that dispatches log records forever.

dispatch_in_background(setup=None)

Starts a new daemonized thread that dispatches in the background. An optional handler setup can be provided that is pushed to the new thread (can be any logbook.base.StackedObject).

Returns a ThreadController object for shutting down the background thread. The background thread will already be running when this function returns.

dispatch_once(timeout=None)

Receives one record from the socket, loads it and dispatches it. Returns True if something was dispatched or False if it timed out.

recv(timeout=None)

Receives a single record from the socket. Timeout of 0 means nonblocking, None means blocking and otherwise it’s a timeout in seconds after which the function just returns with None.

Subclasses have to override this.
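To show how the other methods can be layered on top of a single overridden recv(), here is a simplified stand-in written with no Logbook imports. SketchSubscriber and ListSubscriber are hypothetical classes, and dispatch_once() takes an explicit handle callable as a simplification; the real class dispatches to the active handler setup instead.

```python
class SketchSubscriber:
    """Illustrative stand-in for a subscriber base class (not Logbook code)."""

    def recv(self, timeout=None):
        # The one method a concrete subscriber must supply.
        raise NotImplementedError

    def dispatch_once(self, handle, timeout=None):
        # Fetch at most one record and hand it to the `handle` callable.
        record = self.recv(timeout)
        if record is None:
            return False
        handle(record)
        return True


class ListSubscriber(SketchSubscriber):
    """Hypothetical subscriber that serves records from an in-memory list."""

    def __init__(self, records):
        self._records = list(records)

    def recv(self, timeout=None):
        # Nothing can arrive later, so an empty list counts as a timeout.
        return self._records.pop(0) if self._records else None


seen = []
sub = ListSubscriber(["first", "second"])
results = [sub.dispatch_once(seen.append, timeout=0) for _ in range(3)]
```

The third call finds nothing to receive, so it reports False without invoking the handler.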

class logbook.queues.ThreadController(subscriber, setup=None)

A helper class used by queue subscribers to control the background thread. This is usually created and started in one go by dispatch_in_background() or a comparable function.

start()

Starts the task thread.

stop()

Stops the task thread.
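The start/stop contract can be illustrated with a short standard-library sketch. It is an assumption-laden stand-in rather than the actual class: SketchController, its poll_interval parameter and the queue.Queue source are invented for this example. The loop polls with a short timeout so that it can notice the stop request even when no records arrive.

```python
import queue
import threading


class SketchController:
    """Illustrative start/stop wrapper around a polling dispatch loop."""

    def __init__(self, source, handle, poll_interval=0.05):
        self._source = source  # a queue.Queue standing in for a subscriber
        self._handle = handle
        self._poll = poll_interval
        self._running = threading.Event()
        self._thread = None

    def start(self):
        self._running.set()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self):
        # Clear the flag, then wait for the loop to notice and exit.
        self._running.clear()
        self._thread.join()

    def is_alive(self):
        return self._thread is not None and self._thread.is_alive()

    def _loop(self):
        while self._running.is_set():
            try:
                item = self._source.get(timeout=self._poll)
            except queue.Empty:
                continue  # nothing arrived; re-check the running flag
            self._handle(item)
            self._source.task_done()  # lets the demo below wait for completion


source = queue.Queue()
handled = []
controller = SketchController(source, handled.append)
controller.start()
source.put("one")
source.put("two")
source.join()  # demo only: block until both items were handled
controller.stop()
```

Without the polling timeout, a blocking receive could keep the thread alive indefinitely after stop() was requested.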

class logbook.queues.TWHThreadController(wrapper_handler)

A very basic thread controller that pulls things in from a queue and sends it to a handler. Both queue and handler are taken from the passed ThreadedWrapperHandler.

start()

Starts the task thread.

stop()

Stops the task thread.