Consumer¶
-
class
rejected.consumer.
Consumer
(settings, process, drop_invalid_messages=None, message_type=None, error_exchange=None, error_max_retry=None, drop_exchange=None)[source]¶ Base consumer class that defines the contract between rejected and consumer applications.
In any of the consumer base classes, if the
message_type
is specified in the configuration (or set with theMESSAGE_TYPE
attribute), thetype
property of incoming messages will be validated against when a message is received. If there is no match, the consumer will not process the message and will drop the message without an exception if thedrop_invalid_messages
setting is set toTrue
in the configuration (or if theDROP_INVALID_MESSAGES
attribute is set toTrue
). If it isFalse
, aMessageException
is raised.If
DROP_EXCHANGE
is specified either as an attribute of the consumer class or in the consumer configuration, if a message is dropped, it is published to the that exchange prior to rejecting the message in RabbitMQ. When the message is republished, four new values are added to the AMQPheaders
message property:X-Dropped-By
,X-Dropped-Reason
,X-Dropped-Timestamp
,X-Original-Exchange
.The
X-Dropped-By
header value contains the configured name of the consumer that dropped the message.X-Dropped-Reason
contains the reason the message was dropped (eg invalid message type or maximum error count).X-Dropped-Timestamp
value contains the ISO-8601 formatted timestamp of when the message was dropped. Finally, theX-Original-Exchange
value contains the original exchange that the message was published to.If a consumer raises a
ProcessingException
, the message that was being processed will be republished to the exchange specified by theerror
exchange configuration value or theERROR_EXCHANGE
attribute of the consumer’s class. The message will be published using the routing key that was last used for the message. The original message body and properties will be used and two additional header property values may be added:X-Processing-Exception
contains the string value of the- exception that was raised, if specified.
X-Processing-Exceptions
contains the quantity of processing- exceptions that have been raised for the message.
In combination with a queue that has
x-message-ttl
set andx-dead-letter-exchange
that points to the original exchange for the queue the consumer is consuming off of, you can implement a delayed retry cycle for messages that are failing to process due to external resource or service issues.If
error_max_retry
is specified in the configuration orERROR_MAX_RETRY
is set on the class, the headers for each method will be inspected and if the value ofX-Processing-Exceptions
is greater than or equal to the specified value, the message will be dropped.Note
Since 3.17,
Consumer
andPublishingConsumer
have been combined into the same class.-
app_id
¶ Access the current message’s
app-id
property as an attribute of the consumer class.Return type: str
-
content_encoding
¶ Access the current message’s
content-encoding
AMQP message property as an attribute of the consumer class.Return type: str
-
content_type
¶ Access the current message’s
content-type
AMQP message property as an attribute of the consumer class.Return type: str
-
correlation_id
¶ Access the current message’s
correlation-id
AMAP message property as an attribute of the consumer class. If the message does not have acorrelation-id
then, each message is assigned a new UUIDv4 basedcorrelation-id
value.Return type: str
-
exchange
¶ Access the AMQP exchange the message was published to as an attribute of the consumer class.
Return type: str
-
expiration
¶ Access the current message’s
expiration
AMQP message property as an attribute of the consumer class.Return type: str
-
finish
()[source]¶ Finishes message processing for the current message. If this is called in
prepare()
, theprocess()
method is not invoked for the current message.
-
headers
¶ Access the current message’s
headers
AMQP message property as an attribute of the consumer class.Return type: dict
-
initialize
()[source]¶ Extend this method for any initialization tasks that occur only when the
Consumer
class is created.
-
message_id
¶ Access the current message’s
message-id
AMQP message property as an attribute of the consumer class.Return type: str
-
message_type
¶ Access the current message’s
type
AMQP message property as an attribute of the consumer class.Return type: str
-
on_blocked
(name)[source]¶ Called when a connection for this consumer is blocked.
Override this method to respond to being blocked.
New in version 3.17.
Parameters: name (str) – The connection name that is blocked
-
on_finish
()[source]¶ Called after a message has been processed.
Override this method to perform cleanup, logging, etc. This method is a counterpart to
prepare()
.on_finish
may not produce any output, as it is called after all processing has taken place.If an exception is raised during the processing of a message,
prepare()
is not invoked.Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()
to make it asynchronous.
-
on_unblocked
(name)[source]¶ Called when a connection for this consumer is unblocked.
Override this method to respond to being blocked.
New in version 3.17.
Parameters: name (str) – The connection name that is blocked
-
prepare
()[source]¶ Called when a message is received before
process()
.Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()
to make it asynchronous.If this method returns a
Future
, execution will not proceed until the Future has completed.
-
priority
¶ Access the current message’s
priority
AMQP message property as an attribute of the consumer class.Return type: int
-
process
()[source]¶ Extend this method for implementing your Consumer logic.
If the message can not be processed and the Consumer should stop after n failures to process messages, raise the
ConsumerException
.Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()
to make it asynchronous.Raises: rejected.consumer.ConsumerException
Raises: rejected.consumer.MessageException
Raises: rejected.consumer.ProcessingException
-
properties
¶ Access the current message’s AMQP message properties in dict form as an attribute of the consumer class.
Return type: dict
-
publish_message
(exchange, routing_key, properties, body, channel=None)[source]¶ Publish a message to RabbitMQ on the same channel the original message was received on.
Parameters:
-
reply
(response_body, properties, auto_id=True, exchange=None, reply_to=None)[source]¶ Reply to the received message.
If
auto_id
isTrue
, a new UUIDv4 value will be generated for themessage_id
AMQP message property. Thecorrelation_id
AMQP message property will be set to themessage_id
of the original message. In addition, thetimestamp
will be assigned the current time of the message. Ifauto_id
isFalse
, neither themessage_id
and thecorrelation_id
AMQP properties will be changed in the properties.If
exchange
is not set, the exchange the message was received on will be used.If
reply_to
is set in the original properties, it will be used as the routing key. If thereply_to
is not set in the properties and it is not passed in, aValueError
will be raised. If reply to is set in the properties, it will be cleared out prior to the message being republished.Parameters: - response_body (any) – The message body to send
- properties (
rejected.data.Properties
) – Message properties to use - auto_id (bool) – Automatically shuffle
message_id
&correlation_id
- exchange (str) – Override the exchange to publish to
- reply_to (str) – Override the
reply_to
AMQP property
Raises:
-
reply_to
¶ Access the current message’s
reply-to
AMQP message property as an attribute of the consumer class.Return type: str
-
returned
¶ Indicates if the message was delivered by consumer previously and returned from RabbitMQ.
New in version 3.17.
Return type: bool
-
send_exception_to_sentry
(exc_info)[source]¶ Send an exception to Sentry if enabled.
Parameters: exc_info (tuple) – exception information as returned from sys.exc_info()
-
sentry_client
¶ Access the Sentry raven
Client
instance orNone
Use this object to add tags or additional context to Sentry error reports (see
raven.base.Client.tags_context()
) or to report messages (viaraven.base.Client.captureMessage()
) directly to Sentry.Return type: raven.base.Client
-
set_sentry_context
(tag, value)[source]¶ Set a context tag in Sentry for the given key and value.
Parameters:
-
settings
¶ Access the consumer settings as specified by the
config
section for the consumer in the rejected configuration.Return type: dict
-
shutdown
()[source]¶ Override to cleanly shutdown when rejected is stopping the consumer.
This could be used for closing database connections or other such activities.
-
stats_add_timing
(key, duration)[source]¶ Add a timing to the per-message measurements
New in version 3.13.0.
Parameters: - key (str) – The key to add the timing to
- duration (int|float) – The timing value
-
stats_incr
(key, value=1)[source]¶ Increment the specified key in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_set_tag
(key, value=1)[source]¶ Set the specified tag/value in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_set_value
(key, value=1)[source]¶ Set the specified key/value in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_track_duration
(*args, **kwds)[source]¶ Time around a context and add to the the per-message measurements
New in version 3.13.0.
Parameters: key (str) – The key for the timing to track
-
statsd_add_timing
(key, duration)[source]¶ Add a timing to the per-message measurements
Parameters: - key (str) – The key to add the timing to
- duration (int|float) – The timing value
Deprecated since version 3.13.0.
-
statsd_incr
(key, value=1)[source]¶ Increment the specified key in the per-message measurements
Parameters: Deprecated since version 3.13.0.
-
statsd_track_duration
(key)[source]¶ Time around a context and add to the the per-message measurements
Parameters: key (str) – The key for the timing to track Deprecated since version 3.13.0.
-
timestamp
¶ Access the unix epoch timestamp value from the AMQP message properties of the current message.
Return type: int
-
unset_sentry_context
(tag)[source]¶ Remove a context tag from sentry
Parameters: tag (str) – The context tag to remove
-
class
rejected.consumer.
PublishingConsumer
(*args, **kwargs)[source]¶ Deprecated, functionality moved to
rejected.consumer.Consumer
Deprecated since version 3.17.0.
-
app_id
¶ Access the current message’s
app-id
property as an attribute of the consumer class.Return type: str
-
content_encoding
¶ Access the current message’s
content-encoding
AMQP message property as an attribute of the consumer class.Return type: str
-
content_type
¶ Access the current message’s
content-type
AMQP message property as an attribute of the consumer class.Return type: str
-
correlation_id
¶ Access the current message’s
correlation-id
AMAP message property as an attribute of the consumer class. If the message does not have acorrelation-id
then, each message is assigned a new UUIDv4 basedcorrelation-id
value.Return type: str
-
exchange
¶ Access the AMQP exchange the message was published to as an attribute of the consumer class.
Return type: str
-
expiration
¶ Access the current message’s
expiration
AMQP message property as an attribute of the consumer class.Return type: str
-
finish
()¶ Finishes message processing for the current message. If this is called in
prepare()
, theprocess()
method is not invoked for the current message.
-
headers
¶ Access the current message’s
headers
AMQP message property as an attribute of the consumer class.Return type: dict
-
initialize
()¶ Extend this method for any initialization tasks that occur only when the
Consumer
class is created.
-
message_id
¶ Access the current message’s
message-id
AMQP message property as an attribute of the consumer class.Return type: str
-
message_type
¶ Access the current message’s
type
AMQP message property as an attribute of the consumer class.Return type: str
-
on_blocked
(name)¶ Called when a connection for this consumer is blocked.
Override this method to respond to being blocked.
New in version 3.17.
Parameters: name (str) – The connection name that is blocked
-
on_finish
()¶ Called after a message has been processed.
Override this method to perform cleanup, logging, etc. This method is a counterpart to
prepare()
.on_finish
may not produce any output, as it is called after all processing has taken place.If an exception is raised during the processing of a message,
prepare()
is not invoked.Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()
to make it asynchronous.
-
on_unblocked
(name)¶ Called when a connection for this consumer is unblocked.
Override this method to respond to being blocked.
New in version 3.17.
Parameters: name (str) – The connection name that is blocked
-
prepare
()¶ Called when a message is received before
process()
.Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()
to make it asynchronous.If this method returns a
Future
, execution will not proceed until the Future has completed.
-
priority
¶ Access the current message’s
priority
AMQP message property as an attribute of the consumer class.Return type: int
-
process
()¶ Extend this method for implementing your Consumer logic.
If the message can not be processed and the Consumer should stop after n failures to process messages, raise the
ConsumerException
.Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()
to make it asynchronous.Raises: rejected.consumer.ConsumerException
Raises: rejected.consumer.MessageException
Raises: rejected.consumer.ProcessingException
-
properties
¶ Access the current message’s AMQP message properties in dict form as an attribute of the consumer class.
Return type: dict
-
publish_message
(exchange, routing_key, properties, body, channel=None)¶ Publish a message to RabbitMQ on the same channel the original message was received on.
Parameters:
-
reply
(response_body, properties, auto_id=True, exchange=None, reply_to=None)¶ Reply to the received message.
If
auto_id
isTrue
, a new UUIDv4 value will be generated for themessage_id
AMQP message property. Thecorrelation_id
AMQP message property will be set to themessage_id
of the original message. In addition, thetimestamp
will be assigned the current time of the message. Ifauto_id
isFalse
, neither themessage_id
and thecorrelation_id
AMQP properties will be changed in the properties.If
exchange
is not set, the exchange the message was received on will be used.If
reply_to
is set in the original properties, it will be used as the routing key. If thereply_to
is not set in the properties and it is not passed in, aValueError
will be raised. If reply to is set in the properties, it will be cleared out prior to the message being republished.Parameters: - response_body (any) – The message body to send
- properties (
rejected.data.Properties
) – Message properties to use - auto_id (bool) – Automatically shuffle
message_id
&correlation_id
- exchange (str) – Override the exchange to publish to
- reply_to (str) – Override the
reply_to
AMQP property
Raises:
-
reply_to
¶ Access the current message’s
reply-to
AMQP message property as an attribute of the consumer class.Return type: str
-
returned
¶ Indicates if the message was delivered by consumer previously and returned from RabbitMQ.
New in version 3.17.
Return type: bool
-
send_exception_to_sentry
(exc_info)¶ Send an exception to Sentry if enabled.
Parameters: exc_info (tuple) – exception information as returned from sys.exc_info()
-
sentry_client
¶ Access the Sentry raven
Client
instance orNone
Use this object to add tags or additional context to Sentry error reports (see
raven.base.Client.tags_context()
) or to report messages (viaraven.base.Client.captureMessage()
) directly to Sentry.Return type: raven.base.Client
-
set_sentry_context
(tag, value)¶ Set a context tag in Sentry for the given key and value.
Parameters:
-
settings
¶ Access the consumer settings as specified by the
config
section for the consumer in the rejected configuration.Return type: dict
-
shutdown
()¶ Override to cleanly shutdown when rejected is stopping the consumer.
This could be used for closing database connections or other such activities.
-
stats_add_timing
(key, duration)¶ Add a timing to the per-message measurements
New in version 3.13.0.
Parameters: - key (str) – The key to add the timing to
- duration (int|float) – The timing value
-
stats_incr
(key, value=1)¶ Increment the specified key in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_set_tag
(key, value=1)¶ Set the specified tag/value in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_set_value
(key, value=1)¶ Set the specified key/value in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_track_duration
(*args, **kwds)¶ Time around a context and add to the the per-message measurements
New in version 3.13.0.
Parameters: key (str) – The key for the timing to track
-
statsd_add_timing
(key, duration)¶ Add a timing to the per-message measurements
Parameters: - key (str) – The key to add the timing to
- duration (int|float) – The timing value
Deprecated since version 3.13.0.
-
statsd_incr
(key, value=1)¶ Increment the specified key in the per-message measurements
Parameters: Deprecated since version 3.13.0.
-
statsd_track_duration
(key)¶ Time around a context and add to the the per-message measurements
Parameters: key (str) – The key for the timing to track Deprecated since version 3.13.0.
-
timestamp
¶ Access the unix epoch timestamp value from the AMQP message properties of the current message.
Return type: int
-
unset_sentry_context
(tag)¶ Remove a context tag from sentry
Parameters: tag (str) – The context tag to remove
-
user_id
¶ Access the
user-id
AMQP message property from the current message’s properties.Return type: str
-
yield_to_ioloop
(*args, **kwargs)¶ Function that will allow Rejected to process IOLoop events while in a tight-loop inside an asynchronous consumer.
-