# Source code for amqpctl.command

"""
.. autoclass:: OperationalError
    :show-inheritance:
.. autoclass:: AMQPCTL
"""

from amqplib.client_0_8.connection import Connection
from amqplib.client_0_8.channel import Channel
from amqplib.client_0_8.exceptions import AMQPChannelException
from optparse import OptionParser
import logging

class OperationalError(Exception):
    """Raised when an invalid command is given by the user."""
class AMQPCTL(object):
    """
    This is the command line class.

    It reads arguments using the standard python
    :class:`optparse.OptionParser` and uses `py-amqplib`_ to connect to an
    AMQP Server. The various methods in this class might serve as useful
    examples of `py-amqplib`_ usage.

    .. _py-amqplib: http://code.google.com/p/py-amqplib/

    .. automethod:: connect
    .. automethod:: check_exchange
    .. automethod:: declare_exchange
    .. automethod:: delete_exchange
    .. automethod:: check_queue
    .. automethod:: declare_queue
    .. automethod:: delete_queue
    .. automethod:: purge_queue
    """

    # Shared, class-level option table; parsed once per instance in __init__.
    parser = OptionParser()

    # Connection options
    parser.add_option("-s", "--server", dest="server", default="localhost",
                      help="Hostname of the AMQP Server")
    parser.add_option("-u", "--username", dest="username", default="guest",
                      help="Username (default: guest)")
    parser.add_option("-p", "--password", dest="password", default="guest",
                      help="Password (default: guest)")
    parser.add_option("--quiet", dest="quiet", default=False,
                      action="store_true", help="No verbose output")

    # Exchange / queue description
    parser.add_option("-e", "--exchange", dest="exchange",
                      default="test_exchange",
                      help="Exchange Name (default: test_exchange)")
    parser.add_option("-t", "--type", dest="exchange_type", default="direct",
                      help="Type of Exchange (direct or fanout)")
    parser.add_option("--durable", dest="durable", default=False,
                      action="store_true",
                      help="Declare queues and exchanges durable")
    parser.add_option("-q", "--queue", dest="queue", default="test_queue",
                      help="Queue Name (default: test_queue)")
    parser.add_option("--exclusive", dest="exclusive", default=False,
                      action="store_true",
                      help="Declare queues as exclusive")

    # Actions
    parser.add_option("--declare-exchange", dest="declare_exchange",
                      default=False, action="store_true",
                      help="Declare the exchange")
    parser.add_option("--delete-exchange", dest="delete_exchange",
                      default=False, action="store_true",
                      help="Delete the exchange")
    parser.add_option("--declare-queue", dest="declare_queue",
                      default=False, action="store_true",
                      help="Declare the queue")
    parser.add_option("--delete-queue", dest="delete_queue",
                      default=False, action="store_true",
                      help="Delete the queue")
    parser.add_option("--purge-queue", dest="purge_queue",
                      default=False, action="store_true",
                      help="Purge messages from the queue")

    def __init__(self, args=None):
        """
        Parse the command line and configure logging.

        :param args: optional list of argument strings to parse. When
            ``None`` (the default) the arguments are taken from
            ``sys.argv[1:]``, which is optparse's own default.
        """
        self.options, self.args = self.parser.parse_args(args)

        # --quiet is documented as "No verbose output", so it must lower the
        # verbosity: every message this class emits is logged at INFO, and
        # WARNING silences them. Without --quiet the level stays at INFO.
        if self.options.quiet:
            level = logging.WARNING
        else:
            level = logging.INFO
        logging.basicConfig(
            format="%(asctime)s %(levelname)-5.5s [%(name)s] %(message)s",
            level=level,
        )
        self.log = logging.getLogger(__name__)

    def command(self):
        """
        Connect to the server and run every action selected on the command
        line.

        Actions run in a fixed order: delete exchange, declare exchange,
        delete queue, declare queue, purge queue. Each one is guarded by an
        existence check on the object it operates on, so a redundant request
        is logged and skipped rather than sent to the server.
        """
        self.connect()
        opts = self.options

        if opts.delete_exchange:
            if self.check_exchange():
                self.delete_exchange()
            else:
                self.log.info("No such exchange, nothing to delete")

        if opts.declare_exchange:
            if self.check_exchange():
                self.log.info("Exchange already exists, delete it first")
            else:
                self.declare_exchange()

        if opts.delete_queue:
            # The guard must look at the queue, not at the exchange.
            if self.check_queue():
                self.delete_queue()
            else:
                self.log.info("No such queue, nothing to delete")

        if opts.declare_queue:
            if self.check_queue():
                self.log.info("Queue already exists, delete it first")
            else:
                self.declare_queue()

        if opts.purge_queue:
            if self.check_queue():
                self.purge_queue()
            else:
                self.log.info("No such queue")
[docs] def connect(self): """ Connect to the AMQP Server. The connection object wants the server's coordinates and the user credentials. Interaction with the server is actually done with a :class:`amqplib.client_0_8.channel.Channel` object. """ self.log.info("Connecting: %s@%s" % (self.options.username, self.options.server)) self.connection = Connection( hostname=self.options.server, userid=self.options.username, password=self.options.password )
[docs] def check_exchange(self): """ Check to see if exchange exists """ try: channel = Channel(self.connection) channel.exchange_declare( self.options.exchange, self.options.exchange_type, passive=True, ) return True except AMQPChannelException: return False
[docs] def declare_exchange(self): """ Declare an exchange """ self.log.info("Declaring exchange: %s type: %s durable: %s" % ( self.options.exchange, self.options.exchange_type, self.options.durable )) channel = Channel(self.connection) channel.exchange_declare( self.options.exchange, self.options.exchange_type, passive=False, durable=self.options.durable )
[docs] def delete_exchange(self): """ Delete an exchange """ self.log.info("Deleting exchange: %s" % (self.options.exchange,)) channel = Channel(self.connection) channel.exchange_delete(self.options.exchange)
[docs] def check_queue(self): """ Check to see if queue exists """ try: channel = Channel(self.connection) channel.queue_declare( self.options.queue, passive=True ) return True except AMQPChannelException: return False
[docs] def declare_queue(self): """ Declare a queue """ self.log.info("Declaring queue: %s durable: %s exclusive: %s" % ( self.options.queue, self.options.durable, self.options.exclusive )) channel = Channel(self.connection) channel.queue_declare( self.options.queue, passive=False, durable=self.options.durable, exclusive=self.options.exclusive )
[docs] def delete_queue(self): """ Delete a queue """ self.log.info("Deleting queue: %s" % (self.options.queue,)) channel = Channel(self.connection) channel.queue_delete(self.options.queue)
[docs] def purge_queue(self): """ Purge a queue """ self.log.info("Purging queue: %s" % (self.options.queue,)) channel = Channel(self.connection) channel.queue_purge(self.options.queue)
def amqpctl():
    """Build an :class:`AMQPCTL` instance and run its selected commands."""
    tool = AMQPCTL()
    tool.command()