Consuming messages

AMQP normally applies a policy that the consumer of a message is responsible for acknowledging that it has been successfully processed (or otherwise). Messages that are not acknowledged can be redelivered, both upon request (with Channel.basic_recover()) and to another consumer if the connection is closed before acknowledgements have been received. This ensures that messages are not deleted from the server before they have been processed, but does not guarantee that all messages will be processed in a timely fashion.

The core method to begin consuming messages is basic_consume:

MessageChannel.basic_consume(queue='', no_local=False, no_ack=False, exclusive=False, arguments={}, callback=None)[source]

Begin consuming messages from a queue.

Consumers last as long as the channel they were declared on, or until the client cancels them.

Parameters:
  • queue – Specifies the name of the queue to consume from.
  • no_local – Do not deliver own messages. If this flag is set the server will not send messages to the connection that published them.
  • no_ack – Don’t require acknowledgements. If this flag is set the server does not expect acknowledgements for messages. That is, when a message is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. This functionality may increase performance but at the cost of reliability. Messages can get lost if a client dies before they are delivered to the application.
  • exclusive – Request exclusive consumer access, meaning only this consumer can access the queue.
  • arguments – A set of arguments for the consume. The syntax and semantics of these arguments depends on the server implementation.
  • callback – A callback to be called for each message received.

In nucleon.amqp basic_consume has two modes of operations: callback-based, and queue-based.

To use the callback semantics, pass a callback that takes a single parameter. This callback will be called for each message received:

def on_message(message):
    """Process an incoming message."""
    print message.body
    message.ack()

# NB. can't use the context manager here because leaving the context would
# close the channel and terminate the consumer. So use .allocate_channel()
# which allocates a persistent channel.
channel = conn.allocate_channel()

# Set up the queue/binding
channel.queue_declare(queue=qname)
channel.queue_bind(
    queue=qname,
    exchange=exchange,
    routing_key=routing_key
)

# Start consuming
channel.basic_consume(queue=qname, callback=on_message)

The other alternative is used when no callback is passed. In this case, basic_consume returns an object that implements the Python Queue interface. It is then possible to write synchronous consumer code by repeatedly calling MessageQueue.get():

with conn.channel() as channel:
    # Set up the queue/binding
    channel.queue_declare(queue=qname)
    channel.queue_bind(
        queue=qname,
        exchange=exchange,
        routing_key=routing_key
    )

    # Start consuming
    queue = channel.basic_consume(queue=qname)

    while True:
        # Loop forever, handling messages from the queue
        message = queue.get()
        print message.body
        message.ack()

The advantage of this approach is that exceptions will be raised from the call to queue.get(). The queue object also supports cancelling the consumer.

class nucleon.amqp.channels.MessageQueue(channel, consumer_tag)[source]

A queue that receives messages from a consumer, and also exceptions.

cancel()[source]

Cancel consuming with this consumer.

There may still be messages in the queue, of course, which can still be processed, acknowledged, etc.

get(block=True, timeout=None)[source]

Get a message from the queue.

This method may also raise arbitrary AMQP exceptions, such as ConnectionError when the connection is lost.

Parameters:
  • block – If True, block until a message is available in the queue. If False, raise Empty immediately if there are no messages in the queue.
  • timeout – Time in seconds to wait. If not message is received after this time, Empty is raised.
get_nowait()[source]

Get a message from the queue.

Raises Empty immediately if there are no messages in the queue.

Whether you are using the callback approach or the queue-based approach, but perhaps particularly in the case of the latter, you may want to control how many messages can be delivered to nucleon.amqp before being acknowledged. Note that even when using callbacks messages are still queued in the nucleon.amqp machinery pending dispatch, so it is useful to set a limit:

Channel.basic_qos(prefetch_size=0, prefetch_count=0, global_=False)

This method requests a specific quality of service.

The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.

Parameters:
  • prefetch_size – The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement. This field specifies the prefetch window size in octets. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning “no specific limit”, although other prefetch limits may still apply. The prefetch-size is ignored if the no-ack option is set.
  • prefetch_count – Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored if the no-ack option is set.
  • global – By default the QoS settings apply to the current channel only. If this field is set, they are applied to the entire connection.

Note that this limit is ignored if the consumer was started with the no_ack flag.

Message Object

Methods that contain messages are received from nucleon.amqp as a message object which provides a higher-level interface for interpreting messages, as well as operations that act on an already received message, such as basic_ack() and basic_reject().

class nucleon.amqp.message.Message(channel, frame, headers, body)[source]

