priority_queue
Priority message queue with Redis
PriorityQueue
-
class pyrediq.priority_queue.PriorityQueue(name, redis_conn=None)
The priority queue implementation using multiple Redis lists.
Parameters:
- name (str) – The queue name.
- redis_conn (redis.StrictRedis) – The Redis connection client. If not provided, a new StrictRedis client is created without any arguments.
-
MAX_PRIORITY = 7
The maximum priority allowed. (This cannot be changed.)
-
MIN_PRIORITY = -8
The minimum priority allowed. (This cannot be changed.)
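To illustrate what "multiple Redis lists" can mean in practice, here is a minimal in-memory sketch with one FIFO list per priority level, scanned in a fixed order (Redis BRPOP checks its keys in the order they are given). This is not pyrediq's code: `MultiListQueue` and its `pop` method are hypothetical names, and the choice that lower numbers are served first is an assumption made only for the example, since this reference does not state the ordering.

```python
from collections import deque

MIN_PRIORITY = -8
MAX_PRIORITY = 7


class MultiListQueue:
    """In-memory model of a multi-list priority queue (not pyrediq code)."""

    def __init__(self):
        # One FIFO list per priority level: 16 lists for the range [-8, 7].
        self._lists = {
            p: deque() for p in range(MIN_PRIORITY, MAX_PRIORITY + 1)
        }

    def put(self, payload, priority=0):
        self._lists[priority].append(payload)

    def pop(self):
        # Scan the lists in a fixed order and take from the first non-empty one.
        # ASSUMPTION: lower numbers are scanned first; the docs do not say.
        for priority in range(MIN_PRIORITY, MAX_PRIORITY + 1):
            if self._lists[priority]:
                return self._lists[priority].popleft()
        return None

    def size(self):
        return sum(len(items) for items in self._lists.values())


q = MultiListQueue()
q.put("routine", priority=0)
q.put("urgent", priority=-8)
q.put("background", priority=7)
order = [q.pop(), q.pop(), q.pop()]
```

Under the stated ordering assumption, `order` lists the messages by priority level rather than by insertion time, while messages within one level stay first in, first out.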
-
consumer(*args, **kwds)
Get a message consumer for the priority queue. Use this method as a context manager (i.e., in a with statement) so that the consumer's cleanup action runs once the consumer is no longer needed.
Return type: MessageConsumer
Returns: The message consumer of the priority queue.
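The context-manager contract can be sketched with a toy in-memory model: leaving the with block triggers the consumer's cleanup, which puts unacknowledged messages back. `ToyQueue` and `ToyConsumer` are hypothetical stand-ins written for this example, not pyrediq classes, and the use of `contextlib.contextmanager` is an assumption about how such a method might be built.

```python
from collections import deque
from contextlib import contextmanager


class ToyConsumer:
    """Hypothetical stand-in for MessageConsumer (not pyrediq code)."""

    def __init__(self, pending):
        self._pending = pending   # backlog shared with the queue
        self._in_flight = []      # consumed but not yet acked

    def get(self):
        message = self._pending.popleft()
        self._in_flight.append(message)
        return message

    def ack(self, message):
        self._in_flight.remove(message)

    def cleanup(self):
        # Requeue everything that was consumed but never acked,
        # preserving the original order at the front of the backlog.
        while self._in_flight:
            self._pending.appendleft(self._in_flight.pop())


class ToyQueue:
    """Hypothetical stand-in for PriorityQueue (priorities omitted)."""

    def __init__(self, messages):
        self.pending = deque(messages)

    @contextmanager
    def consumer(self):
        consumer = ToyConsumer(self.pending)
        try:
            yield consumer
        finally:
            # Runs even if the body of the with block raises.
            consumer.cleanup()


queue = ToyQueue(["a", "b", "c"])
with queue.consumer() as consumer:
    first = consumer.get()
    consumer.ack(first)
    second = consumer.get()   # never acked inside the block

remaining = list(queue.pending)
```

In this model the unacked message is back in the backlog once the block exits, which is the behavior the with statement is meant to guarantee.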
-
is_empty()
Test if the priority queue is empty. The message count used for the check includes messages currently being processed by active consumers.
Return type: bool
Returns: True if empty, False if not empty.
-
name
The name of the priority queue.
-
purge()
Purge the priority queue.
Raises: RuntimeError when consumers are active.
-
put(payload, priority=0)
Create a new message in the priority queue.
Parameters:
- payload (msgpack serializable) – The payload for a new message.
- priority (int) – The priority for the new message. It must be in the range [-8, 7], inclusive; a value outside the range is capped to the minimum or maximum.
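The capping rule for priority can be written as a one-line clamp. The helper name `clamp_priority` is hypothetical and only illustrates the documented behavior; it is not a function exposed by the library.

```python
MIN_PRIORITY = -8
MAX_PRIORITY = 7


def clamp_priority(priority):
    """Cap a priority to the documented inclusive range [-8, 7]."""
    return max(MIN_PRIORITY, min(MAX_PRIORITY, priority))


in_range = clamp_priority(3)      # unchanged
too_high = clamp_priority(100)    # capped to MAX_PRIORITY
too_low = clamp_priority(-100)    # capped to MIN_PRIORITY
```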
-
size()
The number of messages in the priority queue. The count includes messages currently being processed by active consumers.
Return type: int
Returns: The message count.
MessageConsumer
-
class pyrediq.priority_queue.MessageConsumer(queue)
The consumer of messages from PriorityQueue. An object of this type is obtained via PriorityQueue.consumer().
Parameters: queue (PriorityQueue) – The queue from which the consumer consumes messages.
-
ack(message)
Ack the message and remove it from the priority queue.
Parameters: message (Message) – The message to ack.
-
cleanup()
Clean up the consumer: all messages that have been consumed but not acked or rejected are requeued to the priority queue.
-
get(block=True, timeout=None)
Consume a message from the priority queue.
Parameters:
- block (bool) – If True and timeout is None (the default), block until a message is available. If timeout is a positive integer, block for at most timeout seconds and raise QueueEmpty if no message becomes available within that time. Otherwise (i.e., block is False), return a message if one is immediately available, else raise QueueEmpty (timeout is ignored in that case).
- timeout (int or None) – The timeout in seconds. Because of a limitation of the Redis BRPOP command, a fractional timeout cannot be specified, so the shortest timeout is one second.
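The three modes described for block and timeout can be summarized as a small decision function. `describe_get` is a hypothetical helper written for this illustration; in particular, raising `ValueError` for a fractional or non-positive timeout is an assumption, since this reference does not say how the library reacts to such values.

```python
def describe_get(block=True, timeout=None):
    """Return (mode, timeout) for a get() call, following the rules above."""
    if not block:
        # timeout is ignored when block is False.
        return ("nonblocking", None)
    if timeout is None:
        return ("block_forever", None)
    # ASSUMPTION: invalid timeouts are rejected with ValueError.
    is_int = isinstance(timeout, int) and not isinstance(timeout, bool)
    if not is_int or timeout < 1:
        raise ValueError("timeout must be a positive whole number of seconds")
    return ("block_with_timeout", timeout)


default_mode = describe_get()
polling_mode = describe_get(block=False, timeout=5)
bounded_mode = describe_get(timeout=3)
```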
-
id
The identifier that uniquely identifies the consumer.
-
reject(message, requeue=False)
Reject the message. When requeue is True, the message is requeued to the priority queue. Otherwise (False, the default), it is removed.
Parameters:
- message (Message) – The message to reject.
- requeue (bool) – Whether to requeue the rejected message.
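A typical processing loop built from get, ack, and reject might look like the sketch below. It calls only the documented methods, but runs against `StubConsumer`, a hypothetical in-memory stand-in, and a local `QueueEmpty` class so that it needs no Redis server. The `drain` helper and the plain-string messages are also assumptions made for the example; real messages are Message objects carrying a payload.

```python
class QueueEmpty(Exception):
    """Local stand-in for pyrediq.priority_queue.QueueEmpty."""


class StubConsumer:
    """Hypothetical in-memory consumer exposing get/ack/reject only."""

    def __init__(self, messages):
        self.pending = list(messages)
        self.acked = []
        self.dropped = []

    def get(self, block=True, timeout=None):
        if not self.pending:
            raise QueueEmpty()
        return self.pending.pop(0)

    def ack(self, message):
        self.acked.append(message)

    def reject(self, message, requeue=False):
        if requeue:
            self.pending.append(message)
        else:
            self.dropped.append(message)


def drain(consumer, handler, empty_exc):
    """Process messages until the queue reports that it is empty."""
    handled = 0
    while True:
        try:
            message = consumer.get(block=False)
        except empty_exc:
            return handled
        try:
            handler(message)
        except Exception:
            # requeue defaults to False, so a failing message is removed.
            consumer.reject(message)
        else:
            consumer.ack(message)
            handled += 1


def handler(message):
    if message == "bad":
        raise ValueError("cannot process")


consumer = StubConsumer(["ok-1", "bad", "ok-2"])
handled = drain(consumer, handler, QueueEmpty)
```

Passing requeue=True in the reject call would instead return a failing message to the queue, which in this loop would retry it indefinitely; a retry limit would be needed in that case.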
-
QueueEmpty
-
class pyrediq.priority_queue.QueueEmpty
The exception raised when a priority queue is empty.
Message
-
class pyrediq.priority_queue.Message(payload=None, priority=0, _id=None)
The unpacked representation of a message exchanged through the priority queue.
Parameters:
- payload (msgpack serializable) – The message payload, which must be serializable by msgpack.
- priority (int) – The priority of the message.
- _id – Do not use; this is only for internal use.
Packed
-
class pyrediq.priority_queue.Packed
The packed representation of a message. The str-casted value is stored in Redis.
-
deserialize()
Deserialize the Packed object to a Message.
Return type: Message
Returns: The unpacked representation of the object.
-
get_message_id()
Get the message ID from the packed representation.
Return type: str
Returns: The message ID.
-
get_priority()
Get the message priority from the packed representation.
Return type: int
Returns: The message priority.
-