Reference guide¶
Connecting to the AMQP broker¶
-
asynqp.
connect
(host='localhost', port=5672, username='guest', password='guest', virtual_host='/', *, loop=None, sock=None, **kwargs)¶ Connect to an AMQP server on the given host and port.
Log in to the given virtual host using the supplied credentials. This function is a coroutine.
Parameters: - host (str) – the host server to connect to.
- port (int) – the port which the AMQP server is listening on.
- username (str) – the username to authenticate with.
- password (str) – the password to authenticate with.
- virtual_host (str) – the AMQP virtual host to connect to.
- loop (BaseEventLoop) – An instance of
BaseEventLoop
to use. (Defaults toasyncio.get_event_loop
) - sock (socket) – A
socket
instance to use for the connection. This is passed on toloop.create_connection()
. Ifsock
is supplied thenhost
andport
will be ignored.
Further keyword arguments are passed on to
loop.create_connection()
.This function will set TCP_NODELAY on TCP and TCP6 sockets either on supplied
sock
or created one.Returns: the Connection
object.
-
asynqp.
connect_and_open_channel
(host='localhost', port=5672, username='guest', password='guest', virtual_host='/', *, loop=None, **kwargs)¶ Connect to an AMQP server and open a channel on the connection. This function is a coroutine.
Parameters of this function are the same as
connect
.Returns: a tuple of (connection, channel)
.Equivalent to:
connection = yield from connect(host, port, username, password, virtual_host, loop=loop, **kwargs) channel = yield from connection.open_channel() return connection, channel
Managing Connections and Channels¶
Connections¶
-
class
asynqp.
Connection
¶ Manage connections to AMQP brokers.
A
Connection
is a long-lasting mode of communication with a remote server. Each connection occupies a single TCP connection, and may carry multipleChannels
. A connection communicates with a single virtual host on the server; virtual hosts are sandboxed and may not communicate with one another.Applications are advised to use one connection for each AMQP peer it needs to communicate with; if you need to perform multiple concurrent tasks you should open multiple channels.
Connections are created using
asynqp.connect()
.-
transport
¶ The
BaseTransport
over which the connection is communicating with the server
-
is_closed
()¶ Returns True if connection was closed
-
Channels¶
-
class
asynqp.
Channel
¶ Manage AMQP Channels.
A Channel is a ‘virtual connection’ over which messages are sent and received. Several independent channels can be multiplexed over the same
Connection
, so peers can perform several tasks concurrently while using a single socket.Channels are created using
Connection.open_channel()
.-
declare_exchange
(name, type, *, durable=True, auto_delete=False, passive=False, internal=False, nowait=False, arguments=None)¶ Declare an
Exchange
on the broker. If the exchange does not exist, it will be created.This method is a coroutine.
Parameters: - name (str) – the name of the exchange.
- type (str) – the type of the exchange
(usually one of
'fanout'
,'direct'
,'topic'
, or'headers'
) - durable (bool) – If true, the exchange will be re-created when the server restarts.
- auto_delete (bool) – If true, the exchange will be deleted when the last queue is un-bound from it.
- passive (bool) – If true and exchange with such a name does
not exist it will raise a
exceptions.NotFound
. If false server will create it. Argumentsdurable
,auto_delete
andinternal
are ignored if passive=True. - internal (bool) – If true, the exchange cannot be published to directly; it can only be bound to other exchanges.
- nowait (bool) – If true, the method will not wait for declare-ok to arrive and return right away.
- arguments (dict) – Table of optional parameters for extensions to the AMQP protocol. See Protocol extensions.
Returns: the new
Exchange
object.
-
declare_queue
(name='', *, durable=True, exclusive=False, auto_delete=False, passive=False, nowait=False, arguments=None)¶ Declare a queue on the broker. If the queue does not exist, it will be created.
This method is a coroutine.
Parameters: - name (str) – the name of the queue. Supplying a name of ‘’ will create a queue with a unique name of the server’s choosing.
- durable (bool) – If true, the queue will be re-created when the server restarts.
- exclusive (bool) – If true, the queue can only be accessed by the current connection, and will be deleted when the connection is closed.
- auto_delete (bool) – If true, the queue will be deleted when the last consumer is cancelled. If there were never any conusmers, the queue won’t be deleted.
- passive (bool) – If true and queue with such a name does not
exist it will raise a
exceptions.NotFound
instead of creating it. Argumentsdurable
,auto_delete
andexclusive
are ignored ifpassive=True
. - nowait (bool) – If true, will not wait for a declare-ok to arrive.
- arguments (dict) – Table of optional parameters for extensions to the AMQP protocol. See Protocol extensions.
Returns: The new
Queue
object.
-
set_qos
(prefetch_size=0, prefetch_count=0, apply_globally=False)¶ Specify quality of service by requesting that messages be pre-fetched from the server. Pre-fetching means that the server will deliver messages to the client while the client is still processing unacknowledged messages.
This method is a coroutine.
Parameters: - prefetch_size (int) – Specifies a prefetch window in bytes. Messages smaller than this will be sent from the server in advance. This value may be set to 0, which means “no specific limit”.
- prefetch_count (int) – Specifies a prefetch window in terms of whole messages.
- apply_globally (bool) –
If true, apply these QoS settings on a global level. The meaning of this is implementation-dependent. From the RabbitMQ documentation:
RabbitMQ has reinterpreted this field. The original specification said: “By default the QoS settings apply to the current channel only. If this field is set, they are applied to the entire connection.” Instead, RabbitMQ takes global=false to mean that the QoS settings should apply per-consumer (for new consumers on the channel; existing ones being unaffected) and global=true to mean that the QoS settings should apply per-channel.
-
set_return_handler
(handler)¶ Set
handler
as the callback function for undeliverable messages that were returned by the server.By default, an exception is raised, which will be handled by the event loop’s exception handler (see
BaseEventLoop.set_exception_handler
). Ifhandler
is None, this default behaviour is set.Parameters: handler (callable) – A function to be called when a message is returned. The callback will be passed the undelivered message.
-
Sending and receiving messages with Queues and Exchanges¶
Queues¶
-
class
asynqp.
Queue
¶ Manage AMQP Queues and consume messages.
A queue is a collection of messages, to which new messages can be delivered via an
Exchange
, and from which messages can be consumed by an application.Queues are created using
Channel.declare_queue()
.-
name
¶ the name of the queue
-
durable
¶ if True, the queue will be re-created when the broker restarts
-
exclusive
¶ if True, the queue is only accessible over one channel
-
auto_delete
¶ if True, the queue will be deleted when its last consumer is removed
-
arguments
¶ A dictionary of the extra arguments that were used to declare the queue.
-
bind
(exchange, routing_key, *, arguments=None)¶ Bind a queue to an exchange, with the supplied routing key.
This action ‘subscribes’ the queue to the routing key; the precise meaning of this varies with the exchange type.
This method is a coroutine.
Parameters: - exchange (asynqp.Exchange) – the
Exchange
to bind to - routing_key (str) – the routing key under which to bind
- arguments (dict) – Table of optional parameters for extensions to the AMQP protocol. See Protocol extensions.
Returns: The new
QueueBinding
object- exchange (asynqp.Exchange) – the
-
consume
(callback, *, no_local=False, no_ack=False, exclusive=False, arguments=None)¶ Start a consumer on the queue. Messages will be delivered asynchronously to the consumer. The callback function will be called whenever a new message arrives on the queue.
Advanced usage: the callback object must be callable (it must be a function or define a
__call__
method), but may also define some further methods:callback.on_cancel()
: called with no parameters when the consumer is successfully cancelled.callback.on_error(exc)
: called when the channel is closed due to an error. The argument passed is the exception which caused the error.
This method is a coroutine.
Parameters: - callback (callable) – a callback to be called when a message is delivered.
The callback must accept a single argument (an instance of
IncomingMessage
). - no_local (bool) – If true, the server will not deliver messages that were published by this connection.
- no_ack (bool) – If true, messages delivered to the consumer don’t require acknowledgement.
- exclusive (bool) – If true, only this consumer can access the queue.
- arguments (dict) – Table of optional parameters for extensions to the AMQP protocol. See Protocol extensions.
Returns: The newly created
Consumer
object.
-
Exchanges¶
-
class
asynqp.
Exchange
¶ Manage AMQP Exchanges and publish messages.
An exchange is a ‘routing node’ to which messages can be published. When a message is published to an exchange, the exchange determines which
Queue
to deliver the message to by inspecting the message’s routing key and the exchange’s bindings. You can bind a queue to an exchange, to start receiving messages on the queue, usingQueue.bind
.Exchanges are created using
Channel.declare_exchange()
.-
name
¶ the name of the exchange.
-
type
¶ the type of the exchange (usually one of
'fanout'
,'direct'
,'topic'
, or'headers'
).
-
delete
(*, if_unused=True)¶ Delete the exchange.
This method is a coroutine.
Parameters: if_unused (bool) – If true, the exchange will only be deleted if it has no queues bound to it.
-
publish
(message, routing_key, *, mandatory=True)¶ Publish a message on the exchange, to be asynchronously delivered to queues.
Parameters: - message (asynqp.Message) – the message to send
- routing_key (str) – the routing key with which to publish the message
-
Bindings¶
-
class
asynqp.
QueueBinding
¶ Manage queue-exchange bindings.
Represents a binding between a
Queue
and anExchange
. Once a queue has been bound to an exchange, messages published to that exchange will be delivered to the queue. The delivery may be conditional, depending on the type of the exchange.QueueBindings are created using
Queue.bind()
.-
routing_key
¶ the routing key used for the binding
-
Consumers¶
-
class
asynqp.
Consumer
¶ A consumer asynchronously recieves messages from a queue as they arrive.
Consumers are created using
Queue.consume()
.-
tag
¶ A string representing the consumer tag used by the server to identify this consumer.
-
callback
¶ The callback function that is called when messages are delivered to the consumer. This is the function that was passed to
Queue.consume()
, and should accept a singleIncomingMessage
argument.
-
cancelled
¶ Boolean. True if the consumer has been successfully cancelled.
-
Message objects¶
-
class
asynqp.
Message
(body, *, headers=None, content_type=None, content_encoding=None, delivery_mode=None, priority=None, correlation_id=None, reply_to=None, expiration=None, message_id=None, timestamp=None, type=None, user_id=None, app_id=None)¶ An AMQP Basic message.
Some of the constructor parameters are ignored by the AMQP broker and are provided just for the convenience of user applications. They are marked “for applications” in the list below.
Parameters: - body –
bytes
,str
ordict
representing the body of the message. Strings will be encoded according to the content_encoding parameter; dicts will be converted to a string using JSON. - headers (dict) – a dictionary of message headers
- content_type (str) – MIME content type
(defaults to ‘application/json’ if
body
is adict
, or ‘application/octet-stream’ otherwise) - content_encoding (str) – MIME encoding (defaults to ‘utf-8’)
- delivery_mode (int) – 1 for non-persistent, 2 for persistent
- priority (int) – message priority - integer between 0 and 9
- correlation_id (str) – correlation id of the message (for applications)
- reply_to (str) – reply-to address (for applications)
- expiration (str) – expiration specification (for applications)
- message_id (str) – unique id of the message (for applications)
- timestamp (datetime.datetime) –
datetime
of when the message was sent (default:datetime.now()
) - type (str) – message type (for applications)
- user_id (str) – ID of the user sending the message (for applications)
- app_id (str) – ID of the application sending the message (for applications)
Attributes are the same as the constructor parameters.
-
json
()¶ Parse the message body as JSON.
Returns: the parsed JSON.
- body –
Exceptions¶
-
exception
asynqp.exceptions.
AMQPConnectionError
¶
-
exception
asynqp.exceptions.
ConnectionLostError
(message, exc=None)¶ Connection was closed unexpectedly
-
exception
asynqp.exceptions.
ConnectionClosed
(reply_text, reply_code=None)¶ Connection was closed by client
-
exception
asynqp.exceptions.
ChannelClosed
¶ Channel was closed by client
-
exception
asynqp.exceptions.
UndeliverableMessage
¶
-
exception
asynqp.exceptions.
Deleted
¶
-
exception
asynqp.exceptions.
AMQPError
¶
-
exception
asynqp.exceptions.
AMQPChannelError
¶
-
exception
asynqp.exceptions.
ResourceLocked
¶
-
exception
asynqp.exceptions.
NotAllowed
¶
-
exception
asynqp.exceptions.
AccessRefused
¶
-
exception
asynqp.exceptions.
NotFound
¶
-
exception
asynqp.exceptions.
FrameError
¶
-
exception
asynqp.exceptions.
InvalidPath
¶
-
exception
asynqp.exceptions.
NoConsumers
¶
-
exception
asynqp.exceptions.
ChannelError
¶
-
exception
asynqp.exceptions.
CommandInvalid
¶
-
exception
asynqp.exceptions.
ContentTooLarge
¶
-
exception
asynqp.exceptions.
ConnectionForced
¶
-
exception
asynqp.exceptions.
UnexpectedFrame
¶
-
exception
asynqp.exceptions.
SyntaxError
¶
-
exception
asynqp.exceptions.
PreconditionFailed
¶
-
exception
asynqp.exceptions.
ResourceError
¶
-
exception
asynqp.exceptions.
InternalError
¶
-
exception
asynqp.exceptions.
NotImplemented
¶