Sending/Receiving Messages.
Message consumer.
Parameters:
The connection to the broker. A carrot.connection.BrokerConnection instance.
Name of the queue.
Name of the exchange the queue binds to.
The routing key (if any). The interpretation of the routing key depends on the value of the exchange_type attribute:
direct exchange
Matches if the routing key property of the message and the routing_key attribute are identical.
fanout exchange
Always matches, even if the binding does not have a key.
topic exchange
Matches the routing key property of the message by a primitive pattern matching scheme. The message routing key then consists of words separated by dots (".", like domain names), and two special characters are available: star ("*") and hash ("#"). The star matches any single word, and the hash matches zero or more words. For example "*.stock.#" matches the routing keys "usd.stock" and "eur.stock.db" but not "stock.nasdaq".
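The topic matching rule can be sketched in a few lines of plain Python. This is only an illustration of the rule as described above, not the broker's implementation; the function name topic_matches is hypothetical.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key.

    "*" stands for exactly one word, "#" for zero or more words.
    Hypothetical helper, written only to illustrate the matching rule.
    """
    pattern_words = pattern.split(".") if pattern else []
    key_words = routing_key.split(".") if routing_key else []

    def match(p, k):
        # All pattern words used up: succeed only if the key is used up too.
        if p == len(pattern_words):
            return k == len(key_words)
        word = pattern_words[p]
        if word == "#":
            # Try letting "#" absorb 0, 1, 2, ... of the remaining key words.
            return any(match(p + 1, j) for j in range(k, len(key_words) + 1))
        if k == len(key_words):
            return False
        if word == "*" or word == key_words[k]:
            return match(p + 1, k + 1)
        return False

    return match(0, 0)
```

With this sketch, topic_matches("*.stock.#", "usd.stock") and topic_matches("*.stock.#", "eur.stock.db") are both true, while topic_matches("*.stock.#", "stock.nasdaq") is false.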
Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged when a server restarts. Default is True.
If set, the exchange is deleted when all queues have finished using it. Default is False.
Exclusive queues may only be consumed from by the current connection. When exclusive is on, this also implies auto_delete. Default is False.
AMQP defines four default exchange types (routing algorithms) that cover most of the common messaging use cases. An AMQP broker can also define additional exchange types, so see your message broker's manual for more information about available exchange types.
Direct
Direct match between the routing key in the message, and the routing criteria used when a queue is bound to this exchange.
Topic
Wildcard match between the routing key and the routing pattern specified in the binding. The routing key is treated as zero or more words delimited by "." and supports special wildcard characters. "*" matches a single word and "#" matches zero or more words.
Fanout
Queues are bound to this exchange with no arguments. Hence any message sent to this exchange will be forwarded to all queues bound to this exchange.
Headers
Queues are bound to this exchange with a table of arguments containing headers and values (optional). A special argument named “x-match” determines the matching algorithm, where "all" implies an AND (all pairs must match) and "any" implies OR (at least one pair must match).
The routing_key attribute is used to specify the arguments, the same as when sending messages.
This description of AMQP exchange types was shamelessly stolen from the blog post AMQP in 10 minutes: Part 4 by Rajith Attapattu. Recommended reading.
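The "x-match" rule for headers exchanges can be illustrated with a small pure-Python sketch. It is not broker code: the function name headers_match is hypothetical, and treating a missing "x-match" argument as "all" is an assumption made here.

```python
def headers_match(binding_args, message_headers):
    """Return True if the message headers satisfy the binding arguments.

    Hypothetical helper illustrating "x-match"; a missing "x-match"
    is treated as "all" (an assumption of this sketch).
    """
    criteria = dict(binding_args)
    mode = criteria.pop("x-match", "all")
    if mode not in ("all", "any"):
        raise ValueError("x-match must be 'all' or 'any', got %r" % (mode,))

    # One boolean per header/value pair listed in the binding.
    results = [
        name in message_headers and message_headers[name] == value
        for name, value in criteria.items()
    ]
    # "all" is an AND over the pairs, "any" is an OR.
    return all(results) if mode == "all" else any(results)
```

For a binding of {"x-match": "any", "format": "pdf", "type": "report"}, a message carrying only a matching "format" header is accepted; with "x-match" set to "all" the same message is rejected.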
List of registered callbacks to trigger when a message is received by wait(), process_next() or iterqueue().
Emit a warning if the queue has already been declared. If a queue already exists, and you try to redeclare the queue with new settings, the new settings will be silently ignored, so this can be useful if you’ve recently changed the routing_key attribute or other settings.
Acknowledgement is handled automatically once messages are received. This means that the carrot.backends.base.BaseMessage.ack() and carrot.backends.base.BaseMessage.reject() methods on the message object are no longer valid. By default auto_ack is set to False, and the receiver is required to manually handle acknowledgment.
Disable acknowledgement on the server-side. This is different from auto_ack in that acknowledgement is turned off altogether. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.
Raises amqplib.client_0_8.channel.AMQPChannelException: if the queue is exclusive and the queue already exists and is owned by another connection.
Example Usage
>>> consumer = Consumer(connection=DjangoBrokerConnection(),
...                     queue="foo", exchange="foo", routing_key="foo")
>>> def process_message(message_data, message):
...     print("Got message %s: %s" % (
...         message.delivery_tag, message_data))
>>> consumer.register_callback(process_message)
>>> consumer.wait() # Go into receive loop
Cancel a running iterconsume() session.
Close the channel to the queue.
Declare consumer.
Declares the queue, the exchange and binds the queue to the exchange.
Discard all waiting messages.
Returns: the number of messages discarded.
WARNING: All incoming messages will be ignored and not processed.
Example using filter:
>>> def waiting_feeds_only(message):
...     try:
...         message_data = message.decode()
...     except Exception: # Should probably be more specific.
...         return False # Treat undecodable messages as non-matching.
...
...     return message_data.get("type") == "feed"
Receive the next message waiting on the queue.
Returns: a carrot.backends.base.BaseMessage instance, or None if there are no messages to be received.
This method asks the peer to pause or restart the flow of content data.
This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. The peer that receives a request to stop sending content should finish sending the current content, if any, and then wait until it receives the flow(active=True) restart method.
Iterator processing new messages as they arrive. Every new message will be passed to the callbacks, and the iterator returns True. The iterator is infinite unless the limit argument is specified or someone closes the consumer.
iterconsume() uses transient requests for messages on the server, while iterqueue() uses synchronous access. In most cases you want iterconsume(), but if your environment does not support this behaviour you can resort to using iterqueue() instead.
Also, iterconsume() does not return the message at each step, something which iterqueue() does.
Raises StopIteration: if limit is set and the message limit has been reached.
Infinite iterator yielding pending messages, by using synchronous direct access to the queue (basic_get).
iterqueue() is used where synchronous functionality is more important than performance. If you can, use iterconsume() instead.
Raises StopIteration: if there are no messages waiting, and the iterator is not infinite.
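The difference between the two iterators can be sketched as two small driver functions. Both function names are hypothetical. The sketch assumes that iterconsume() accepts the limit argument mentioned above, that iterqueue() called without arguments stops once no messages are waiting, and that messages expose decode() and ack().

```python
def consume_with_callbacks(consumer, limit):
    """Drive iterconsume(): the registered callbacks see each message."""
    handled = 0
    for _ in consumer.iterconsume(limit=limit):
        # Each step only reports that a message was passed to the
        # callbacks; the message itself is not handed back here.
        handled += 1
    return handled


def drain_payloads(consumer):
    """Drive iterqueue(): each step hands back the message itself."""
    payloads = []
    for message in consumer.iterqueue():
        payloads.append(message.decode())
        message.ack()
    return payloads
```

With iterconsume() the work happens inside the callbacks, so the loop body has little to do. With iterqueue() the loop body receives each message and is responsible for handling and acknowledging it.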
DEPRECATED Use fetch() like this instead:
>>> message = self.fetch(enable_callbacks=True)
Request specific Quality of Service.
This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.
This method is called when a new message is received by running wait(), process_next() or iterqueue().
When a message is received, it passes the message on to the callbacks listed in the callbacks attribute. You can register callbacks using register_callback().
Raises NotImplementedError: if no callbacks have been registered.
Register a callback function to be triggered by receive().
The callback function must take two arguments:
message_data
The deserialized message data
message
The carrot.backends.base.BaseMessage instance.
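How register_callback() and receive() fit together can be shown with a simplified stand-alone model. This is not carrot's source; the class name CallbackRegistry is hypothetical and only mirrors the behaviour described above.

```python
class CallbackRegistry(object):
    """Simplified model of callback registration and dispatch."""

    def __init__(self):
        self.callbacks = []

    def register_callback(self, callback):
        """Remember a function taking (message_data, message)."""
        self.callbacks.append(callback)

    def receive(self, message_data, message):
        """Pass one received message to every registered callback."""
        if not self.callbacks:
            raise NotImplementedError("No consumer callbacks registered")
        for callback in self.callbacks:
            callback(message_data, message)
```

Callbacks run in registration order, and each one gets both the deserialized data and the message object, so any of them can inspect delivery details or acknowledge the message.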
Go into consume mode.
Mostly for testing purposes and simple programs. You probably want iterconsume() or iterqueue() instead.
This runs an infinite loop, processing all incoming messages using receive() to apply the message to all registered callbacks.
Receive messages from multiple consumers.
Parameters:
The connection to the broker. A carrot.connection.BrokerConnection instance.
A list of callbacks to be called when a message is received. See Consumer.register_callback.
Add consumers from a dictionary configuration:
{
    "webshot": {
        "exchange": "link_exchange",
        "exchange_type": "topic",
        "binding_key": "links.webshot",
        "default_routing_key": "links.webshot",
    },
    "retrieve": {
        "exchange": "link_exchange",
        "exchange_type": "topic",
        "binding_key": "links.*",
        "default_routing_key": "links.retrieve",
        "auto_delete": True,
        # ...
    },
}
Default value for the Consumer.auto_ack attribute.
Add another consumer from dictionary configuration.
Cancel a running iterconsume() session.
Close all consumers.
Declare consumers.
Discard all messages. Does not support filtering. See Consumer.discard_all().
This method asks the peer to pause or restart the flow of content data.
See Consumer.flow().
Cycle between all consumers in consume mode.
Request specific Quality of Service.
See Consumer.qos().
What to do when a message is received. See Consumer.receive().
Register a new callback to be called when a message is received. See Consumer.register_callback().
A combined message publisher and consumer.
Close any open channels.
See Consumer.fetch().
See Publisher.send().
Message publisher.
Parameters:
The connection to the broker. A carrot.connection.BrokerConnection instance.
Name of the exchange we send messages to.
The default routing key for messages sent using this publisher. See Consumer.routing_key for more information. You can override the routing key by passing an explicit routing_key argument to send().
The default delivery mode used for messages. The value is an integer. The following delivery modes are supported by (at least) RabbitMQ:
1 or “transient”
The message is transient, which means it is stored in memory only and is lost if the server dies or restarts.
2 or “persistent”
The message is persistent, which means it is stored both in memory and on disk, and is therefore preserved if the server dies or restarts.
The default value is 2 (persistent).
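Accepting either the integer or its name can be sketched as a small lookup. The names DELIVERY_MODES, DEFAULT_DELIVERY_MODE and resolve_delivery_mode are illustrative and not necessarily how the library spells them.

```python
# Names and integer values as listed above.
DELIVERY_MODES = {"transient": 1, "persistent": 2}
DEFAULT_DELIVERY_MODE = "persistent"


def resolve_delivery_mode(mode=None):
    """Return the integer delivery mode for a name, an integer, or None.

    Hypothetical helper: None falls back to the persistent default.
    """
    if mode is None:
        mode = DEFAULT_DELIVERY_MODE
    if mode in DELIVERY_MODES:
        return DELIVERY_MODES[mode]
    if mode in DELIVERY_MODES.values():
        return mode
    raise ValueError("Unknown delivery mode: %r" % (mode,))
```

Under these assumptions resolve_delivery_mode() gives 2, resolve_delivery_mode("transient") gives 1, and an unknown value raises ValueError instead of being sent to the broker.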
See Consumer.durable.
See Consumer.auto_delete.
If this is True and the exchange name is set, the exchange will be automatically declared at instantiation. You can manually declare the exchange by using the declare() method.
Auto declare is on by default.
A string identifying the default serialization method to use. Defaults to json. Can be json, raw, pickle, hessian, yaml, or any custom serialization method that has been registered with carrot.serialization.registry.
Close connection to queue.
Given any data, serialize it and encapsulate it in an AMQP message with the proper headers set.
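What "serialize and set the proper headers" amounts to for the json serializer can be sketched with the standard library alone. This is not carrot's code: encode_json and decode_json are hypothetical helpers, and describing the payload with a content type and content encoding is an assumption of this sketch.

```python
import json


def encode_json(message_data):
    """Serialize data and describe the resulting payload.

    Returns (content_type, content_encoding, body), the values a
    publisher would attach to the outgoing message.
    """
    body = json.dumps(message_data).encode("utf-8")
    return "application/json", "utf-8", body


def decode_json(body, content_encoding="utf-8"):
    """Reverse encode_json() on the consumer side."""
    return json.loads(body.decode(content_encoding))
```

The consumer side uses the same pieces of information in reverse: the content type selects the decoder and the encoding tells it how to read the bytes.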
Declare the exchange.
Creates the exchange on the broker.
Send a message.
Parameters: |
|
---|