priority_queue

Priority message queue with Redis

PriorityQueue

class pyrediq.priority_queue.PriorityQueue(name, redis_conn=None)

The priority queue implementation using multiple Redis lists.

Parameters:
  • name (str) – The queue name.
  • redis_conn (redis.StrictRedis) – The Redis connection client. If not provided, a new StrictRedis client is created without any arguments.
MAX_PRIORITY = 7

The maximum priority allowed. (This cannot be changed.)

MIN_PRIORITY = -8

The minimum priority allowed. (This cannot be changed.)

consumer(*args, **kwds)

Get a message consumer for the priority queue. This method should be used with a context manager (i.e., the with statement) so that the appropriate consumer cleanup action gets run once the consumer is no longer needed.

Return type:MessageConsumer
Returns:The message consumer of the priority queue.
is_empty()

Test if the priority queue is empty. The message count used for the check includes the messages currently processed by active consumers.

Return type:bool
Returns:True if empty, False if not empty.
name

The name of the priority queue.

purge()

Purge the priority queue.

Raises:RuntimeError when consumers are active.
put(payload, priority=0)

Create a new message in the priority queue.

Parameters:
  • payload (msgpack serializable) – The payload for a new message.
  • priority (int) – The priority for the new message. It must be in the range of [-8, 7], inclusive, and a value outside the range will be capped to the min/max.
size()

The number of messages in the priority queue. The message count includes the messages currently processed by active consumers.

Return type:int
Returns:The message count.

MessageConsumer

class pyrediq.priority_queue.MessageConsumer(queue)

The consumer of messages from PriorityQueue.

An object of this type is obtained via PriorityQueue.consumer().

Parameters:queue (PriorityQueue) – The queue from which the consumer consumes messages.
ack(message)

Ack the message and remove from the priority queue.

Parameters:message (Message) – The message to ack.
cleanup()

Clean up the consumer. This means all the messages that have been consumed but not acked/rejected are requeued back to the priority queue.

get(block=True, timeout=None)

Consume a message from the priority queue.

Parameters:
  • block (bool) – If True and timeout is None (the default), block until a message is available. If timeout is a positive integer, it blocks at most timeout seconds and raises QueueEmpty if no message was available within that time. Otherwise (i.e., block is False), return a message if one is immediately available, else raise QueueEmpty (timeout is ignored in that case).
  • timeout (int or None) – The timeout in seconds. Note that due to a limitation of brpop command of Redis, a fractional timeout cannot be specified, so the shortest timeout is one second.
id

The identifier which uniquely identifies the consumer.

reject(message, requeue=False)

Reject the message. When requeue is True, the message will be requeued back to the priority queue. Otherwise (False, which is the default), it will be removed.

Parameters:
  • message (Message) – The message to reject.
  • requeue (bool) – Whether to requeue the rejected message or not.

QueueEmpty

class pyrediq.priority_queue.QueueEmpty

The exception raised when a priority queue is empty.

Message

class pyrediq.priority_queue.Message(payload=None, priority=0, _id=None)

The unpacked representation of a message exchanged through the priority queue.

Parameters:
  • payload (msgpack serializable) – The message payload which must be serializable by msgpack.
  • priority (int) – The priority of the message.
  • _id – Do not use; this is only for internal use.
serialize()

Serialize the message into its packed represenation.

Return type:Packed
Returns:The packed representation of the message.

Packed

class pyrediq.priority_queue.Packed

The packed representation of a message. The str-casted value is stored in Redis.

deserialize()

Deserialize Packed object to a Message.

Return type:Message
Returns:The unpacked representation of the object.
get_message_id()

Get the message ID from the packed representation.

Return type:str
Returns:The message ID.
get_priority()

Get the message priority from the packed representation.

Return type:int
Returns:The message priority.
classmethod serialize(message)

Serialize a Message object to its Packed representation.

Parameters:message (Message) – The message to be packed.
Return type:Packed
Returns:The packed representation of the message.