A wrapper for a received AMQP message.

This class presents an API that allows messages to be conveniently consumed; typical operations can be performed in the callback by accessing properties and calling methods of the message object.

headers

A dictionary of headers and basic message properties associated with the message.

ack(**kwargs)[source]

Acknowledge the message.

cancel_consume(**kwargs)[source]

Cancel the consumer.

consumer_tag[source]

Retrieve the consumer tag.

If the message has been retrieved with basic_get, it won’t have this.

delivery_tag[source]

Retrieve the delivery tag.

exchange[source]

Retrieve the exchange.

redelivered[source]

Retrieve the redelivery status.

reject(**kwargs)[source]

Reject a message, returning it to the queue.

Note that this doesn’t mean the message won’t be redelivered to this same client. As the spec says:

“The client MUST NOT use this method as a means of selecting messages to process.”
reply(**kwargs)[source]

Publish a new message back to the connection

routing_key[source]

Retrieve the routing key.

Acknowledging or Rejecting Messages

Unless you have set the no_ack flag when consuming the message, it is necessary to acknowledge or reject a message to inform the server that the message has been successfully processed, or couldn’t be processed respectively.

The Message Object has shortcuts to do this, but it is also possible to call these methods on the channel itself. To call these it is necessary to pass the message’s server-generated delivery_tag to the corresponding method:

Channel.basic_ack(delivery_tag=0, multiple=False)

Warning

This is an asynchronous method.

When sent by the client, this method acknowledges one or more messages delivered via the Deliver or Get-Ok methods.

When sent by server, this method acknowledges one or more messages published with the Publish method on a channel in confirm mode. The acknowledgement can be for a single message or a set of messages up to and including a specific message.

Parameters:multiple – If set to 1, the delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates acknowledgement of all outstanding messages.
Channel.basic_reject(delivery_tag=0, requeue=True)

Warning

This is an asynchronous method.

This method allows a client to reject a message.

It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

Parameters:requeue – If requeue is true, the server will attempt to requeue the message. If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered.

Unacknowledged messages should not be allowed to build up on the server as they will consume resources. Closing the channel without acknowledging messages will automatically cause unacknowledged messages to be requeued, but it is also possible to use recover methods to redeliver or requeue unacknowledged messages:

Channel.basic_recover(requeue=False)

This method asks the server to redeliver all unacknowledged messages on a specified channel.

Zero or more messages may be redelivered. This method replaces the asynchronous Recover.

Parameters:requeue – If this field is zero, the message will be redelivered to the original recipient. If this bit is 1, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.
Channel.basic_recover_async(requeue=False)

Warning

This is an asynchronous method.

This method asks the server to redeliver all unacknowledged messages on a specified channel.

Zero or more messages may be redelivered. This method is deprecated in favour of the synchronous Recover/Recover-Ok.

Parameters:requeue – If this field is zero, the message will be redelivered to the original recipient. If this bit is 1, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.

Poll a queue

It may be useful to use poll to get the first message in a queue:

Channel.basic_get(queue='', no_ack=False)

This method provides a direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.

Parameters:queue – Specifies the name of the queue to get a message from.

If there are no messages in a queue, this method returns None, otherwise it returns the first message (which must be acknowledged as usual unless the no_ack flag was set).

Note that this is not the normal way to retrieve messages, for a number of reasons. One is performance - if you are polling then there is unnecessary additional latency between publishing and consuming a message, not to mention a CPU and network cost to making the request even when there are no messages present. Another reason is that some AMQP functionality is built around the concept of active consumers - for example, auto-delete queues. It can also impact the performance of the broker, as it must persist a message rather than simply delivering it straight to an active consumer.

RabbitMQ Extensions

basic.nack is a RabbitMQ extensions for rejecting multiple messages at the same time:

Channel.basic_nack(delivery_tag=0, multiple=False, requeue=True)

Warning

This is an asynchronous method.

This method allows a client to reject one or more incoming messages.

It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. This method is also used by the server to inform publishers on channels in confirm mode of unhandled messages. If a publisher receives this method, it probably needs to republish the offending messages.

Parameters:
  • multiple – If set to 1, the delivery tag is treated as “up to and including”, so that multiple messages can be rejected with a single method. If set to zero, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates rejection of all outstanding messages.
  • requeue – If requeue is true, the server will attempt to requeue the message. If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered. Clients receiving the Nack methods should ignore this flag.

Table Of Contents

Previous topic

Publishing Messages

Next topic

Transactions

This Page