SmartConsumer¶
-
class
rejected.consumer.
SmartConsumer
(settings, process, drop_invalid_messages=None, message_type=None, error_exchange=None, error_max_retry=None, drop_exchange=None)[source]¶ Base class to ease the implementation of strongly typed message consumers that validate, automatically decode, and deserialize the inbound message body based upon the message properties. Additionally, if one of the supported content_encoding types (gzip or bzip2) is specified in the message’s properties, the body will automatically be decoded.
When publishing a message, the message can be automatically serialized and encoded. If the content_type property is specified, the consumer will attempt to automatically serialize the message body. If the content_encoding property is specified using a supported encoding (gzip or bzip2), the body will automatically be encoded as well.
Supported MIME types for automatic serialization and deserialization are:
- application/json
- application/pickle
- application/x-pickle
- application/x-plist
- application/x-vnd.python.pickle
- application/vnd.python.pickle
- text/csv
- text/html (with beautifulsoup4 installed)
- text/xml (with beautifulsoup4 installed)
- text/yaml
- text/x-yaml
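The decode-then-deserialize behavior described above can be illustrated with a minimal standard-library sketch. It is not rejected's implementation: the DECODERS table and the decode_body helper are hypothetical names, and only application/json is handled, to keep the example short.

```python
import bz2
import gzip
import json

# Hypothetical lookup table, not part of rejected: maps each supported
# content-encoding to a standard-library decompression function.
DECODERS = {'gzip': gzip.decompress, 'bzip2': bz2.decompress}


def decode_body(body, content_type=None, content_encoding=None):
    """Decode, then deserialize, a raw message body given as bytes."""
    if content_encoding in DECODERS:
        body = DECODERS[content_encoding](body)
    if content_type == 'application/json':
        return json.loads(body.decode('utf-8'))
    return body  # Unknown or missing content type: return the bytes as-is


# Simulate a gzip-compressed JSON message arriving from the broker.
raw = gzip.compress(json.dumps({'id': 1}).encode('utf-8'))
message = decode_body(raw, 'application/json', 'gzip')
```

The order matters: the content encoding is undone first, because the compressed bytes are not valid input for the deserializer.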
In any of the consumer base classes, if the MESSAGE_TYPE attribute is set, the type property of each incoming message is validated when the message is received by checking for string equality against the MESSAGE_TYPE attribute. If the values do not match, the consumer will not process the message. It will drop the message without an exception if the DROP_INVALID_MESSAGES attribute is set to True. If it is False, a ConsumerException is raised.
Note
Since 3.17, SmartConsumer and SmartPublishingConsumer have been combined into the same class.
-
app_id
¶ Access the current message’s app-id property as an attribute of the consumer class.
Return type: str
-
content_encoding
¶ Access the current message’s content-encoding AMQP message property as an attribute of the consumer class.
Return type: str
-
content_type
¶ Access the current message’s content-type AMQP message property as an attribute of the consumer class.
Return type: str
-
correlation_id
¶ Access the current message’s correlation-id AMQP message property as an attribute of the consumer class. If the message does not have a correlation-id, it is assigned a new UUIDv4-based correlation-id value.
Return type: str
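The fallback described for correlation_id can be sketched as follows. The correlation_id_for helper and the plain dict used for the properties are assumptions made for illustration; rejected exposes the value as an attribute of the consumer instead.

```python
import uuid


def correlation_id_for(properties):
    """Return the message's correlation-id, or a fresh UUIDv4 string.

    ``properties`` is a plain dict standing in for the AMQP properties.
    """
    return properties.get('correlation_id') or str(uuid.uuid4())


existing = correlation_id_for({'correlation_id': 'abc-123'})
generated = correlation_id_for({})
```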
-
exchange
¶ Access the AMQP exchange the message was published to as an attribute of the consumer class.
Return type: str
-
expiration
¶ Access the current message’s expiration AMQP message property as an attribute of the consumer class.
Return type: str
-
finish
()¶ Finishes message processing for the current message. If this is called in prepare(), the process() method is not invoked for the current message.
-
headers
¶ Access the current message’s headers AMQP message property as an attribute of the consumer class.
Return type: dict
-
initialize
()¶ Extend this method for any initialization tasks that occur only when the Consumer class is created.
-
message_id
¶ Access the current message’s message-id AMQP message property as an attribute of the consumer class.
Return type: str
-
message_type
¶ Access the current message’s type AMQP message property as an attribute of the consumer class.
Return type: str
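The MESSAGE_TYPE validation described in the class documentation compares this property against the expected type. A minimal sketch of that decision, assuming a hypothetical should_process helper and a local stand-in for rejected.consumer.ConsumerException, might look like this:

```python
class ConsumerException(Exception):
    """Local stand-in for rejected.consumer.ConsumerException."""


def should_process(message_type, expected_type, drop_invalid):
    """Return True to process the message, False to drop it silently.

    Raises ConsumerException on a mismatch when drop_invalid is False.
    """
    if expected_type is None or message_type == expected_type:
        return True  # No MESSAGE_TYPE configured, or the types match
    if drop_invalid:
        return False  # DROP_INVALID_MESSAGES is True: drop quietly
    raise ConsumerException('Unexpected message type: %r' % (message_type,))
```

The comparison is plain string equality, so 'user.created' and 'User.Created' are treated as different types.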
-
on_blocked
(name)¶ Called when a connection for this consumer is blocked.
Override this method to respond to being blocked.
New in version 3.17.
Parameters: name (str) – The connection name that is blocked
-
on_finish
()¶ Called after a message has been processed.
Override this method to perform cleanup, logging, etc. This method is a counterpart to prepare(). on_finish may not produce any output, as it is called after all processing has taken place.
If an exception is raised during the processing of a message, prepare() is not invoked.
Note
Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.
-
on_unblocked
(name)¶ Called when a connection for this consumer is unblocked.
Override this method to respond to being unblocked.
New in version 3.17.
Parameters: name (str) – The connection name that is unblocked
-
prepare
()¶ Called when a message is received, before process().
Note
Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.
If this method returns a Future, execution will not proceed until the Future has completed.
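The relationship between prepare(), finish(), process(), and on_finish() can be modeled with a toy class. This is a synchronous sketch and not rejected's code: LifecycleSketch, SkipProcessing, and handle_message are hypothetical names, and the assumption that on_finish() still runs after finish() is called in prepare() is this sketch's own.

```python
class LifecycleSketch:
    """Toy model of the per-message call order; not rejected's code."""

    def __init__(self):
        self.calls = []
        self._finished = False

    def finish(self):
        self._finished = True

    def prepare(self):
        self.calls.append('prepare')

    def process(self):
        self.calls.append('process')

    def on_finish(self):
        self.calls.append('on_finish')

    def handle_message(self):
        self._finished = False
        self.prepare()
        if not self._finished:  # finish() in prepare() skips process()
            self.process()
        self.on_finish()  # Assumption: cleanup hook always runs here
        return self.calls


class SkipProcessing(LifecycleSketch):
    """Calls finish() in prepare(), so process() is never invoked."""

    def prepare(self):
        super().prepare()
        self.finish()


normal_order = LifecycleSketch().handle_message()
skipped_order = SkipProcessing().handle_message()
```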
-
priority
¶ Access the current message’s priority AMQP message property as an attribute of the consumer class.
Return type: int
-
process
()¶ Extend this method for implementing your Consumer logic.
If the message cannot be processed and the Consumer should stop after n failures to process messages, raise the ConsumerException.
Note
Asynchronous support: Decorate this method with tornado.gen.coroutine() to make it asynchronous.
Raises: rejected.consumer.ConsumerException
Raises: rejected.consumer.MessageException
Raises: rejected.consumer.ProcessingException
-
properties
¶ Access the current message’s AMQP message properties in dict form as an attribute of the consumer class.
Return type: dict
-
publish_message
(exchange, routing_key, properties, body, no_serialization=False, no_encoding=False, channel=None)[source]¶ Publish a message to RabbitMQ on the same channel the original message was received on.
By default, if you pass a non-string object to the body and the properties have a supported content-type set, the body will be auto-serialized in the specified content-type.
If the properties do not have a timestamp set, it will be set to the current time.
If you specify a content-encoding in the properties and the encoding is supported, the body will be auto-encoded.
Auto-serialization and auto-encoding can be disabled by setting no_serialization or no_encoding to True, respectively.
Parameters:
- exchange (str) – The exchange to publish to
- routing_key (str) – The routing key to publish with
- properties (dict) – The message properties
- body (mixed) – The message body to publish
- no_serialization (bool) – Turn off auto-serialization of the body
- no_encoding (bool) – Turn off auto-encoding of the body
- channel (str) – The channel/connection name to use. If it is not specified, the channel that the message was delivered on is used.
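To illustrate the steps publish_message describes (default timestamp, auto-serialization, auto-encoding), here is a standard-library sketch. It is not the library's implementation: prepare_publish and ENCODERS are hypothetical names, the properties are a plain dict, and only application/json serialization is shown.

```python
import bz2
import gzip
import json
import time

# Hypothetical lookup table, not part of rejected.
ENCODERS = {'gzip': gzip.compress, 'bzip2': bz2.compress}


def prepare_publish(properties, body, no_serialization=False,
                    no_encoding=False):
    """Return (properties, body) as they might look just before publishing."""
    properties = dict(properties)
    # A missing timestamp defaults to the current time.
    properties.setdefault('timestamp', int(time.time()))
    # Non-string bodies are serialized when the content type is supported.
    if (not no_serialization and not isinstance(body, (str, bytes))
            and properties.get('content_type') == 'application/json'):
        body = json.dumps(body)
    # Supported content encodings compress the serialized body.
    encoder = ENCODERS.get(properties.get('content_encoding'))
    if (not no_encoding and encoder is not None
            and isinstance(body, (str, bytes))):
        if isinstance(body, str):
            body = body.encode('utf-8')
        body = encoder(body)
    return properties, body


props, payload = prepare_publish(
    {'content_type': 'application/json', 'content_encoding': 'gzip'},
    {'id': 1})
```

Serialization runs before encoding, which mirrors the consumer side where the encoding is removed before the body is deserialized.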
-
reply
(response_body, properties, auto_id=True, exchange=None, reply_to=None)¶ Reply to the received message.
If auto_id is True, a new UUIDv4 value will be generated for the message_id AMQP message property. The correlation_id AMQP message property will be set to the message_id of the original message. In addition, the timestamp will be assigned the current time. If auto_id is False, neither the message_id nor the correlation_id AMQP properties will be changed in the properties.
If exchange is not set, the exchange the message was received on will be used.
If reply_to is set in the original properties, it will be used as the routing key. If the reply_to is not set in the properties and it is not passed in, a ValueError will be raised. If reply_to is set in the properties, it will be cleared out prior to the message being republished.
Parameters:
- response_body (any) – The message body to send
- properties (rejected.data.Properties) – Message properties to use
- auto_id (bool) – Automatically shuffle message_id & correlation_id
- exchange (str) – Override the exchange to publish to
- reply_to (str) – Override the reply_to AMQP property
Raises: ValueError
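The property handling described for reply() can be sketched with plain dicts standing in for rejected.data.Properties. The reply_properties helper is hypothetical. Letting the reply_to argument take precedence over the original property, and setting the timestamp only when auto_id is True, are assumptions of this sketch.

```python
import time
import uuid


def reply_properties(original, properties, auto_id=True, reply_to=None):
    """Return (routing_key, properties) for a reply message.

    ``original`` holds the received message's properties and
    ``properties`` the ones supplied for the reply; both are dicts.
    """
    properties = dict(properties)
    routing_key = reply_to or original.get('reply_to')
    if not routing_key:
        raise ValueError('Missing reply_to routing key')
    if auto_id:
        # Shuffle the IDs: the original message_id becomes the
        # correlation_id and the reply gets a new UUIDv4 message_id.
        properties['correlation_id'] = original.get('message_id')
        properties['message_id'] = str(uuid.uuid4())
        properties['timestamp'] = int(time.time())
    properties.pop('reply_to', None)  # Cleared before republishing
    return routing_key, properties


key, reply_props = reply_properties(
    {'message_id': 'm-1', 'reply_to': 'rpc.responses'},
    {'content_type': 'application/json', 'reply_to': 'stale'})
```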
-
reply_to
¶ Access the current message’s reply-to AMQP message property as an attribute of the consumer class.
Return type: str
-
returned
¶ Indicates if the message was delivered by consumer previously and returned from RabbitMQ.
New in version 3.17.
Return type: bool
-
send_exception_to_sentry
(exc_info)¶ Send an exception to Sentry if enabled.
Parameters: exc_info (tuple) – exception information as returned from sys.exc_info()
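For reference, the tuple this method expects is the one sys.exc_info() returns while an exception is being handled. The capture_exc_info helper below is hypothetical and only demonstrates the shape of that tuple.

```python
import sys


def capture_exc_info():
    """Return the (type, value, traceback) tuple for a handled error."""
    try:
        {}['missing']  # Deliberately raise a KeyError
    except KeyError:
        return sys.exc_info()


exc_type, exc_value, exc_tb = capture_exc_info()
```

Inside a consumer, the same tuple would be obtained in an except block and handed to send_exception_to_sentry().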
-
sentry_client
¶ Access the Sentry raven Client instance or None.
Use this object to add tags or additional context to Sentry error reports (see raven.base.Client.tags_context()) or to report messages (via raven.base.Client.captureMessage()) directly to Sentry.
Return type: raven.base.Client
-
set_sentry_context
(tag, value)¶ Set a context tag in Sentry for the given key and value.
Parameters:
-
settings
¶ Access the consumer settings as specified by the config section for the consumer in the rejected configuration.
Return type: dict
-
shutdown
()¶ Override to cleanly shut down when rejected is stopping the consumer.
This could be used for closing database connections or other such activities.
-
stats_add_timing
(key, duration)¶ Add a timing to the per-message measurements
New in version 3.13.0.
Parameters:
- key (str) – The key to add the timing to
- duration (int|float) – The timing value
-
stats_incr
(key, value=1)¶ Increment the specified key in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_set_tag
(key, value=1)¶ Set the specified tag/value in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_set_value
(key, value=1)¶ Set the specified key/value in the per-message measurements
New in version 3.13.0.
Parameters:
-
stats_track_duration
(*args, **kwds)¶ Time around a context and add to the per-message measurements
New in version 3.13.0.
Parameters: key (str) – The key for the timing to track
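Timing "around a context" is the context-manager pattern. The sketch below shows that pattern with the standard library only. The track_duration function and the durations dict are hypothetical stand-ins for the per-message measurements and do not reflect how rejected stores them.

```python
import contextlib
import time

durations = {}  # Hypothetical per-message measurement store


@contextlib.contextmanager
def track_duration(key):
    """Time the enclosed block and record the elapsed seconds under key."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Record the timing even if the block raised an exception.
        durations.setdefault(key, []).append(time.perf_counter() - start)


with track_duration('db_query'):
    sum(range(1000))  # Placeholder for the work being measured
```

In a consumer, the equivalent usage would wrap the measured code in a with statement using stats_track_duration and a key of your choosing.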
-
statsd_add_timing
(key, duration)¶ Add a timing to the per-message measurements
Parameters:
- key (str) – The key to add the timing to
- duration (int|float) – The timing value
Deprecated since version 3.13.0.
-
statsd_incr
(key, value=1)¶ Increment the specified key in the per-message measurements
Parameters:
Deprecated since version 3.13.0.
-
statsd_track_duration
(key)¶ Time around a context and add to the per-message measurements
Parameters: key (str) – The key for the timing to track
Deprecated since version 3.13.0.
-
timestamp
¶ Access the unix epoch timestamp value from the AMQP message properties of the current message.
Return type: int
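Because the value is a Unix epoch integer, it can be converted to a timezone-aware datetime with the standard library. The timestamp_to_datetime helper is a hypothetical convenience for illustration and is not part of rejected.

```python
import datetime


def timestamp_to_datetime(timestamp):
    """Convert a Unix epoch timestamp (seconds) to an aware UTC datetime."""
    return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)


epoch = timestamp_to_datetime(0)
```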
-
unset_sentry_context
(tag)¶ Remove a context tag from Sentry
Parameters: tag (str) – The context tag to remove
-
user_id
¶ Access the user-id AMQP message property from the current message’s properties.
Return type: str
-
yield_to_ioloop
(*args, **kwargs)¶ Function that will allow Rejected to process IOLoop events while in a tight-loop inside an asynchronous consumer.
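The general idea of yielding control inside a tight loop can be shown with asyncio from the standard library. This is an analogy only: it does not use rejected or Tornado, and the busy_worker, heartbeat, and main coroutines are hypothetical. In asyncio, awaiting asyncio.sleep(0) hands control back to the event loop so other tasks can run.

```python
import asyncio


async def busy_worker(log, rounds):
    """A tight loop that yields to the event loop on every iteration."""
    for i in range(rounds):
        log.append(('work', i))
        await asyncio.sleep(0)  # Hand control back to the event loop


async def heartbeat(log, rounds):
    """A second task that only runs when the worker yields."""
    for i in range(rounds):
        log.append(('beat', i))
        await asyncio.sleep(0)


async def main():
    log = []
    await asyncio.gather(busy_worker(log, 3), heartbeat(log, 3))
    return log


events = asyncio.run(main())
```

Without the yield point in busy_worker, the heartbeat task would not get a chance to run until the worker's loop had finished.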
-
class
rejected.consumer.
SmartPublishingConsumer
(*args, **kwargs)[source]¶ Deprecated, functionality moved to rejected.consumer.SmartConsumer.
Deprecated since version 3.17.0.