carrot.messaging

Sending/Receiving Messages.

class carrot.messaging.Consumer(connection, queue=None, exchange=None, routing_key=None, **kwargs)

Message consumer.

Parameters:
connection

The connection to the broker. A carrot.connection.BrokerConnection instance.

queue

Name of the queue.

exchange

Name of the exchange the queue binds to.

routing_key

The routing key (if any). The interpretation of the routing key depends on the value of the exchange_type attribute:

  • direct exchange

    Matches if the routing key property of the message and the routing_key attribute are identical.

  • fanout exchange

    Always matches, even if the binding does not have a key.

  • topic exchange

    Matches the routing key property of the message by a primitive pattern matching scheme. The message routing key then consists of words separated by dots (".", like domain names), and two special characters are available; star ("*") and hash ("#"). The star matches any word, and the hash matches zero or more words. For example "*.stock.#" matches the routing keys "usd.stock" and "eur.stock.db" but not "stock.nasdaq".
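
The topic matching rule above can be sketched in plain Python. This is only an illustration of the rule as described here, not carrot's or any broker's implementation (the matching is performed by the broker); the function name topic_matches is hypothetical.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches the topic binding pattern.

    Illustrative sketch only: "*" matches exactly one word and
    "#" matches zero or more words, with words separated by dots.
    """
    pattern_words = pattern.split(".")
    key_words = routing_key.split(".") if routing_key else []

    def match(p, k):
        # All pattern words used: succeed only if the key is used up too.
        if p == len(pattern_words):
            return k == len(key_words)
        word = pattern_words[p]
        if word == "#":
            # Let the hash absorb zero, one, two, ... of the remaining words.
            return any(match(p + 1, j) for j in range(k, len(key_words) + 1))
        if k == len(key_words):
            return False
        if word == "*" or word == key_words[k]:
            return match(p + 1, k + 1)
        return False

    return match(0, 0)


print(topic_matches("*.stock.#", "usd.stock"))      # True
print(topic_matches("*.stock.#", "eur.stock.db"))   # True
print(topic_matches("*.stock.#", "stock.nasdaq"))   # False
```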

durable

Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged when a server restarts. Default is True.

auto_delete

If set, the exchange is deleted when all queues have finished using it. Default is False.

exclusive

Exclusive queues may only be consumed from by the current connection. When exclusive is on, this also implies auto_delete. Default is False.

exchange_type

AMQP defines four default exchange types (routing algorithms) that cover most of the common messaging use cases. An AMQP broker can also define additional exchange types, so see your message broker's manual for more information about available exchange types.

  • Direct

    Direct match between the routing key in the message, and the routing criteria used when a queue is bound to this exchange.

  • Topic

    Wildcard match between the routing key and the routing pattern specified in the binding. The routing key is treated as zero or more words delimited by "." and supports special wildcard characters. "*" matches a single word and "#" matches zero or more words.

  • Fanout

    Queues are bound to this exchange with no arguments. Hence any message sent to this exchange will be forwarded to all queues bound to this exchange.

  • Headers

    Queues are bound to this exchange with a table of arguments containing headers and values (optional). A special argument named “x-match” determines the matching algorithm, where "all" implies an AND (all pairs must match) and "any" implies OR (at least one pair must match).

    The routing_key is used to specify the arguments, the same as when sending messages.

This description of AMQP exchange types was shamelessly stolen from the blog post AMQP in 10 minutes: Part 4 by Rajith Attapattu. Recommended reading.
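
The "x-match" rule for headers exchanges can be sketched as follows. This is a plain Python illustration of the all/any logic described above, not broker or carrot code. The function name headers_match is hypothetical, and treating a missing "x-match" argument as "all" is an assumption made for the sketch.

```python
_MISSING = object()  # sentinel so that a header set to None can still match


def headers_match(binding_arguments, message_headers):
    """Return True if the message headers satisfy the binding arguments."""
    # Assumption for this sketch: a missing "x-match" behaves like "all".
    mode = binding_arguments.get("x-match", "all")
    expected = dict(
        (name, value)
        for name, value in binding_arguments.items()
        if name != "x-match"
    )
    results = [
        message_headers.get(name, _MISSING) == value
        for name, value in expected.items()
    ]
    if mode == "any":
        return any(results)   # OR: at least one pair must match
    return all(results)       # AND: all pairs must match


headers = {"format": "pdf", "type": "log"}
print(headers_match({"x-match": "all", "format": "pdf", "type": "report"}, headers))  # False
print(headers_match({"x-match": "any", "format": "pdf", "type": "report"}, headers))  # True
```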

callbacks

List of registered callbacks to trigger when a message is received by wait(), process_next() or iterqueue().

warn_if_exists

Emit a warning if the queue has already been declared. If a queue already exists, and you try to redeclare the queue with new settings, the new settings will be silently ignored, so this can be useful if you’ve recently changed the routing_key attribute or other settings.

auto_ack

Acknowledgement is handled automatically once messages are received. This means that the carrot.backends.base.BaseMessage.ack() and carrot.backends.base.BaseMessage.reject() methods on the message object are no longer valid. By default auto_ack is set to False, and the receiver is required to manually handle acknowledgment.

no_ack

Disable acknowledgement on the server-side. This is different from auto_ack in that acknowledgement is turned off altogether. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.

Raises amqplib.client_0_8.channel.AMQPChannelException:
 if the queue is exclusive and the queue already exists and is owned by another connection.

Example Usage

>>> consumer = Consumer(connection=DjangoBrokerConnection(),
...               queue="foo", exchange="foo", routing_key="foo")
>>> def process_message(message_data, message):
...     print("Got message %s: %s" % (
...             message.delivery_tag, message_data))
>>> consumer.register_callback(process_message)
>>> consumer.wait() # Go into receive loop
cancel()

Cancel a running iterconsume() session.

close()

Close the channel to the queue.

consume(no_ack=None)

Declare consumer.

declare()

Declares the queue, the exchange and binds the queue to the exchange.

discard_all(filterfunc=None)

Discard all waiting messages.

Parameters:
  • filterfunc – A filter function to only discard the messages this filter returns.
Returns:

the number of messages discarded.

WARNING: All incoming messages will be ignored and not processed.

Example using filter:

>>> def waiting_feeds_only(message):
...     try:
...         message_data = message.decode()
...     except Exception: # Should probably be more specific.
...         return False  # Do not select messages that cannot be decoded.
...
...     return message_data.get("type") == "feed"
>>> discarded = consumer.discard_all(filterfunc=waiting_feeds_only)
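
A filter like this can be exercised without a broker by passing it stand-in objects. StubMessage below is a hypothetical test double that only provides decode(); it is not part of carrot, and the filter is repeated here so the sketch runs on its own.

```python
class StubMessage(object):
    """Hypothetical stand-in for a message; only decode() is provided."""

    def __init__(self, payload, broken=False):
        self.payload = payload
        self.broken = broken

    def decode(self):
        if self.broken:
            raise ValueError("cannot decode payload")
        return self.payload


def waiting_feeds_only(message):
    """Select only messages whose decoded body has type "feed"."""
    try:
        message_data = message.decode()
    except ValueError:
        return False  # undecodable messages are not selected
    return message_data.get("type") == "feed"


messages = [
    StubMessage({"type": "feed"}),
    StubMessage({"type": "page"}),
    StubMessage(None, broken=True),
]
selected = [waiting_feeds_only(m) for m in messages]
print(selected)  # [True, False, False]
```
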
fetch(no_ack=None, auto_ack=None, enable_callbacks=False)

Receive the next message waiting on the queue.

Returns:

A carrot.backends.base.BaseMessage instance, or None if there are no messages to be received.

Parameters:
  • enable_callbacks – Enable callbacks. The message will be processed with all registered callbacks. Default is disabled.
  • auto_ack – Override the default auto_ack setting.
  • no_ack – Override the default no_ack setting.
flow(active)

This method asks the peer to pause or restart the flow of content data.

This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. The peer that receives a request to stop sending content should finish sending the current content, if any, and then wait until it receives the flow(active=True) restart method.

iterconsume(limit=None, no_ack=None)

Iterator processing new messages as they arrive. Every new message will be passed to the callbacks, and the iterator returns True. The iterator is infinite unless the limit argument is specified or someone closes the consumer.

iterconsume() uses transient requests for messages on the server, while iterqueue() uses synchronous access. In most cases you want iterconsume(), but if your environment does not support this behaviour you can resort to using iterqueue() instead.

Also, iterconsume() does not return the message at each step, something which iterqueue() does.

Parameters:
  • limit – Maximum number of messages to process.
Raises StopIteration:
 if limit is set and the message limit has been reached.

iterqueue(limit=None, infinite=False)

Iterator yielding pending messages, using synchronous direct access to the queue (basic_get). The iterator is only infinite if the infinite argument is set.

iterqueue() is used where synchronous functionality is more important than performance. If you can, use iterconsume() instead.

Parameters:
  • limit – If set, the iterator stops when it has processed this number of messages in total.
  • infinite – Don’t raise StopIteration if there are no messages waiting, but return None instead. If infinite is set, you shouldn’t consume the whole iterator at once without using a limit.
Raises StopIteration:
 if there are no messages waiting, and the iterator is not infinite.
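
The interaction between limit and infinite can be modelled with a small generator over an in-memory queue. This is a sketch of the documented behaviour only, not carrot's implementation: iterqueue_model is a hypothetical name, a deque stands in for the broker queue, and counting the None placeholders towards limit is an assumption.

```python
from collections import deque
from itertools import count


def iterqueue_model(pending, limit=None, infinite=False):
    """Yield items from pending the way iterqueue() is documented to."""
    for yielded in count():
        if limit and yielded >= limit:
            return  # the limit has been reached
        # A deque stands in for a synchronous fetch from the queue.
        item = pending.popleft() if pending else None
        if item is None and not infinite:
            return  # nothing waiting and the iterator is not infinite
        yield item  # in infinite mode this is None when nothing is waiting


print(list(iterqueue_model(deque(["a", "b", "c"]))))                          # ['a', 'b', 'c']
print(list(iterqueue_model(deque(["a", "b", "c"]), limit=2)))                 # ['a', 'b']
print(list(iterqueue_model(deque(["a", "b", "c"]), limit=5, infinite=True)))  # ['a', 'b', 'c', None, None]
```

The last call shows why an infinite iterator should always be paired with a limit when it is consumed in one go.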

process_next()

DEPRECATED Use fetch() like this instead:

>>> message = self.fetch(enable_callbacks=True)
qos(prefetch_size=0, prefetch_count=0, apply_global=False)

Request specific Quality of Service.

This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.

Parameters:
  • prefetch_size – Prefetch window in octets. The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement. This field specifies the prefetch window size in octets. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning “no specific limit”, although other prefetch limits may still apply. The prefetch_size is ignored if the no_ack option is set.
  • prefetch_count – Specifies a prefetch window in terms of whole messages. This field may be used in combination with prefetch_size; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch_count is ignored if the no_ack option is set.
  • apply_global – By default the QoS settings apply to the current channel only. If this is set, they are applied to the entire connection.
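
How the two prefetch windows combine can be sketched as a simple check. This models the rules as described above and is not broker code: may_send_in_advance and its bookkeeping arguments (unacked_bytes, unacked_count) are hypothetical names introduced for the illustration.

```python
def may_send_in_advance(message_size, unacked_bytes, unacked_count,
                        prefetch_size=0, prefetch_count=0, no_ack=False):
    """Return True if both prefetch windows allow sending one more message.

    unacked_bytes and unacked_count describe messages already sent to the
    client but not yet acknowledged (hypothetical bookkeeping values).
    """
    if no_ack:
        return True  # both windows are ignored when no_ack is set
    # Zero means "no specific limit" for either window.
    if prefetch_size and unacked_bytes + message_size > prefetch_size:
        return False
    if prefetch_count and unacked_count + 1 > prefetch_count:
        return False
    return True


print(may_send_in_advance(100, 0, 0))                                    # True
print(may_send_in_advance(100, 950, 1, prefetch_size=1000))              # False
print(may_send_in_advance(100, 0, 10, prefetch_count=10))                # False
print(may_send_in_advance(100, 0, 10, prefetch_count=10, no_ack=True))   # True
```
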
receive(message_data, message)

This method is called when a new message is received by running wait(), process_next() or iterqueue().

When a message is received, it passes the message on to the callbacks listed in the callbacks attribute. You can register callbacks using register_callback().

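Parameters:
  • message_data – The deserialized message body.
  • message – The carrot.backends.base.BaseMessage instance.
Raises NotImplementedError:
 if no callbacks have been registered.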
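
The dispatch behaviour described for receive() can be sketched with a small stand-alone class. CallbackDispatcher is a hypothetical name used for illustration; it mirrors the documented behaviour and is not carrot's source.

```python
class CallbackDispatcher(object):
    """Minimal model of callback registration and message dispatch."""

    def __init__(self):
        self.callbacks = []

    def register_callback(self, callback):
        self.callbacks.append(callback)

    def receive(self, message_data, message):
        # Without callbacks there is nothing that could handle the message.
        if not self.callbacks:
            raise NotImplementedError("No callbacks registered")
        for callback in self.callbacks:
            callback(message_data, message)


seen = []
dispatcher = CallbackDispatcher()
dispatcher.register_callback(lambda data, message: seen.append(data))
dispatcher.receive({"type": "feed"}, message=None)  # None stands in for a message object
print(seen)  # [{'type': 'feed'}]
```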

register_callback(callback)

Register a callback function to be triggered by receive().

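The callback function must take two arguments: message_data (the decoded message body) and message (the carrot.backends.base.BaseMessage instance).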

wait(limit=None)

Go into consume mode.

Mostly for testing purposes and simple programs; you probably want iterconsume() or iterqueue() instead.

This runs an infinite loop, processing all incoming messages using receive() to apply the message to all registered callbacks.

class carrot.messaging.ConsumerSet(connection, from_dict=None, consumers=None, callbacks=None, **options)

Receive messages from multiple consumers.

Parameters:
connection

The connection to the broker. A carrot.connection.BrokerConnection instance.

callbacks

A list of callbacks to be called when a message is received. See Consumer.register_callback.

from_dict

Add consumers from a dictionary configuration:

{
    "webshot": {
        "exchange": "link_exchange",
        "exchange_type": "topic",
        "binding_key": "links.webshot",
        "default_routing_key": "links.webshot",
    },
    "retrieve": {
        "exchange": "link_exchange",
        "exchange_type": "topic",
        "binding_key": "links.*",
        "default_routing_key": "links.retrieve",
        "auto_delete": True,
        # ...
    },
}

consumers

Add consumers from a list of Consumer instances.

auto_ack

Default value for the Consumer.auto_ack attribute.

add_consumer(consumer)

Add another consumer from a Consumer instance.

add_consumer_from_dict(queue, **options)

Add another consumer from dictionary configuration.

cancel()

Cancel a running iterconsume() session.

cancel_by_queue(queue)

Cancel the consumer for the given queue.

close()

Close all consumers.

consume()

Declare consumers.

discard_all()

Discard all messages. Does not support filtering. See Consumer.discard_all().

flow(active)

This method asks the peer to pause or restart the flow of content data.

See Consumer.flow().

iterconsume(limit=None)

Cycle between all consumers in consume mode.

See Consumer.iterconsume().

qos(prefetch_size=0, prefetch_count=0, apply_global=False)

Request specific Quality of Service.

See Consumer.qos().

receive(message_data, message)

What to do when a message is received. See Consumer.receive().

register_callback(callback)

Register a new callback to be called when a message is received. See Consumer.register_callback().

class carrot.messaging.Messaging(connection, **kwargs)

A combined message publisher and consumer.

close()

Close any open channels.

consumer_cls

alias of Consumer

fetch(**kwargs)

See Consumer.fetch()

publisher_cls

alias of Publisher

receive(message_data, message)

See Consumer.receive()

register_callback(callback)

See Consumer.register_callback()

send(message_data, delivery_mode=None)

See Publisher.send()

class carrot.messaging.Publisher(connection, exchange=None, routing_key=None, **kwargs)

Message publisher.

Parameters:
connection

The connection to the broker. A carrot.connection.BrokerConnection instance.

exchange

Name of the exchange we send messages to.

routing_key

The default routing key for messages sent using this publisher. See Consumer.routing_key for more information. You can override the routing key by passing an explicit routing_key argument to send().

delivery_mode

The default delivery mode used for messages. The value is an integer. The following delivery modes are supported by (at least) RabbitMQ:

  • 1 or “transient”

    The message is transient, which means it is stored in memory only, and is lost if the server dies or restarts.

  • 2 or “persistent”

    The message is persistent, which means it is stored both in memory and on disk, and is therefore preserved if the server dies or restarts.

The default value is 2 (persistent).
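
The mapping between the named and numeric delivery modes can be sketched as a small lookup. The names DELIVERY_MODES, DEFAULT_DELIVERY_MODE and resolve_delivery_mode are hypothetical and chosen for this illustration; they are not claimed to be carrot identifiers.

```python
# Hypothetical lookup table for the two delivery modes listed above.
DELIVERY_MODES = {"transient": 1, "persistent": 2}
DEFAULT_DELIVERY_MODE = 2  # persistent


def resolve_delivery_mode(delivery_mode=None):
    """Turn a name or number into the numeric delivery mode."""
    if delivery_mode is None:
        return DEFAULT_DELIVERY_MODE
    mode = DELIVERY_MODES.get(delivery_mode, delivery_mode)
    if mode not in (1, 2):
        raise ValueError("Unknown delivery mode: %r" % (delivery_mode,))
    return mode


print(resolve_delivery_mode())             # 2
print(resolve_delivery_mode("transient"))  # 1
print(resolve_delivery_mode(2))            # 2
```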

exchange_type

See Consumer.exchange_type.

durable

See Consumer.durable.

auto_delete

See Consumer.auto_delete.

auto_declare

If this is True and the exchange name is set, the exchange will be automatically declared at instantiation. You can manually declare the exchange by using the declare() method.

Auto declare is on by default.

serializer

A string identifying the default serialization method to use. Can be json (the default), raw, pickle, hessian, yaml, or any custom serialization method that has been registered with carrot.serialization.registry.

close()

Close connection to queue.

create_message(message_data, delivery_mode=None, priority=None, content_type=None, content_encoding=None, serializer=None)

Serialize the given data and encapsulate it in an AMQP message with the proper headers set.

declare()

Declare the exchange.

Creates the exchange on the broker.

send(message_data, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, exchange=None)

Send a message.

Parameters:
  • message_data – The message data to send. Can be a list, dictionary or a string.
  • routing_key – A custom routing key for the message. If not set, the default routing key set in the routing_key attribute is used.
  • mandatory – If set, the message has mandatory routing. By default the message is silently dropped by the server if it can’t be routed to a queue. However, if the message is mandatory, an exception will be raised instead.
  • immediate – Request immediate delivery. If the message cannot be routed to a queue consumer immediately, an exception will be raised. This is instead of the default behaviour, where the server will accept and queue the message, but with no guarantee that the message will ever be consumed.
  • delivery_mode – Override the default delivery_mode.
  • priority – The message priority, 0 to 9.
  • content_type – The message’s content_type. If content_type is set, no serialization occurs, as it is assumed this is either a binary object or you’ve done your own serialization. Leave blank if using built-in serialization, as the library sets content_type properly.
  • content_encoding – The character set in which this object is encoded. Use “binary” if sending in raw binary objects. Leave blank if using built-in serialization as our library properly sets content_encoding.
  • serializer – Override the default serializer.
  • exchange – Override the exchange to publish to. Note that this exchange must have been declared.
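
The content_type rule above (an explicit content_type disables serialization) can be sketched as follows. This is an illustration, not carrot's code: prepare_body is a hypothetical helper, only JSON is modelled, and the "application/json" and "utf-8" values are assumptions made for the sketch.

```python
import json


def prepare_body(message_data, content_type=None, content_encoding=None):
    """Return (body, content_type, content_encoding) for a message."""
    if content_type is not None:
        # The caller has done its own serialization: pass the data through.
        return message_data, content_type, content_encoding
    # Built-in serialization path; JSON with assumed header values.
    return json.dumps(message_data), "application/json", "utf-8"


print(prepare_body({"a": 1}))
# ('{"a": 1}', 'application/json', 'utf-8')
print(prepare_body(b"\x00\x01", content_type="application/octet-stream",
                   content_encoding="binary"))
# (b'\x00\x01', 'application/octet-stream', 'binary')
```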
