SmartConsumer

class rejected.consumer.SmartConsumer(settings, process, drop_invalid_messages=None, message_type=None, error_exchange=None, error_max_retry=None, drop_exchange=None)[source]

Base class that eases the implementation of strongly typed message consumers, which validate, decode, and deserialize the inbound message body based on the message properties. If the message’s content_encoding property specifies one of the supported encodings (gzip or bzip2), the body is decoded automatically.

When publishing a message, the message can be automatically serialized and encoded. If the content_type property is specified, the consumer will attempt to automatically serialize the message body. If the content_encoding property is specified using a supported encoding (gzip or bzip2), it will automatically be encoded as well.

Supported MIME types for automatic serialization and deserialization are:

  • application/json
  • application/pickle
  • application/x-pickle
  • application/x-plist
  • application/x-vnd.python.pickle
  • application/vnd.python.pickle
  • text/csv
  • text/html (with beautifulsoup4 installed)
  • text/xml (with beautifulsoup4 installed)
  • text/yaml
  • text/x-yaml
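
The decode-then-deserialize flow described above can be sketched with the standard library alone. This is a minimal illustration, not the rejected implementation: `DECODERS`, `DESERIALIZERS`, and `decode_body` are hypothetical names, and only `application/json` is handled here.

```python
import bz2
import gzip
import json

# Hypothetical lookup tables for illustration; the real class supports
# more MIME types than the single one registered here.
DECODERS = {'gzip': gzip.decompress, 'bzip2': bz2.decompress}
DESERIALIZERS = {
    'application/json': lambda raw: json.loads(raw.decode('utf-8')),
}


def decode_body(raw, content_type=None, content_encoding=None):
    """Return the decoded, deserialized body, or the bytes if unsupported."""
    decoder = DECODERS.get(content_encoding)
    if decoder is not None:
        raw = decoder(raw)  # undo gzip/bzip2 compression first
    deserializer = DESERIALIZERS.get(content_type)
    return deserializer(raw) if deserializer is not None else raw


# Round-trip a gzip-compressed JSON payload.
payload = gzip.compress(json.dumps({'id': 1}).encode('utf-8'))
message = decode_body(payload, 'application/json', 'gzip')
```

Inside a real consumer, the `body` attribute is expected to expose the already-decoded value, so a helper like this should not be needed.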

In any of the consumer base classes, if the MESSAGE_TYPE attribute is set, the type property of each incoming message is validated against it using string equality. If the values do not match, the consumer does not process the message. When the DROP_INVALID_MESSAGES attribute is True, the message is dropped without raising an exception; when it is False, a ConsumerException is raised.
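
The validation rule can be expressed as a small decision function. The sketch below is an assumption about the logic, not code from rejected: `validate_message_type` is a hypothetical helper, and `ConsumerException` is a local stand-in for `rejected.consumer.ConsumerException`.

```python
class ConsumerException(Exception):
    """Local stand-in for rejected.consumer.ConsumerException."""


def validate_message_type(expected, actual, drop_invalid_messages):
    """Return True to process, False to drop silently; raise otherwise."""
    if expected is None or actual == expected:
        return True  # no MESSAGE_TYPE configured, or the types match
    if drop_invalid_messages:
        return False  # mismatch: drop the message without an exception
    raise ConsumerException('Unexpected message type: {!r}'.format(actual))
```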

Note

Since 3.17, SmartConsumer and SmartPublishingConsumer have been combined into the same class.

app_id

Access the current message’s app-id property as an attribute of the consumer class.

Return type:str
body

Return the message body, decoded if needed and deserialized if possible.

Return type:any
content_encoding

Access the current message’s content-encoding AMQP message property as an attribute of the consumer class.

Return type:str
content_type

Access the current message’s content-type AMQP message property as an attribute of the consumer class.

Return type:str
correlation_id

Access the current message’s correlation-id AMQP message property as an attribute of the consumer class. If the message does not have a correlation-id, it is assigned a new UUIDv4-based correlation-id value.

Return type:str
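
The fallback behavior for a missing correlation-id might look like the following sketch. `effective_correlation_id` and the dict-shaped `properties` argument are hypothetical and used only for illustration.

```python
import uuid


def effective_correlation_id(properties):
    """Return the message's correlation-id, or a new UUIDv4 string if absent."""
    return properties.get('correlation_id') or str(uuid.uuid4())


generated = effective_correlation_id({})
```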
exchange

Access the AMQP exchange the message was published to as an attribute of the consumer class.

Return type:str
expiration

Access the current message’s expiration AMQP message property as an attribute of the consumer class.

Return type:str
finish()

Finishes message processing for the current message. If this is called in prepare(), the process() method is not invoked for the current message.

headers

Access the current message’s headers AMQP message property as an attribute of the consumer class.

Return type:dict
initialize()

Extend this method for any initialization tasks that occur only when the Consumer class is created.

message_id

Access the current message’s message-id AMQP message property as an attribute of the consumer class.

Return type:str
message_type

Access the current message’s type AMQP message property as an attribute of the consumer class.

Return type:str
name

Property returning the name of the consumer class.

Return type:str
on_blocked(name)

Called when a connection for this consumer is blocked.

Override this method to respond to being blocked.

New in version 3.17.

Parameters:name (str) – The connection name that is blocked
on_finish()

Called after a message has been processed.

Override this method to perform cleanup, logging, etc. This method is a counterpart to prepare(). on_finish may not produce any output, as it is called after all processing has taken place.

If an exception is raised during the processing of a message, on_finish() is not invoked.

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

on_unblocked(name)

Called when a connection for this consumer is unblocked.

Override this method to respond to being unblocked.

New in version 3.17.

Parameters:name (str) – The connection name that is unblocked
prepare()

Called when a message is received, before process() is invoked.

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

If this method returns a Future, execution will not proceed until the Future has completed.
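
The interaction between prepare(), finish(), and process() can be illustrated with a stand-in class. `LifecycleSketch`, its `execute` method, and `SkipEmpty` are hypothetical and only mimic the documented call order; they are not the rejected implementation, and the asynchronous case is left out.

```python
class LifecycleSketch:
    """Stand-in that mimics the documented call order."""

    def __init__(self, body):
        self.body = body
        self.calls = []
        self._finished = False

    def finish(self):
        self._finished = True

    def prepare(self):
        self.calls.append('prepare')

    def process(self):
        self.calls.append('process')

    def execute(self):
        """Run prepare(), then process() unless finish() was called."""
        self.prepare()
        if not self._finished:
            self.process()
        return self.calls


class SkipEmpty(LifecycleSketch):
    """Example subclass that short-circuits empty messages in prepare()."""

    def prepare(self):
        super().prepare()
        if not self.body:
            self.finish()  # nothing to do, so process() is skipped
```

With this sketch, `SkipEmpty('').execute()` records only the prepare step, while a non-empty body also reaches process().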

priority

Access the current message’s priority AMQP message property as an attribute of the consumer class.

Return type:int
process()

Extend this method for implementing your Consumer logic.

If the message cannot be processed and the Consumer should stop after n failures to process messages, raise a ConsumerException.

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

Raises:rejected.consumer.ConsumerException
Raises:rejected.consumer.MessageException
Raises:rejected.consumer.ProcessingException
properties

Access the current message’s AMQP message properties in dict form as an attribute of the consumer class.

Return type:dict
publish_message(exchange, routing_key, properties, body, no_serialization=False, no_encoding=False, channel=None)[source]

Publish a message to RabbitMQ on the same channel the original message was received on.

By default, if you pass a non-string object to the body and the properties have a supported content-type set, the body will be auto-serialized in the specified content-type.

If the properties do not have a timestamp set, it will be set to the current time.

If you specify a content-encoding in the properties and the encoding is supported, the body will be auto-encoded.

Auto-serialization and auto-encoding can be disabled by setting no_serialization or no_encoding to True, respectively.

Parameters:
  • exchange (str) – The exchange to publish to
  • routing_key (str) – The routing key to publish with
  • properties (dict) – The message properties
  • body (mixed) – The message body to publish
  • no_serialization (bool) – Turn off auto-serialization of the body
  • no_encoding (bool) – Turn off auto-encoding of the body
  • channel (str) – The channel/connection name to use. If it is not specified, the channel that the message was delivered on is used.
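
The serialization and encoding steps described for publish_message can be sketched as below. `prepare_body` is a hypothetical helper covering only JSON and gzip, and the dict-shaped `properties` is an assumption made for the example.

```python
import gzip
import json


def prepare_body(body, properties, no_serialization=False, no_encoding=False):
    """Serialize, then encode, a body following the rules described above.

    When serialization is skipped, the body must already be str or bytes.
    """
    if (not no_serialization and not isinstance(body, (bytes, str))
            and properties.get('content_type') == 'application/json'):
        body = json.dumps(body)  # auto-serialize non-string objects
    if isinstance(body, str):
        body = body.encode('utf-8')
    if not no_encoding and properties.get('content_encoding') == 'gzip':
        body = gzip.compress(body)  # auto-encode when requested
    return body


wire = prepare_body(
    {'id': 1},
    {'content_type': 'application/json', 'content_encoding': 'gzip'})
```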
redelivered

Indicates if the current message has been redelivered.

Return type:bool
reply(response_body, properties, auto_id=True, exchange=None, reply_to=None)

Reply to the received message.

If auto_id is True, a new UUIDv4 value will be generated for the message_id AMQP message property. The correlation_id AMQP message property will be set to the message_id of the original message. In addition, the timestamp will be set to the current time. If auto_id is False, neither the message_id nor the correlation_id AMQP property will be changed in the properties.

If exchange is not set, the exchange the message was received on will be used.

If reply_to is set in the original properties, it will be used as the routing key. If reply_to is not set in the properties and is not passed in, a ValueError will be raised. If reply_to is set in the properties, it will be cleared out prior to the message being republished.

Parameters:
  • response_body (any) – The message body to send
  • properties (rejected.data.Properties) – Message properties to use
  • auto_id (bool) – Automatically shuffle message_id & correlation_id
  • exchange (str) – Override the exchange to publish to
  • reply_to (str) – Override the reply_to AMQP property
Raises:

ValueError
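
The property handling that reply() describes can be approximated as follows. This is a sketch under stated assumptions: `build_reply` is hypothetical, messages are modeled as plain dicts rather than rejected.data.Properties, and an explicitly passed reply_to is assumed to take precedence over the one in the original message.

```python
import time
import uuid


def build_reply(original, properties, auto_id=True, exchange=None,
                reply_to=None):
    """Return (exchange, routing_key, properties) for a reply.

    `original` is a hypothetical dict holding the received message's
    'exchange' and 'properties' (also a dict).
    """
    routing_key = reply_to or original['properties'].get('reply_to')
    if not routing_key:
        raise ValueError('Missing reply_to routing key')
    props = dict(properties)  # avoid mutating the caller's properties
    if auto_id:
        props['correlation_id'] = original['properties'].get('message_id')
        props['message_id'] = str(uuid.uuid4())
        props['timestamp'] = int(time.time())
    props.pop('reply_to', None)  # cleared before republishing
    return exchange or original['exchange'], routing_key, props
```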

reply_to

Access the current message’s reply-to AMQP message property as an attribute of the consumer class.

Return type:str
returned

Indicates whether the message was previously published by the consumer and then returned by RabbitMQ.

New in version 3.17.

Return type:bool
routing_key

Access the routing key for the current message.

Return type:str
send_exception_to_sentry(exc_info)

Send an exception to Sentry if enabled.

Parameters:exc_info (tuple) – exception information as returned from sys.exc_info()
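
For reference, the tuple expected here is the one sys.exc_info() returns while an exception is being handled. The helper below, `capture_exc_info`, is hypothetical and only demonstrates the shape of that tuple.

```python
import sys


def capture_exc_info(func):
    """Call func and return the sys.exc_info() tuple if it raises, else None."""
    try:
        func()
    except Exception:
        # (exception type, exception instance, traceback)
        return sys.exc_info()
    return None


exc_info = capture_exc_info(lambda: 1 / 0)
```

In a consumer, the equivalent call would presumably sit in an `except` block inside process(), passing `sys.exc_info()` to send_exception_to_sentry().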
sentry_client

Access the Sentry raven Client instance or None

Use this object to add tags or additional context to Sentry error reports (see raven.base.Client.tags_context()) or to report messages (via raven.base.Client.captureMessage()) directly to Sentry.

Return type:raven.base.Client
set_sentry_context(tag, value)

Set a context tag in Sentry for the given key and value.

Parameters:
  • tag (str) – The context tag name
  • value (str) – The context value
settings

Access the consumer settings as specified by the config section for the consumer in the rejected configuration.

Return type:dict
shutdown()

Override to cleanly shut down when rejected is stopping the consumer.

This could be used for closing database connections or other such activities.

stats_add_timing(key, duration)

Add a timing to the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value
stats_incr(key, value=1)

Increment the specified key in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The key to increment
  • value (int) – The value to increment the key by
stats_set_tag(key, value=1)

Set the specified tag/value in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The tag name to set
  • value (int) – The value to set for the tag
stats_set_value(key, value=1)

Set the specified key/value in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The key to set
  • value (int) – The value to assign to the key
stats_track_duration(*args, **kwds)

Time a context block and add the duration to the per-message measurements

New in version 3.13.0.

Parameters:key (str) – The key for the timing to track
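
The (*args, **kwds) signature suggests a context manager. A stand-in showing how such a timer could be built and used is sketched below; `TimingSketch` is hypothetical and does not reflect how rejected stores its measurements.

```python
import contextlib
import time


class TimingSketch:
    """Stand-in that records durations per key."""

    def __init__(self):
        self.durations = {}

    def stats_add_timing(self, key, duration):
        self.durations.setdefault(key, []).append(duration)

    @contextlib.contextmanager
    def stats_track_duration(self, key):
        start = time.monotonic()
        try:
            yield
        finally:
            # Record the elapsed time even if the block raises.
            self.stats_add_timing(key, time.monotonic() - start)


stats = TimingSketch()
with stats.stats_track_duration('db_query'):
    sum(range(1000))  # placeholder for the work being timed
```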
statsd_add_timing(key, duration)

Add a timing to the per-message measurements

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value

Deprecated since version 3.13.0.

statsd_incr(key, value=1)

Increment the specified key in the per-message measurements

Parameters:
  • key (str) – The key to increment
  • value (int) – The value to increment the key by

Deprecated since version 3.13.0.

statsd_track_duration(key)

Time a context block and add the duration to the per-message measurements

Parameters:key (str) – The key for the timing to track

Deprecated since version 3.13.0.

timestamp

Access the Unix epoch timestamp value from the AMQP message properties of the current message.

Return type:int
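
Because the value is an integer count of seconds since the Unix epoch, it can be converted to a timezone-aware datetime with the standard library. `timestamp_to_datetime` is a hypothetical helper, not part of rejected.

```python
from datetime import datetime, timezone


def timestamp_to_datetime(timestamp):
    """Convert a Unix epoch timestamp (seconds) to an aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


published_at = timestamp_to_datetime(0)
```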
unset_sentry_context(tag)

Remove a context tag from Sentry.

Parameters:tag (str) – The context tag to remove
user_id

Access the user-id AMQP message property from the current message’s properties.

Return type:str
yield_to_ioloop(*args, **kwargs)

Function that allows Rejected to process IOLoop events while in a tight loop inside an asynchronous consumer.

class rejected.consumer.SmartPublishingConsumer(*args, **kwargs)[source]

Deprecated; functionality moved to rejected.consumer.SmartConsumer.

Deprecated since version 3.17.0.

app_id

Access the current message’s app-id property as an attribute of the consumer class.

Return type:str
body

Return the message body, decoded if needed and deserialized if possible.

Return type:any
content_encoding

Access the current message’s content-encoding AMQP message property as an attribute of the consumer class.

Return type:str
content_type

Access the current message’s content-type AMQP message property as an attribute of the consumer class.

Return type:str
correlation_id

Access the current message’s correlation-id AMQP message property as an attribute of the consumer class. If the message does not have a correlation-id, it is assigned a new UUIDv4-based correlation-id value.

Return type:str
exchange

Access the AMQP exchange the message was published to as an attribute of the consumer class.

Return type:str
expiration

Access the current message’s expiration AMQP message property as an attribute of the consumer class.

Return type:str
finish()

Finishes message processing for the current message. If this is called in prepare(), the process() method is not invoked for the current message.

headers

Access the current message’s headers AMQP message property as an attribute of the consumer class.

Return type:dict
initialize()

Extend this method for any initialization tasks that occur only when the Consumer class is created.

message_id

Access the current message’s message-id AMQP message property as an attribute of the consumer class.

Return type:str
message_type

Access the current message’s type AMQP message property as an attribute of the consumer class.

Return type:str
name

Property returning the name of the consumer class.

Return type:str
on_blocked(name)

Called when a connection for this consumer is blocked.

Override this method to respond to being blocked.

New in version 3.17.

Parameters:name (str) – The connection name that is blocked
on_finish()

Called after a message has been processed.

Override this method to perform cleanup, logging, etc. This method is a counterpart to prepare(). on_finish may not produce any output, as it is called after all processing has taken place.

If an exception is raised during the processing of a message, on_finish() is not invoked.

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

on_unblocked(name)

Called when a connection for this consumer is unblocked.

Override this method to respond to being unblocked.

New in version 3.17.

Parameters:name (str) – The connection name that is unblocked
prepare()

Called when a message is received, before process() is invoked.

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

If this method returns a Future, execution will not proceed until the Future has completed.

priority

Access the current message’s priority AMQP message property as an attribute of the consumer class.

Return type:int
process()

Extend this method for implementing your Consumer logic.

If the message cannot be processed and the Consumer should stop after n failures to process messages, raise a ConsumerException.

Note

Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.

Raises:rejected.consumer.ConsumerException
Raises:rejected.consumer.MessageException
Raises:rejected.consumer.ProcessingException
properties

Access the current message’s AMQP message properties in dict form as an attribute of the consumer class.

Return type:dict
publish_message(exchange, routing_key, properties, body, no_serialization=False, no_encoding=False, channel=None)

Publish a message to RabbitMQ on the same channel the original message was received on.

By default, if you pass a non-string object to the body and the properties have a supported content-type set, the body will be auto-serialized in the specified content-type.

If the properties do not have a timestamp set, it will be set to the current time.

If you specify a content-encoding in the properties and the encoding is supported, the body will be auto-encoded.

Auto-serialization and auto-encoding can be disabled by setting no_serialization or no_encoding to True, respectively.

Parameters:
  • exchange (str) – The exchange to publish to
  • routing_key (str) – The routing key to publish with
  • properties (dict) – The message properties
  • body (mixed) – The message body to publish
  • no_serialization (bool) – Turn off auto-serialization of the body
  • no_encoding (bool) – Turn off auto-encoding of the body
  • channel (str) – The channel/connection name to use. If it is not specified, the channel that the message was delivered on is used.
redelivered

Indicates if the current message has been redelivered.

Return type:bool
reply(response_body, properties, auto_id=True, exchange=None, reply_to=None)

Reply to the received message.

If auto_id is True, a new UUIDv4 value will be generated for the message_id AMQP message property. The correlation_id AMQP message property will be set to the message_id of the original message. In addition, the timestamp will be set to the current time. If auto_id is False, neither the message_id nor the correlation_id AMQP property will be changed in the properties.

If exchange is not set, the exchange the message was received on will be used.

If reply_to is set in the original properties, it will be used as the routing key. If reply_to is not set in the properties and is not passed in, a ValueError will be raised. If reply_to is set in the properties, it will be cleared out prior to the message being republished.

Parameters:
  • response_body (any) – The message body to send
  • properties (rejected.data.Properties) – Message properties to use
  • auto_id (bool) – Automatically shuffle message_id & correlation_id
  • exchange (str) – Override the exchange to publish to
  • reply_to (str) – Override the reply_to AMQP property
Raises:

ValueError

reply_to

Access the current message’s reply-to AMQP message property as an attribute of the consumer class.

Return type:str
returned

Indicates whether the message was previously published by the consumer and then returned by RabbitMQ.

New in version 3.17.

Return type:bool
routing_key

Access the routing key for the current message.

Return type:str
send_exception_to_sentry(exc_info)

Send an exception to Sentry if enabled.

Parameters:exc_info (tuple) – exception information as returned from sys.exc_info()
sentry_client

Access the Sentry raven Client instance or None

Use this object to add tags or additional context to Sentry error reports (see raven.base.Client.tags_context()) or to report messages (via raven.base.Client.captureMessage()) directly to Sentry.

Return type:raven.base.Client
set_sentry_context(tag, value)

Set a context tag in Sentry for the given key and value.

Parameters:
  • tag (str) – The context tag name
  • value (str) – The context value
settings

Access the consumer settings as specified by the config section for the consumer in the rejected configuration.

Return type:dict
shutdown()

Override to cleanly shut down when rejected is stopping the consumer.

This could be used for closing database connections or other such activities.

stats_add_timing(key, duration)

Add a timing to the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value
stats_incr(key, value=1)

Increment the specified key in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The key to increment
  • value (int) – The value to increment the key by
stats_set_tag(key, value=1)

Set the specified tag/value in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The tag name to set
  • value (int) – The value to set for the tag
stats_set_value(key, value=1)

Set the specified key/value in the per-message measurements

New in version 3.13.0.

Parameters:
  • key (str) – The key to set
  • value (int) – The value to assign to the key
stats_track_duration(*args, **kwds)

Time a context block and add the duration to the per-message measurements

New in version 3.13.0.

Parameters:key (str) – The key for the timing to track
statsd_add_timing(key, duration)

Add a timing to the per-message measurements

Parameters:
  • key (str) – The key to add the timing to
  • duration (int|float) – The timing value

Deprecated since version 3.13.0.

statsd_incr(key, value=1)

Increment the specified key in the per-message measurements

Parameters:
  • key (str) – The key to increment
  • value (int) – The value to increment the key by

Deprecated since version 3.13.0.

statsd_track_duration(key)

Time a context block and add the duration to the per-message measurements

Parameters:key (str) – The key for the timing to track

Deprecated since version 3.13.0.

timestamp

Access the Unix epoch timestamp value from the AMQP message properties of the current message.

Return type:int
unset_sentry_context(tag)

Remove a context tag from Sentry.

Parameters:tag (str) – The context tag to remove
user_id

Access the user-id AMQP message property from the current message’s properties.

Return type:str
yield_to_ioloop(*args, **kwargs)

Function that allows Rejected to process IOLoop events while in a tight loop inside an asynchronous consumer.