Consumer¶
-
class
rejected.consumer.Consumer(settings, process, drop_invalid_messages=None, message_type=None, error_exchange=None, error_max_retry=None, drop_exchange=None)[source]¶ Base consumer class that defines the contract between rejected and consumer applications.
In any of the consumer base classes, if the
message_typeis specified in the configuration (or set with theMESSAGE_TYPEattribute), thetypeproperty of incoming messages will be validated against when a message is received. If there is no match, the consumer will not process the message and will drop the message without an exception if thedrop_invalid_messagessetting is set toTruein the configuration (or if theDROP_INVALID_MESSAGESattribute is set toTrue). If it isFalse, aMessageExceptionis raised.If
DROP_EXCHANGEis specified either as an attribute of the consumer class or in the consumer configuration, if a message is dropped, it is published to the that exchange prior to rejecting the message in RabbitMQ. When the message is republished, four new values are added to the AMQPheadersmessage property:X-Dropped-By,X-Dropped-Reason,X-Dropped-Timestamp,X-Original-Exchange.The
X-Dropped-Byheader value contains the configured name of the consumer that dropped the message.X-Dropped-Reasoncontains the reason the message was dropped (eg invalid message type or maximum error count).X-Dropped-Timestampvalue contains the ISO-8601 formatted timestamp of when the message was dropped. Finally, theX-Original-Exchangevalue contains the original exchange that the message was published to.If a consumer raises a
ProcessingException, the message that was being processed will be republished to the exchange specified by theerrorexchange configuration value or theERROR_EXCHANGEattribute of the consumer’s class. The message will be published using the routing key that was last used for the message. The original message body and properties will be used and two additional header property values may be added:X-Processing-Exceptioncontains the string value of the- exception that was raised, if specified.
X-Processing-Exceptionscontains the quantity of processing- exceptions that have been raised for the message.
In combination with a queue that has
x-message-ttlset andx-dead-letter-exchangethat points to the original exchange for the queue the consumer is consuming off of, you can implement a delayed retry cycle for messages that are failing to process due to external resource or service issues.If
error_max_retryis specified in the configuration orERROR_MAX_RETRYis set on the class, the headers for each method will be inspected and if the value ofX-Processing-Exceptionsis greater than or equal to the specified value, the message will be dropped.Note
Since 3.17,
ConsumerandPublishingConsumerhave been combined into the same class.-
app_id¶ Access the current message’s
app-idproperty as an attribute of the consumer class.Return type: str
-
content_encoding¶ Access the current message’s
content-encodingAMQP message property as an attribute of the consumer class.Return type: str
-
content_type¶ Access the current message’s
content-typeAMQP message property as an attribute of the consumer class.Return type: str
-
correlation_id¶ Access the current message’s
correlation-idAMAP message property as an attribute of the consumer class. If the message does not have acorrelation-idthen, each message is assigned a new UUIDv4 basedcorrelation-idvalue.Return type: str
-
exchange¶ Access the AMQP exchange the message was published to as an attribute of the consumer class.
Return type: str
-
expiration¶ Access the current message’s
expirationAMQP message property as an attribute of the consumer class.Return type: str
-
finish()[source]¶ Finishes message processing for the current message. If this is called in
prepare(), theprocess()method is not invoked for the current message.
-
headers¶ Access the current message’s
headersAMQP message property as an attribute of the consumer class.Return type: dict
-
initialize()[source]¶ Extend this method for any initialization tasks that occur only when the
Consumerclass is created.
-
message_id¶ Access the current message’s
message-idAMQP message property as an attribute of the consumer class.Return type: str
-
message_type¶ Access the current message’s
typeAMQP message property as an attribute of the consumer class.Return type: str
-
on_blocked(name)[source]¶ Called when a connection for this consumer is blocked.
Override this method to respond to being blocked.
New in version 3.17.
Parameters: name (str) – The connection name that is blocked
-
on_finish()[source]¶ Called after a message has been processed.
Override this method to perform cleanup, logging, etc. This method is a counterpart to
prepare().on_finishmay not produce any output, as it is called after all processing has taken place.If an exception is raised during the processing of a message,
prepare()is not invoked.Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()to make it asynchronous.
-
on_unblocked(name)[source]¶ Called when a connection for this consumer is unblocked.
Override this method to respond to being blocked.
New in version 3.17.
Parameters: name (str) – The connection name that is blocked
-
prepare()[source]¶ Called when a message is received before
process().Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()to make it asynchronous.If this method returns a
Future, execution will not proceed until the Future has completed.
-
priority¶ Access the current message’s
priorityAMQP message property as an attribute of the consumer class.Return type: int
-
process()[source]¶ Extend this method for implementing your Consumer logic.
If the message can not be processed and the Consumer should stop after n failures to process messages, raise the
ConsumerException.Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()to make it asynchronous.Raises: rejected.consumer.ConsumerExceptionRaises: rejected.consumer.MessageExceptionRaises: rejected.consumer.ProcessingException
-
properties¶ Access the current message’s AMQP message properties in dict form as an attribute of the consumer class.
Return type: dict
-
publish_message(exchange, routing_key, properties, body, channel=None)[source]¶ Publish a message to RabbitMQ on the same channel the original message was received on.
Parameters:
-
reply(response_body, properties, auto_id=True, exchange=None, reply_to=None)[source]¶ Reply to the received message.
If
auto_idisTrue, a new UUIDv4 value will be generated for themessage_idAMQP message property. Thecorrelation_idAMQP message property will be set to themessage_idof the original message. In addition, thetimestampwill be assigned the current time of the message. Ifauto_idisFalse, neither themessage_idand thecorrelation_idAMQP properties will be changed in the properties.If
exchangeis not set, the exchange the message was received on will be used.If
reply_tois set in the original properties, it will be used as the routing key. If thereply_tois not set in the properties and it is not passed in, aValueErrorwill be raised. If reply to is set in the properties, it will be cleared out prior to the message being republished.Parameters: - response_body (any) – The message body to send
- properties (
rejected.data.Properties) – Message properties to use - auto_id (bool) – Automatically shuffle
message_id&correlation_id - exchange (str) – Override the exchange to publish to
- reply_to (str) – Override the
reply_toAMQP property
Raises:
-
reply_to¶ Access the current message’s
reply-toAMQP message property as an attribute of the consumer class.Return type: str
-
returned¶ Indicates if the message was delivered by consumer previously and returned from RabbitMQ.
New in version 3.17.
Return type: bool
-
send_exception_to_sentry(exc_info)[source]¶ Send an exception to Sentry if enabled.
Parameters: exc_info (tuple) – exception information as returned from sys.exc_info()
-
sentry_client¶ Access the Sentry raven
Clientinstance orNoneUse this object to add tags or additional context to Sentry error reports (see
raven.base.Client.tags_context()) or to report messages (viaraven.base.Client.captureMessage()) directly to Sentry.Return type: raven.base.Client
-
set_sentry_context(tag, value)[source]¶ Set a context tag in Sentry for the given key and value.
Parameters:
-
settings¶ Access the consumer settings as specified by the
configsection for the consumer in the rejected configuration.Return type: dict
-
shutdown()[source]¶ Override to cleanly shutdown when rejected is stopping the consumer.
This could be used for closing database connections or other such activities.
-
stats_add_timing(key, duration)[source]¶ Add a timing to the per-message measurements
New in version 3.13.0.
Parameters: - key (str) – The key to add the timing to
- duration (int|float) – The timing value
-
stats_incr(key, value=1)[source]¶ Increment the specified key in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_set_tag(key, value=1)[source]¶ Set the specified tag/value in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_set_value(key, value=1)[source]¶ Set the specified key/value in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_track_duration(*args, **kwds)[source]¶ Time around a context and add to the the per-message measurements
New in version 3.13.0.
Parameters: key (str) – The key for the timing to track
-
statsd_add_timing(key, duration)[source]¶ Add a timing to the per-message measurements
Parameters: - key (str) – The key to add the timing to
- duration (int|float) – The timing value
Deprecated since version 3.13.0.
-
statsd_incr(key, value=1)[source]¶ Increment the specified key in the per-message measurements
Parameters: Deprecated since version 3.13.0.
-
statsd_track_duration(key)[source]¶ Time around a context and add to the the per-message measurements
Parameters: key (str) – The key for the timing to track Deprecated since version 3.13.0.
-
timestamp¶ Access the unix epoch timestamp value from the AMQP message properties of the current message.
Return type: int
-
unset_sentry_context(tag)[source]¶ Remove a context tag from sentry
Parameters: tag (str) – The context tag to remove
-
class
rejected.consumer.PublishingConsumer(*args, **kwargs)[source]¶ Deprecated, functionality moved to
rejected.consumer.ConsumerDeprecated since version 3.17.0.
-
app_id¶ Access the current message’s
app-idproperty as an attribute of the consumer class.Return type: str
-
content_encoding¶ Access the current message’s
content-encodingAMQP message property as an attribute of the consumer class.Return type: str
-
content_type¶ Access the current message’s
content-typeAMQP message property as an attribute of the consumer class.Return type: str
-
correlation_id¶ Access the current message’s
correlation-idAMAP message property as an attribute of the consumer class. If the message does not have acorrelation-idthen, each message is assigned a new UUIDv4 basedcorrelation-idvalue.Return type: str
-
exchange¶ Access the AMQP exchange the message was published to as an attribute of the consumer class.
Return type: str
-
expiration¶ Access the current message’s
expirationAMQP message property as an attribute of the consumer class.Return type: str
-
finish()¶ Finishes message processing for the current message. If this is called in
prepare(), theprocess()method is not invoked for the current message.
-
headers¶ Access the current message’s
headersAMQP message property as an attribute of the consumer class.Return type: dict
-
initialize()¶ Extend this method for any initialization tasks that occur only when the
Consumerclass is created.
-
message_id¶ Access the current message’s
message-idAMQP message property as an attribute of the consumer class.Return type: str
-
message_type¶ Access the current message’s
typeAMQP message property as an attribute of the consumer class.Return type: str
-
on_blocked(name)¶ Called when a connection for this consumer is blocked.
Override this method to respond to being blocked.
New in version 3.17.
Parameters: name (str) – The connection name that is blocked
-
on_finish()¶ Called after a message has been processed.
Override this method to perform cleanup, logging, etc. This method is a counterpart to
prepare().on_finishmay not produce any output, as it is called after all processing has taken place.If an exception is raised during the processing of a message,
prepare()is not invoked.Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()to make it asynchronous.
-
on_unblocked(name)¶ Called when a connection for this consumer is unblocked.
Override this method to respond to being blocked.
New in version 3.17.
Parameters: name (str) – The connection name that is blocked
-
prepare()¶ Called when a message is received before
process().Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()to make it asynchronous.If this method returns a
Future, execution will not proceed until the Future has completed.
-
priority¶ Access the current message’s
priorityAMQP message property as an attribute of the consumer class.Return type: int
-
process()¶ Extend this method for implementing your Consumer logic.
If the message can not be processed and the Consumer should stop after n failures to process messages, raise the
ConsumerException.Note
Asynchronous support: Decorate this method with
tornado.gen.coroutine()to make it asynchronous.Raises: rejected.consumer.ConsumerExceptionRaises: rejected.consumer.MessageExceptionRaises: rejected.consumer.ProcessingException
-
properties¶ Access the current message’s AMQP message properties in dict form as an attribute of the consumer class.
Return type: dict
-
publish_message(exchange, routing_key, properties, body, channel=None)¶ Publish a message to RabbitMQ on the same channel the original message was received on.
Parameters:
-
reply(response_body, properties, auto_id=True, exchange=None, reply_to=None)¶ Reply to the received message.
If
auto_idisTrue, a new UUIDv4 value will be generated for themessage_idAMQP message property. Thecorrelation_idAMQP message property will be set to themessage_idof the original message. In addition, thetimestampwill be assigned the current time of the message. Ifauto_idisFalse, neither themessage_idand thecorrelation_idAMQP properties will be changed in the properties.If
exchangeis not set, the exchange the message was received on will be used.If
reply_tois set in the original properties, it will be used as the routing key. If thereply_tois not set in the properties and it is not passed in, aValueErrorwill be raised. If reply to is set in the properties, it will be cleared out prior to the message being republished.Parameters: - response_body (any) – The message body to send
- properties (
rejected.data.Properties) – Message properties to use - auto_id (bool) – Automatically shuffle
message_id&correlation_id - exchange (str) – Override the exchange to publish to
- reply_to (str) – Override the
reply_toAMQP property
Raises:
-
reply_to¶ Access the current message’s
reply-toAMQP message property as an attribute of the consumer class.Return type: str
-
returned¶ Indicates if the message was delivered by consumer previously and returned from RabbitMQ.
New in version 3.17.
Return type: bool
-
send_exception_to_sentry(exc_info)¶ Send an exception to Sentry if enabled.
Parameters: exc_info (tuple) – exception information as returned from sys.exc_info()
-
sentry_client¶ Access the Sentry raven
Clientinstance orNoneUse this object to add tags or additional context to Sentry error reports (see
raven.base.Client.tags_context()) or to report messages (viaraven.base.Client.captureMessage()) directly to Sentry.Return type: raven.base.Client
-
set_sentry_context(tag, value)¶ Set a context tag in Sentry for the given key and value.
Parameters:
-
settings¶ Access the consumer settings as specified by the
configsection for the consumer in the rejected configuration.Return type: dict
-
shutdown()¶ Override to cleanly shutdown when rejected is stopping the consumer.
This could be used for closing database connections or other such activities.
-
stats_add_timing(key, duration)¶ Add a timing to the per-message measurements
New in version 3.13.0.
Parameters: - key (str) – The key to add the timing to
- duration (int|float) – The timing value
-
stats_incr(key, value=1)¶ Increment the specified key in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_set_tag(key, value=1)¶ Set the specified tag/value in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_set_value(key, value=1)¶ Set the specified key/value in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_track_duration(*args, **kwds)¶ Time around a context and add to the the per-message measurements
New in version 3.13.0.
Parameters: key (str) – The key for the timing to track
-
statsd_add_timing(key, duration)¶ Add a timing to the per-message measurements
Parameters: - key (str) – The key to add the timing to
- duration (int|float) – The timing value
Deprecated since version 3.13.0.
-
statsd_incr(key, value=1)¶ Increment the specified key in the per-message measurements
Parameters: Deprecated since version 3.13.0.
-
statsd_track_duration(key)¶ Time around a context and add to the the per-message measurements
Parameters: key (str) – The key for the timing to track Deprecated since version 3.13.0.
-
timestamp¶ Access the unix epoch timestamp value from the AMQP message properties of the current message.
Return type: int
-
unset_sentry_context(tag)¶ Remove a context tag from sentry
Parameters: tag (str) – The context tag to remove
-
user_id¶ Access the
user-idAMQP message property from the current message’s properties.Return type: str
-
yield_to_ioloop(*args, **kwargs)¶ Function that will allow Rejected to process IOLoop events while in a tight-loop inside an asynchronous consumer.
-