Source code for nucleon.amqp.message
class Message(object):
    """A wrapper for a received AMQP message.

    This class presents an API that allows messages to be conveniently
    consumed; typical operations can be performed in the callback by accessing
    properties and calling methods of the message object.

    Delivery metadata (exchange, routing key, tags, ...) is read lazily from
    the wrapped frame, while `headers` and `body` are stored directly on the
    instance.
    """

    def __init__(self, channel, frame, headers, body):
        """Construct a message.

        `channel` is the channel object the message arrived on; it is used
        for every follow-up operation (`basic_ack`, `basic_publish`,
        `basic_reject` and `basic_cancel`).

        `frame` is the delivery frame received; its attributes back the
        read-only properties of this class and the subscript access.

        `headers` and `body` are the message headers and payload, exposed
        unchanged as the `headers` and `body` attributes.
        """
        self.channel = channel
        self._frame = frame
        self.headers = headers
        self.body = body

    def __getitem__(self, key):
        """Allow attributes of the frame to be read with subscript.

        This is for backwards compatibility.

        The lookup is a plain attribute access on the wrapped frame, so a
        missing key raises AttributeError rather than KeyError.
        """
        return getattr(self._frame, key)

    @property
    def exchange(self):
        """Retrieve the exchange."""
        return self._frame.exchange

    @property
    def routing_key(self):
        """Retrieve the routing key."""
        return self._frame.routing_key

    @property
    def redelivered(self):
        """Retrieve the redelivery status."""
        return self._frame.redelivered

    @property
    def delivery_tag(self):
        """Retrieve the delivery tag."""
        return self._frame.delivery_tag

    @property
    def consumer_tag(self):
        """Retrieve the consumer tag.

        If the message has been retrieved with basic_get, it won't have this.
        The value is read straight from the frame, so in that case the
        access presumably fails with AttributeError -- confirm against the
        frame type produced for basic_get.
        """
        return self._frame.consumer_tag

    def ack(self, **kwargs):
        """Acknowledge the message.

        Keyword arguments are forwarded unchanged to `channel.basic_ack`
        after the delivery tag of this message.
        """
        self.channel.basic_ack(self.delivery_tag, **kwargs)

    def reply(self, **kwargs):
        """Publish a new message back to the connection.

        By default the new message is published with the `exchange` and
        `routing_key` of this message. Any keyword argument is passed on to
        `channel.basic_publish`, and a caller-supplied `exchange` or
        `routing_key` takes precedence over those defaults.
        """
        params = {
            'exchange': self.exchange,
            'routing_key': self.routing_key
        }
        # Applied after the defaults so that the caller's values win.
        params.update(kwargs)
        self.channel.basic_publish(**params)

    def reject(self, **kwargs):
        """Reject a message, returning it to the queue.

        Keyword arguments are forwarded unchanged to `channel.basic_reject`
        after the delivery tag of this message.

        Note that this doesn't mean the message won't be redelivered to this
        same client. As the spec says:

            "The client MUST NOT use this method as a means of selecting
            messages to process."
        """
        self.channel.basic_reject(self.delivery_tag, **kwargs)

    def cancel_consume(self, **kwargs):
        """Cancel the consumer.

        The consumer to cancel is identified by the consumer tag of this
        message; keyword arguments are forwarded unchanged to
        `channel.basic_cancel`.
        """
        self.channel.basic_cancel(self.consumer_tag, **kwargs)