# Source code for nucleon.amqp.spec

# Autogenerated - do not edit
import struct
import inspect
from functools import wraps, partial

import table



# Registry mapping each method id to the Frame class that declares it.
# FrameMeta adds an entry whenever a class body defines METHOD_ID.
METHODS = dict()


class FrameMeta(type):
    """Metaclass that registers frame classes in METHODS by their METHOD_ID."""

    def __new__(cls, main, bases, dict):
        """Create the class and, if its body defines METHOD_ID, register it.

        ``main`` is the new class's name, ``bases`` its base classes and
        ``dict`` its namespace.  Classes without a METHOD_ID entry in their
        own namespace are created normally and not registered.
        """
        frame_cls = super(FrameMeta, cls).__new__(cls, main, bases, dict)
        try:
            method_id = dict['METHOD_ID']
        except KeyError:
            # No METHOD_ID in the class body: nothing to register.
            return frame_cls
        METHODS[method_id] = frame_cls
        return frame_cls


class Frame(object):
    """Common base class of the method frame classes in this module.

    FrameMeta is attached through the Python 2 ``__metaclass__`` hook, so
    every subclass whose body defines METHOD_ID is recorded in METHODS.
    Subclasses name their fields in ``__slots__``; the constructor fills
    those fields positionally.
    """
    __metaclass__ = FrameMeta

    # Class-level defaults; subclasses override them where needed.
    has_content = False
    is_error = False

    def __init__(self, *args):
        """Assign each positional argument to the matching ``__slots__`` name.

        Names and values are paired with zip(), so surplus arguments are
        ignored and fields without a matching argument are left unset.
        """
        slot_names = self.__slots__
        assignments = zip(slot_names, args)
        for field_name, value in assignments:
            setattr(self, field_name, value)

# The literal 'AMQP' followed by the bytes 0, 0, 9, 1 -- presumably the
# AMQP 0-9-1 protocol header sent when opening a connection (TODO confirm
# against the code that uses it).
PREAMBLE = 'AMQP\x00\x00\x09\x01'

# Class id 60 (0x3C); used as ``class_id`` by the content-carrying basic.* frames.
CLASS_BASIC             = 0x003C


class FrameConnectionStart(Frame):
    """Frame class for the 'connection.start' method (class 10, method 10)."""

    __slots__ = ('version_major', 'version_minor', 'server_properties', 'mechanisms', 'locales')
    name = 'connection.start'
    METHOD_ID = 0x000A000A  # 10,10 655370
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        major, minor = buffer.read('!BB')
        properties = buffer.read_table()
        mechanisms = buffer.read_string('!I')
        locales = buffer.read_string('!I')
        return FrameConnectionStart(major, minor, properties, mechanisms, locales)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        properties_raw = table.encode(self.server_properties)
        parts = [
            struct.pack('!IBB', self.METHOD_ID, self.version_major, self.version_minor),
            properties_raw,
            struct.pack('!I', len(self.mechanisms)),
            self.mechanisms,
            struct.pack('!I', len(self.locales)),
            self.locales,
        ]
        yield (0x01, ''.join(parts))


class FrameConnectionStartOk(Frame):
    """Frame class for the 'connection.start-ok' method (class 10, method 11)."""

    __slots__ = ('client_properties', 'mechanism', 'response', 'locale')
    name = 'connection.start-ok'
    METHOD_ID = 0x000A000B  # 10,11 655371
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        # Call arguments are evaluated left to right, so the reads stay in wire order.
        return FrameConnectionStartOk(
            buffer.read_table(),
            buffer.read_string('!B'),
            buffer.read_string('!I'),
            buffer.read_string('!B'),
        )

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        properties_raw = table.encode(self.client_properties)
        parts = [
            struct.pack('!I', self.METHOD_ID),
            properties_raw,
            struct.pack('!B', len(self.mechanism)),
            self.mechanism,
            struct.pack('!I', len(self.response)),
            self.response,
            struct.pack('!B', len(self.locale)),
            self.locale,
        ]
        yield (0x01, ''.join(parts))


class FrameConnectionSecure(Frame):
    """Frame class for the 'connection.secure' method (class 10, method 20)."""

    __slots__ = ('challenge',)
    name = 'connection.secure'
    METHOD_ID = 0x000A0014  # 10,20 655380
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the challenge string from ``buffer`` and return a new frame."""
        return FrameConnectionSecure(buffer.read_string('!I'))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID and length, then the challenge."""
        header = struct.pack('!II', self.METHOD_ID, len(self.challenge))
        yield (0x01, ''.join((header, self.challenge)))


class FrameConnectionSecureOk(Frame):
    """Frame class for the 'connection.secure-ok' method (class 10, method 21)."""

    __slots__ = ('response',)
    name = 'connection.secure-ok'
    METHOD_ID = 0x000A0015  # 10,21 655381
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the response string from ``buffer`` and return a new frame."""
        return FrameConnectionSecureOk(buffer.read_string('!I'))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID and length, then the response."""
        header = struct.pack('!II', self.METHOD_ID, len(self.response))
        yield (0x01, ''.join((header, self.response)))


class FrameConnectionTune(Frame):
    """Frame class for the 'connection.tune' method (class 10, method 30)."""

    __slots__ = ('channel_max', 'frame_max', 'heartbeat')
    name = 'connection.tune'
    METHOD_ID = 0x000A001E  # 10,30 655390
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the three numeric fields ('!HIH') from ``buffer`` and return a new frame."""
        (channel_max, frame_max, heartbeat) = buffer.read('!HIH')
        return FrameConnectionTune(channel_max, frame_max, heartbeat)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: METHOD_ID and the three fields in one pack."""
        payload = struct.pack('!IHIH', self.METHOD_ID, self.channel_max, self.frame_max, self.heartbeat)
        yield (0x01, payload)


class FrameConnectionTuneOk(Frame):
    """Frame class for the 'connection.tune-ok' method (class 10, method 31)."""

    __slots__ = ('channel_max', 'frame_max', 'heartbeat')
    name = 'connection.tune-ok'
    METHOD_ID = 0x000A001F  # 10,31 655391
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the three numeric fields ('!HIH') from ``buffer`` and return a new frame."""
        (channel_max, frame_max, heartbeat) = buffer.read('!HIH')
        return FrameConnectionTuneOk(channel_max, frame_max, heartbeat)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: METHOD_ID and the three fields in one pack."""
        payload = struct.pack('!IHIH', self.METHOD_ID, self.channel_max, self.frame_max, self.heartbeat)
        yield (0x01, payload)


class FrameConnectionOpen(Frame):
    """Frame class for the 'connection.open' method (class 10, method 40)."""

    __slots__ = ('virtual_host', 'capabilities', 'insist')
    name = 'connection.open'
    METHOD_ID = 0x000A0028  # 10,40 655400
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        virtual_host = buffer.read_string('!B')
        capabilities = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        # Bit 0 of the flag byte carries ``insist``.
        return FrameConnectionOpen(virtual_host, capabilities, bool(flags & 0x1))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IB', self.METHOD_ID, len(self.virtual_host)),
            self.virtual_host,
            struct.pack('!B', len(self.capabilities)),
            self.capabilities,
            struct.pack('!B', 0x1 if self.insist else 0),
        ]
        yield (0x01, ''.join(parts))


class FrameConnectionOpenOk(Frame):
    """Frame class for the 'connection.open-ok' method (class 10, method 41)."""

    __slots__ = ('known_hosts',)
    name = 'connection.open-ok'
    METHOD_ID = 0x000A0029  # 10,41 655401
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the known_hosts string from ``buffer`` and return a new frame."""
        return FrameConnectionOpenOk(buffer.read_string('!B'))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID and length, then known_hosts."""
        header = struct.pack('!IB', self.METHOD_ID, len(self.known_hosts))
        yield (0x01, ''.join((header, self.known_hosts)))


class FrameConnectionClose(Frame):
    """Frame class for the 'connection.close' method (class 10, method 50)."""

    __slots__ = ('reply_code', 'reply_text', 'class_id', 'method_id')
    name = 'connection.close'
    METHOD_ID = 0x000A0032  # 10,50 655410
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (code,) = buffer.read('!H')
        text = buffer.read_string('!B')
        failing_class, failing_method = buffer.read('!HH')
        return FrameConnectionClose(code, text, failing_class, failing_method)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.reply_code, len(self.reply_text)),
            self.reply_text,
            struct.pack('!HH', self.class_id, self.method_id),
        ]
        yield (0x01, ''.join(parts))


class FrameConnectionCloseOk(Frame):
    """Frame class for the 'connection.close-ok' method (class 10, method 51)."""

    __slots__ = ()
    name = 'connection.close-ok'
    METHOD_ID = 0x000A0033  # 10,51 655411
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; the method has no fields, so ``buffer`` is left untouched."""
        return FrameConnectionCloseOk()

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple whose payload is just the packed METHOD_ID."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameChannelOpen(Frame):
    """Frame class for the 'channel.open' method (class 20, method 10)."""

    __slots__ = ('out_of_band',)
    name = 'channel.open'
    METHOD_ID = 0x0014000A  # 20,10 1310730
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the out_of_band string from ``buffer`` and return a new frame."""
        return FrameChannelOpen(buffer.read_string('!B'))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID and length, then out_of_band."""
        header = struct.pack('!IB', self.METHOD_ID, len(self.out_of_band))
        yield (0x01, ''.join((header, self.out_of_band)))


class FrameChannelOpenOk(Frame):
    """Frame class for the 'channel.open-ok' method (class 20, method 11)."""

    __slots__ = ('channel_id',)
    name = 'channel.open-ok'
    METHOD_ID = 0x0014000B  # 20,11 1310731
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the channel_id string from ``buffer`` and return a new frame."""
        return FrameChannelOpenOk(buffer.read_string('!I'))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID and length, then channel_id."""
        header = struct.pack('!II', self.METHOD_ID, len(self.channel_id))
        yield (0x01, ''.join((header, self.channel_id)))


class FrameChannelFlow(Frame):
    """Frame class for the 'channel.flow' method (class 20, method 20)."""

    __slots__ = ('active',)
    name = 'channel.flow'
    METHOD_ID = 0x00140014  # 20,20 1310740
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the flag byte from ``buffer`` and return a new frame (bit 0 is ``active``)."""
        (flags,) = buffer.read('!B')
        return FrameChannelFlow(bool(flags & 0x1))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID plus the flag byte."""
        payload = struct.pack('!IB', self.METHOD_ID, 0x1 if self.active else 0)
        yield (0x01, payload)


class FrameChannelFlowOk(Frame):
    """Frame class for the 'channel.flow-ok' method (class 20, method 21)."""

    __slots__ = ('active',)
    name = 'channel.flow-ok'
    METHOD_ID = 0x00140015  # 20,21 1310741
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the flag byte from ``buffer`` and return a new frame (bit 0 is ``active``)."""
        (flags,) = buffer.read('!B')
        return FrameChannelFlowOk(bool(flags & 0x1))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID plus the flag byte."""
        payload = struct.pack('!IB', self.METHOD_ID, 0x1 if self.active else 0)
        yield (0x01, payload)


class FrameChannelClose(Frame):
    """Frame class for the 'channel.close' method (class 20, method 40)."""

    __slots__ = ('reply_code', 'reply_text', 'class_id', 'method_id')
    name = 'channel.close'
    METHOD_ID = 0x00140028  # 20,40 1310760
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (code,) = buffer.read('!H')
        text = buffer.read_string('!B')
        failing_class, failing_method = buffer.read('!HH')
        return FrameChannelClose(code, text, failing_class, failing_method)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.reply_code, len(self.reply_text)),
            self.reply_text,
            struct.pack('!HH', self.class_id, self.method_id),
        ]
        yield (0x01, ''.join(parts))


class FrameChannelCloseOk(Frame):
    """Frame class for the 'channel.close-ok' method (class 20, method 41)."""

    __slots__ = ()
    name = 'channel.close-ok'
    METHOD_ID = 0x00140029  # 20,41 1310761
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; the method has no fields, so ``buffer`` is left untouched."""
        return FrameChannelCloseOk()

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple whose payload is just the packed METHOD_ID."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameAccessRequest(Frame):
    """Frame class for the 'access.request' method (class 30, method 10)."""

    __slots__ = ('realm', 'exclusive', 'passive', 'active', 'write', 'read')
    name = 'access.request'
    METHOD_ID = 0x001E000A  # 30,10 1966090
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        realm = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        # One flag byte holds the five booleans, lowest bit first.
        exclusive, passive, active, write, read = [
            bool(flags & mask) for mask in (0x1, 0x2, 0x4, 0x8, 0x10)
        ]
        return FrameAccessRequest(realm, exclusive, passive, active, write, read)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IB', self.METHOD_ID, len(self.realm)),
            self.realm,
        ]
        flags = (
            (0x1 if self.exclusive else 0)
            | (0x2 if self.passive else 0)
            | (0x4 if self.active else 0)
            | (0x8 if self.write else 0)
            | (0x10 if self.read else 0)
        )
        parts.append(struct.pack('!B', flags))
        yield (0x01, ''.join(parts))


class FrameAccessRequestOk(Frame):
    """Frame class for the 'access.request-ok' method (class 30, method 11)."""

    __slots__ = ('ticket',)
    name = 'access.request-ok'
    METHOD_ID = 0x001E000B  # 30,11 1966091
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the ticket ('!H') from ``buffer`` and return a new frame."""
        (ticket,) = buffer.read('!H')
        return FrameAccessRequestOk(ticket)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID plus the ticket."""
        payload = struct.pack('!IH', self.METHOD_ID, self.ticket)
        yield (0x01, payload)


class FrameExchangeDeclare(Frame):
    """Frame class for the 'exchange.declare' method (class 40, method 10)."""

    __slots__ = ('ticket', 'exchange', 'type', 'passive', 'durable', 'auto_delete', 'internal', 'nowait', 'arguments')
    name = 'exchange.declare'
    METHOD_ID = 0x0028000A  # 40,10 2621450
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        exchange = buffer.read_string('!B')
        exchange_type = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        # One flag byte holds the five booleans, lowest bit first.
        passive, durable, auto_delete, internal, nowait = [
            bool(flags & mask) for mask in (0x1, 0x2, 0x4, 0x8, 0x10)
        ]
        arguments = buffer.read_table()
        return FrameExchangeDeclare(ticket, exchange, exchange_type, passive, durable, auto_delete, internal, nowait, arguments)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        arguments_raw = table.encode(self.arguments)
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.exchange)),
            self.exchange,
            struct.pack('!B', len(self.type)),
            self.type,
        ]
        flags = (
            (0x1 if self.passive else 0)
            | (0x2 if self.durable else 0)
            | (0x4 if self.auto_delete else 0)
            | (0x8 if self.internal else 0)
            | (0x10 if self.nowait else 0)
        )
        parts.append(struct.pack('!B', flags))
        parts.append(arguments_raw)
        yield (0x01, ''.join(parts))


class FrameExchangeDeclareOk(Frame):
    """Frame class for the 'exchange.declare-ok' method (class 40, method 11)."""

    __slots__ = ()
    name = 'exchange.declare-ok'
    METHOD_ID = 0x0028000B  # 40,11 2621451
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; the method has no fields, so ``buffer`` is left untouched."""
        return FrameExchangeDeclareOk()

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple whose payload is just the packed METHOD_ID."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameExchangeDelete(Frame):
    """Frame class for the 'exchange.delete' method (class 40, method 20)."""

    __slots__ = ('ticket', 'exchange', 'if_unused', 'nowait')
    name = 'exchange.delete'
    METHOD_ID = 0x00280014  # 40,20 2621460
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        exchange = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        if_unused, nowait = [bool(flags & mask) for mask in (0x1, 0x2)]
        return FrameExchangeDelete(ticket, exchange, if_unused, nowait)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.exchange)),
            self.exchange,
        ]
        flags = (0x1 if self.if_unused else 0) | (0x2 if self.nowait else 0)
        parts.append(struct.pack('!B', flags))
        yield (0x01, ''.join(parts))


class FrameExchangeDeleteOk(Frame):
    """Frame class for the 'exchange.delete-ok' method (class 40, method 21)."""

    __slots__ = ()
    name = 'exchange.delete-ok'
    METHOD_ID = 0x00280015  # 40,21 2621461
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; the method has no fields, so ``buffer`` is left untouched."""
        return FrameExchangeDeleteOk()

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple whose payload is just the packed METHOD_ID."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameExchangeBind(Frame):
    """Frame class for the 'exchange.bind' method (class 40, method 30)."""

    __slots__ = ('ticket', 'destination', 'source', 'routing_key', 'nowait', 'arguments')
    name = 'exchange.bind'
    METHOD_ID = 0x0028001E  # 40,30 2621470
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        destination = buffer.read_string('!B')
        source = buffer.read_string('!B')
        routing_key = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        nowait = bool(flags & 0x1)
        arguments = buffer.read_table()
        return FrameExchangeBind(ticket, destination, source, routing_key, nowait, arguments)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        arguments_raw = table.encode(self.arguments)
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.destination)),
            self.destination,
            struct.pack('!B', len(self.source)),
            self.source,
            struct.pack('!B', len(self.routing_key)),
            self.routing_key,
            struct.pack('!B', 0x1 if self.nowait else 0),
            arguments_raw,
        ]
        yield (0x01, ''.join(parts))


class FrameExchangeBindOk(Frame):
    """Frame class for the 'exchange.bind-ok' method (class 40, method 31)."""

    __slots__ = ()
    name = 'exchange.bind-ok'
    METHOD_ID = 0x0028001F  # 40,31 2621471
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; the method has no fields, so ``buffer`` is left untouched."""
        return FrameExchangeBindOk()

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple whose payload is just the packed METHOD_ID."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameExchangeUnbind(Frame):
    """Frame class for the 'exchange.unbind' method (class 40, method 40)."""

    __slots__ = ('ticket', 'destination', 'source', 'routing_key', 'nowait', 'arguments')
    name = 'exchange.unbind'
    METHOD_ID = 0x00280028  # 40,40 2621480
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        destination = buffer.read_string('!B')
        source = buffer.read_string('!B')
        routing_key = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        nowait = bool(flags & 0x1)
        arguments = buffer.read_table()
        return FrameExchangeUnbind(ticket, destination, source, routing_key, nowait, arguments)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        arguments_raw = table.encode(self.arguments)
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.destination)),
            self.destination,
            struct.pack('!B', len(self.source)),
            self.source,
            struct.pack('!B', len(self.routing_key)),
            self.routing_key,
            struct.pack('!B', 0x1 if self.nowait else 0),
            arguments_raw,
        ]
        yield (0x01, ''.join(parts))


class FrameExchangeUnbindOk(Frame):
    """Frame class for the 'exchange.unbind-ok' method (class 40, method 51)."""

    __slots__ = ()
    name = 'exchange.unbind-ok'
    METHOD_ID = 0x00280033  # 40,51 2621491
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; the method has no fields, so ``buffer`` is left untouched."""
        return FrameExchangeUnbindOk()

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple whose payload is just the packed METHOD_ID."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameQueueDeclare(Frame):
    """Frame class for the 'queue.declare' method (class 50, method 10)."""

    __slots__ = ('ticket', 'queue', 'passive', 'durable', 'exclusive', 'auto_delete', 'nowait', 'arguments')
    name = 'queue.declare'
    METHOD_ID = 0x0032000A  # 50,10 3276810
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        queue = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        # One flag byte holds the five booleans, lowest bit first.
        passive, durable, exclusive, auto_delete, nowait = [
            bool(flags & mask) for mask in (0x1, 0x2, 0x4, 0x8, 0x10)
        ]
        arguments = buffer.read_table()
        return FrameQueueDeclare(ticket, queue, passive, durable, exclusive, auto_delete, nowait, arguments)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        arguments_raw = table.encode(self.arguments)
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.queue)),
            self.queue,
        ]
        flags = (
            (0x1 if self.passive else 0)
            | (0x2 if self.durable else 0)
            | (0x4 if self.exclusive else 0)
            | (0x8 if self.auto_delete else 0)
            | (0x10 if self.nowait else 0)
        )
        parts.append(struct.pack('!B', flags))
        parts.append(arguments_raw)
        yield (0x01, ''.join(parts))


class FrameQueueDeclareOk(Frame):
    """Frame class for the 'queue.declare-ok' method (class 50, method 11)."""

    __slots__ = ('queue', 'message_count', 'consumer_count')
    name = 'queue.declare-ok'
    METHOD_ID = 0x0032000B  # 50,11 3276811
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        queue = buffer.read_string('!B')
        messages, consumers = buffer.read('!II')
        return FrameQueueDeclareOk(queue, messages, consumers)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IB', self.METHOD_ID, len(self.queue)),
            self.queue,
            struct.pack('!II', self.message_count, self.consumer_count),
        ]
        yield (0x01, ''.join(parts))


class FrameQueueBind(Frame):
    """Frame class for the 'queue.bind' method (class 50, method 20)."""

    __slots__ = ('ticket', 'queue', 'exchange', 'routing_key', 'nowait', 'arguments')
    name = 'queue.bind'
    METHOD_ID = 0x00320014  # 50,20 3276820
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        queue = buffer.read_string('!B')
        exchange = buffer.read_string('!B')
        routing_key = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        nowait = bool(flags & 0x1)
        arguments = buffer.read_table()
        return FrameQueueBind(ticket, queue, exchange, routing_key, nowait, arguments)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        arguments_raw = table.encode(self.arguments)
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.queue)),
            self.queue,
            struct.pack('!B', len(self.exchange)),
            self.exchange,
            struct.pack('!B', len(self.routing_key)),
            self.routing_key,
            struct.pack('!B', 0x1 if self.nowait else 0),
            arguments_raw,
        ]
        yield (0x01, ''.join(parts))


class FrameQueueBindOk(Frame):
    """Frame class for the 'queue.bind-ok' method (class 50, method 21)."""

    __slots__ = ()
    name = 'queue.bind-ok'
    METHOD_ID = 0x00320015  # 50,21 3276821
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; the method has no fields, so ``buffer`` is left untouched."""
        return FrameQueueBindOk()

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple whose payload is just the packed METHOD_ID."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameQueuePurge(Frame):
    """Frame class for the 'queue.purge' method (class 50, method 30)."""

    __slots__ = ('ticket', 'queue', 'nowait')
    name = 'queue.purge'
    METHOD_ID = 0x0032001E  # 50,30 3276830
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        queue = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        # Bit 0 of the flag byte carries ``nowait``.
        return FrameQueuePurge(ticket, queue, bool(flags & 0x1))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.queue)),
            self.queue,
            struct.pack('!B', 0x1 if self.nowait else 0),
        ]
        yield (0x01, ''.join(parts))


class FrameQueuePurgeOk(Frame):
    """Frame class for the 'queue.purge-ok' method (class 50, method 31)."""

    __slots__ = ('message_count',)
    name = 'queue.purge-ok'
    METHOD_ID = 0x0032001F  # 50,31 3276831
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read message_count ('!I') from ``buffer`` and return a new frame."""
        (messages,) = buffer.read('!I')
        return FrameQueuePurgeOk(messages)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID plus message_count."""
        payload = struct.pack('!II', self.METHOD_ID, self.message_count)
        yield (0x01, payload)


class FrameQueueDelete(Frame):
    """Frame class for the 'queue.delete' method (class 50, method 40)."""

    __slots__ = ('ticket', 'queue', 'if_unused', 'if_empty', 'nowait')
    name = 'queue.delete'
    METHOD_ID = 0x00320028  # 50,40 3276840
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        queue = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        if_unused, if_empty, nowait = [bool(flags & mask) for mask in (0x1, 0x2, 0x4)]
        return FrameQueueDelete(ticket, queue, if_unused, if_empty, nowait)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.queue)),
            self.queue,
        ]
        flags = (
            (0x1 if self.if_unused else 0)
            | (0x2 if self.if_empty else 0)
            | (0x4 if self.nowait else 0)
        )
        parts.append(struct.pack('!B', flags))
        yield (0x01, ''.join(parts))


class FrameQueueDeleteOk(Frame):
    """Frame class for the 'queue.delete-ok' method (class 50, method 41)."""

    __slots__ = ('message_count',)
    name = 'queue.delete-ok'
    METHOD_ID = 0x00320029  # 50,41 3276841
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read message_count ('!I') from ``buffer`` and return a new frame."""
        (messages,) = buffer.read('!I')
        return FrameQueueDeleteOk(messages)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID plus message_count."""
        payload = struct.pack('!II', self.METHOD_ID, self.message_count)
        yield (0x01, payload)


class FrameQueueUnbind(Frame):
    """Frame class for the 'queue.unbind' method (class 50, method 50)."""

    __slots__ = ('ticket', 'queue', 'exchange', 'routing_key', 'arguments')
    name = 'queue.unbind'
    METHOD_ID = 0x00320032  # 50,50 3276850
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        # Call arguments are evaluated left to right, so the reads stay in wire order.
        return FrameQueueUnbind(
            ticket,
            buffer.read_string('!B'),
            buffer.read_string('!B'),
            buffer.read_string('!B'),
            buffer.read_table(),
        )

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        arguments_raw = table.encode(self.arguments)
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.queue)),
            self.queue,
            struct.pack('!B', len(self.exchange)),
            self.exchange,
            struct.pack('!B', len(self.routing_key)),
            self.routing_key,
            arguments_raw,
        ]
        yield (0x01, ''.join(parts))


class FrameQueueUnbindOk(Frame):
    """Frame class for the 'queue.unbind-ok' method (class 50, method 51)."""

    __slots__ = ()
    name = 'queue.unbind-ok'
    METHOD_ID = 0x00320033  # 50,51 3276851
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; the method has no fields, so ``buffer`` is left untouched."""
        return FrameQueueUnbindOk()

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple whose payload is just the packed METHOD_ID."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameBasicQos(Frame):
    """Frame class for the 'basic.qos' method (class 60, method 10)."""

    __slots__ = ('prefetch_size', 'prefetch_count', 'global_')
    name = 'basic.qos'
    METHOD_ID = 0x003C000A  # 60,10 3932170
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the fields ('!IHB') from ``buffer`` and return a new frame."""
        (size, count, flags) = buffer.read('!IHB')
        # Bit 0 of the flag byte carries ``global_``.
        return FrameBasicQos(size, count, bool(flags & 0x1))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: METHOD_ID and all fields in one pack."""
        global_bit = 0x1 if self.global_ else 0
        payload = struct.pack('!IIHB', self.METHOD_ID, self.prefetch_size, self.prefetch_count, global_bit) if False else None
        yield (0x01, struct.pack('!IIHB', self.METHOD_ID, self.prefetch_size, self.prefetch_count, 0x1 if self.global_ else 0))


class FrameBasicQosOk(Frame):
    """Frame class for the 'basic.qos-ok' method (class 60, method 11)."""

    __slots__ = ()
    name = 'basic.qos-ok'
    METHOD_ID = 0x003C000B  # 60,11 3932171
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; the method has no fields, so ``buffer`` is left untouched."""
        return FrameBasicQosOk()

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple whose payload is just the packed METHOD_ID."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameBasicConsume(Frame):
    """Frame class for the 'basic.consume' method (class 60, method 20)."""

    __slots__ = ('ticket', 'queue', 'consumer_tag', 'no_local', 'no_ack', 'exclusive', 'nowait', 'arguments')
    name = 'basic.consume'
    METHOD_ID = 0x003C0014  # 60,20 3932180
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        queue = buffer.read_string('!B')
        consumer_tag = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        # One flag byte holds the four booleans, lowest bit first.
        no_local, no_ack, exclusive, nowait = [
            bool(flags & mask) for mask in (0x1, 0x2, 0x4, 0x8)
        ]
        arguments = buffer.read_table()
        return FrameBasicConsume(ticket, queue, consumer_tag, no_local, no_ack, exclusive, nowait, arguments)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        arguments_raw = table.encode(self.arguments)
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.queue)),
            self.queue,
            struct.pack('!B', len(self.consumer_tag)),
            self.consumer_tag,
        ]
        flags = (
            (0x1 if self.no_local else 0)
            | (0x2 if self.no_ack else 0)
            | (0x4 if self.exclusive else 0)
            | (0x8 if self.nowait else 0)
        )
        parts.append(struct.pack('!B', flags))
        parts.append(arguments_raw)
        yield (0x01, ''.join(parts))


class FrameBasicConsumeOk(Frame):
    """Frame class for the 'basic.consume-ok' method (class 60, method 21)."""

    __slots__ = ('consumer_tag',)
    name = 'basic.consume-ok'
    METHOD_ID = 0x003C0015  # 60,21 3932181
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the consumer_tag string from ``buffer`` and return a new frame."""
        return FrameBasicConsumeOk(buffer.read_string('!B'))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID and length, then consumer_tag."""
        header = struct.pack('!IB', self.METHOD_ID, len(self.consumer_tag))
        yield (0x01, ''.join((header, self.consumer_tag)))


class FrameBasicCancel(Frame):
    """Frame class for the 'basic.cancel' method (class 60, method 30)."""

    __slots__ = ('consumer_tag', 'nowait')
    name = 'basic.cancel'
    METHOD_ID = 0x003C001E  # 60,30 3932190
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        consumer_tag = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        # Bit 0 of the flag byte carries ``nowait``.
        return FrameBasicCancel(consumer_tag, bool(flags & 0x1))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IB', self.METHOD_ID, len(self.consumer_tag)),
            self.consumer_tag,
            struct.pack('!B', 0x1 if self.nowait else 0),
        ]
        yield (0x01, ''.join(parts))


class FrameBasicCancelOk(Frame):
    """Frame class for the 'basic.cancel-ok' method (class 60, method 31)."""

    __slots__ = ('consumer_tag',)
    name = 'basic.cancel-ok'
    METHOD_ID = 0x003C001F  # 60,31 3932191
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the consumer_tag string from ``buffer`` and return a new frame."""
        return FrameBasicCancelOk(buffer.read_string('!B'))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID and length, then consumer_tag."""
        header = struct.pack('!IB', self.METHOD_ID, len(self.consumer_tag))
        yield (0x01, ''.join((header, self.consumer_tag)))


class FrameBasicPublish(Frame):
    """Frame class for the 'basic.publish' method (class 60, method 40).

    Flagged as carrying content (``has_content``) with ``class_id`` CLASS_BASIC.
    """

    __slots__ = ('ticket', 'exchange', 'routing_key', 'mandatory', 'immediate')
    name = 'basic.publish'
    METHOD_ID = 0x003C0028  # 60,40 3932200
    has_content = True
    class_id = CLASS_BASIC

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        exchange = buffer.read_string('!B')
        routing_key = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        mandatory, immediate = [bool(flags & mask) for mask in (0x1, 0x2)]
        return FrameBasicPublish(ticket, exchange, routing_key, mandatory, immediate)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.exchange)),
            self.exchange,
            struct.pack('!B', len(self.routing_key)),
            self.routing_key,
        ]
        flags = (0x1 if self.mandatory else 0) | (0x2 if self.immediate else 0)
        parts.append(struct.pack('!B', flags))
        yield (0x01, ''.join(parts))


class FrameBasicReturn(Frame):
    """Frame class for the 'basic.return' method (class 60, method 50).

    Flagged as carrying content (``has_content``) with ``class_id`` CLASS_BASIC.
    """

    __slots__ = ('reply_code', 'reply_text', 'exchange', 'routing_key')
    name = 'basic.return'
    METHOD_ID = 0x003C0032  # 60,50 3932210
    has_content = True
    class_id = CLASS_BASIC

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (code,) = buffer.read('!H')
        # Call arguments are evaluated left to right, so the reads stay in wire order.
        return FrameBasicReturn(
            code,
            buffer.read_string('!B'),
            buffer.read_string('!B'),
            buffer.read_string('!B'),
        )

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.reply_code, len(self.reply_text)),
            self.reply_text,
            struct.pack('!B', len(self.exchange)),
            self.exchange,
            struct.pack('!B', len(self.routing_key)),
            self.routing_key,
        ]
        yield (0x01, ''.join(parts))


class FrameBasicDeliver(Frame):
    """Frame class for the 'basic.deliver' method (class 60, method 60).

    Flagged as carrying content (``has_content``) with ``class_id`` CLASS_BASIC.
    """

    __slots__ = ('consumer_tag', 'delivery_tag', 'redelivered', 'exchange', 'routing_key')
    name = 'basic.deliver'
    METHOD_ID = 0x003C003C  # 60,60 3932220
    has_content = True
    class_id = CLASS_BASIC

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        consumer_tag = buffer.read_string('!B')
        (delivery_tag, flags) = buffer.read('!QB')
        redelivered = bool(flags & 0x1)
        exchange = buffer.read_string('!B')
        routing_key = buffer.read_string('!B')
        return FrameBasicDeliver(consumer_tag, delivery_tag, redelivered, exchange, routing_key)

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IB', self.METHOD_ID, len(self.consumer_tag)),
            self.consumer_tag,
            struct.pack('!QBB', self.delivery_tag, 0x1 if self.redelivered else 0, len(self.exchange)),
            self.exchange,
            struct.pack('!B', len(self.routing_key)),
            self.routing_key,
        ]
        yield (0x01, ''.join(parts))


class FrameBasicGet(Frame):
    """Frame class for the 'basic.get' method (class 60, method 70)."""

    __slots__ = ('ticket', 'queue', 'no_ack')
    name = 'basic.get'
    METHOD_ID = 0x003C0046  # 60,70 3932230
    has_content = False

    @staticmethod
    def decode(buffer):
        """Read the method's fields from ``buffer`` in wire order and return a new frame."""
        (ticket,) = buffer.read('!H')
        queue = buffer.read_string('!B')
        (flags,) = buffer.read('!B')
        # Bit 0 of the flag byte carries ``no_ack``.
        return FrameBasicGet(ticket, queue, bool(flags & 0x1))

    def encode(self):
        """Yield one ``(0x01, payload)`` tuple: packed METHOD_ID followed by the encoded fields."""
        parts = [
            struct.pack('!IHB', self.METHOD_ID, self.ticket, len(self.queue)),
            self.queue,
            struct.pack('!B', 0x1 if self.no_ack else 0),
        ]
        yield (0x01, ''.join(parts))


class FrameBasicGetOk(Frame):
    """``basic.get-ok`` method frame (class 60, method 71).

    Argument layout: ``delivery_tag`` (unsigned 64-bit), one flag octet
    whose lowest bit is ``redelivered``, ``exchange`` and ``routing_key``
    (strings with a one-octet length prefix) and ``message_count``
    (unsigned 32-bit).
    """
    __slots__ = ('delivery_tag', 'redelivered', 'exchange', 'routing_key', 'message_count')
    name = 'basic.get-ok'
    METHOD_ID = 0x003C0047  # 60,71 3932231
    has_content = True
    class_id = CLASS_BASIC

    @staticmethod
    def decode(buffer):
        """Build a frame from the method arguments read off ``buffer``."""
        delivery_tag, flags = buffer.read('!QB')
        exchange_name = buffer.read_string('!B')
        key = buffer.read_string('!B')
        (count,) = buffer.read('!I')
        return FrameBasicGetOk(delivery_tag, bool(flags & 0x1), exchange_name, key, count)

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair with the packed method."""
        payload = ''.join((
            struct.pack('!IQBB', self.METHOD_ID, self.delivery_tag, 0x1 if self.redelivered else 0, len(self.exchange)),
            self.exchange,
            struct.pack('!B', len(self.routing_key)),
            self.routing_key,
            struct.pack('!I', self.message_count),
        ))
        yield (0x01, payload)


class FrameBasicGetEmpty(Frame):
    """``basic.get-empty`` method frame (class 60, method 72).

    Its only argument is ``cluster_id``, a string with a one-octet length
    prefix.
    """
    __slots__ = ('cluster_id',)
    name = 'basic.get-empty'
    METHOD_ID = 0x003C0048  # 60,72 3932232
    has_content = False

    @staticmethod
    def decode(buffer):
        """Build a frame from the method arguments read off ``buffer``."""
        return FrameBasicGetEmpty(buffer.read_string('!B'))

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair with the packed method."""
        payload = ''.join((
            struct.pack('!IB', self.METHOD_ID, len(self.cluster_id)),
            self.cluster_id,
        ))
        yield (0x01, payload)


class FrameBasicAck(Frame):
    """``basic.ack`` method frame (class 60, method 80).

    Argument layout: ``delivery_tag`` (unsigned 64-bit) followed by one
    flag octet whose lowest bit is ``multiple``.
    """
    __slots__ = ('delivery_tag', 'multiple')
    name = 'basic.ack'
    METHOD_ID = 0x003C0050  # 60,80 3932240
    has_content = False

    @staticmethod
    def decode(buffer):
        """Build a frame from the method arguments read off ``buffer``."""
        delivery_tag, flags = buffer.read('!QB')
        return FrameBasicAck(delivery_tag, bool(flags & 0x1))

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair with the packed method."""
        payload = struct.pack('!IQB', self.METHOD_ID, self.delivery_tag, 0x1 if self.multiple else 0)
        yield (0x01, payload)


class FrameBasicReject(Frame):
    """``basic.reject`` method frame (class 60, method 90).

    Argument layout: ``delivery_tag`` (unsigned 64-bit) followed by one
    flag octet whose lowest bit is ``requeue``.
    """
    __slots__ = ('delivery_tag', 'requeue')
    name = 'basic.reject'
    METHOD_ID = 0x003C005A  # 60,90 3932250
    has_content = False

    @staticmethod
    def decode(buffer):
        """Build a frame from the method arguments read off ``buffer``."""
        delivery_tag, flags = buffer.read('!QB')
        return FrameBasicReject(delivery_tag, bool(flags & 0x1))

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair with the packed method."""
        payload = struct.pack('!IQB', self.METHOD_ID, self.delivery_tag, 0x1 if self.requeue else 0)
        yield (0x01, payload)


class FrameBasicRecoverAsync(Frame):
    """``basic.recover-async`` method frame (class 60, method 100).

    Its only argument is one flag octet whose lowest bit is ``requeue``.
    """
    __slots__ = ('requeue',)
    name = 'basic.recover-async'
    METHOD_ID = 0x003C0064  # 60,100 3932260
    has_content = False

    @staticmethod
    def decode(buffer):
        """Build a frame from the method arguments read off ``buffer``."""
        (flags,) = buffer.read('!B')
        return FrameBasicRecoverAsync(bool(flags & 0x1))

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair with the packed method."""
        payload = struct.pack('!IB', self.METHOD_ID, 0x1 if self.requeue else 0)
        yield (0x01, payload)


class FrameBasicRecover(Frame):
    """``basic.recover`` method frame (class 60, method 110).

    Its only argument is one flag octet whose lowest bit is ``requeue``.
    """
    __slots__ = ('requeue',)
    name = 'basic.recover'
    METHOD_ID = 0x003C006E  # 60,110 3932270
    has_content = False

    @staticmethod
    def decode(buffer):
        """Build a frame from the method arguments read off ``buffer``."""
        (flags,) = buffer.read('!B')
        return FrameBasicRecover(bool(flags & 0x1))

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair with the packed method."""
        payload = struct.pack('!IB', self.METHOD_ID, 0x1 if self.requeue else 0)
        yield (0x01, payload)


class FrameBasicRecoverOk(Frame):
    """``basic.recover-ok`` method frame (class 60, method 111); no arguments."""
    __slots__ = ()
    name = 'basic.recover-ok'
    METHOD_ID = 0x003C006F  # 60,111 3932271
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; nothing is read from ``buffer``."""
        return FrameBasicRecoverOk()

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair holding only the method id."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameBasicNack(Frame):
    """``basic.nack`` method frame (class 60, method 120).

    Argument layout: ``delivery_tag`` (unsigned 64-bit) followed by one
    flag octet in which bit 0 is ``multiple`` and bit 1 is ``requeue``.
    """
    __slots__ = ('delivery_tag', 'multiple', 'requeue')
    name = 'basic.nack'
    METHOD_ID = 0x003C0078  # 60,120 3932280
    has_content = False

    @staticmethod
    def decode(buffer):
        """Build a frame from the method arguments read off ``buffer``."""
        delivery_tag, flags = buffer.read('!QB')
        return FrameBasicNack(delivery_tag, bool(flags & 0x1), bool(flags & 0x2))

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair with the packed method."""
        payload = struct.pack(
            '!IQB',
            self.METHOD_ID,
            self.delivery_tag,
            (0x1 if self.multiple else 0) | (0x2 if self.requeue else 0),
        )
        yield (0x01, payload)


class FrameTxSelect(Frame):
    """``tx.select`` method frame (class 90, method 10); no arguments."""
    __slots__ = ()
    name = 'tx.select'
    METHOD_ID = 0x005A000A  # 90,10 5898250
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; nothing is read from ``buffer``."""
        return FrameTxSelect()

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair holding only the method id."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameTxSelectOk(Frame):
    """``tx.select-ok`` method frame (class 90, method 11); no arguments."""
    __slots__ = ()
    name = 'tx.select-ok'
    METHOD_ID = 0x005A000B  # 90,11 5898251
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; nothing is read from ``buffer``."""
        return FrameTxSelectOk()

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair holding only the method id."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameTxCommit(Frame):
    """``tx.commit`` method frame (class 90, method 20); no arguments."""
    __slots__ = ()
    name = 'tx.commit'
    METHOD_ID = 0x005A0014  # 90,20 5898260
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; nothing is read from ``buffer``."""
        return FrameTxCommit()

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair holding only the method id."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameTxCommitOk(Frame):
    """``tx.commit-ok`` method frame (class 90, method 21); no arguments."""
    __slots__ = ()
    name = 'tx.commit-ok'
    METHOD_ID = 0x005A0015  # 90,21 5898261
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; nothing is read from ``buffer``."""
        return FrameTxCommitOk()

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair holding only the method id."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameTxRollback(Frame):
    """``tx.rollback`` method frame (class 90, method 30); no arguments."""
    __slots__ = ()
    name = 'tx.rollback'
    METHOD_ID = 0x005A001E  # 90,30 5898270
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; nothing is read from ``buffer``."""
        return FrameTxRollback()

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair holding only the method id."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameTxRollbackOk(Frame):
    """``tx.rollback-ok`` method frame (class 90, method 31); no arguments."""
    __slots__ = ()
    name = 'tx.rollback-ok'
    METHOD_ID = 0x005A001F  # 90,31 5898271
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; nothing is read from ``buffer``."""
        return FrameTxRollbackOk()

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair holding only the method id."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)


class FrameConfirmSelect(Frame):
    """``confirm.select`` method frame (class 85, method 10).

    Its only argument is one flag octet whose lowest bit is ``nowait``.
    """
    __slots__ = ('nowait',)
    name = 'confirm.select'
    METHOD_ID = 0x0055000A  # 85,10 5570570
    has_content = False

    @staticmethod
    def decode(buffer):
        """Build a frame from the method arguments read off ``buffer``."""
        (flags,) = buffer.read('!B')
        return FrameConfirmSelect(bool(flags & 0x1))

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair with the packed method."""
        payload = struct.pack('!IB', self.METHOD_ID, 0x1 if self.nowait else 0)
        yield (0x01, payload)


class FrameConfirmSelectOk(Frame):
    """``confirm.select-ok`` method frame (class 85, method 11); no arguments."""
    __slots__ = ()
    name = 'confirm.select-ok'
    METHOD_ID = 0x0055000B  # 85,11 5570571
    has_content = False

    @staticmethod
    def decode(buffer):
        """Return a new frame; nothing is read from ``buffer``."""
        return FrameConfirmSelectOk()

    def encode(self):
        """Yield a single ``(0x01, payload)`` pair holding only the method id."""
        payload = struct.pack('!I', self.METHOD_ID)
        yield (0x01, payload)



def decode_basic_properties(buffer):
    """Decode the property list of a ``basic`` content header.

    A 16-bit flag word is read first; each set bit, from the most
    significant downwards, announces that the matching property value
    follows in the buffer.

    :param buffer: Reader positioned at the flag word. It must provide
        ``read(fmt)`` (returning a tuple), ``read_string(fmt)`` and
        ``read_table()``.
    :returns: dict mapping property name to decoded value, holding only the
        properties whose flag bit was set.
    :raises AssertionError: if bit 0 of the flag word is set. That bit is
        presumably the AMQP "more flag words follow" marker, which this
        decoder does not handle.
    """
    props = {}
    flags, = buffer.read('!H')
    # Raised explicitly instead of via ``assert`` so the check on wire data
    # still runs under ``python -O``; the exception type is kept the same.
    if flags & 0x01:
        raise AssertionError(
            'unsupported basic property flags 0x%04x: bit 0 is set' % flags
        )
    if flags & 0x8000:  # 1 << 15
        props['content_type'] = buffer.read_string('!B')
    if flags & 0x4000:  # 1 << 14
        props['content_encoding'] = buffer.read_string('!B')
    if flags & 0x2000:  # 1 << 13
        props['headers'] = buffer.read_table()
    if flags & 0x1000:  # 1 << 12
        props['delivery_mode'], = buffer.read('!B')
    if flags & 0x0800:  # 1 << 11
        props['priority'], = buffer.read('!B')
    if flags & 0x0400:  # 1 << 10
        props['correlation_id'] = buffer.read_string('!B')
    if flags & 0x0200:  # 1 << 9
        props['reply_to'] = buffer.read_string('!B')
    if flags & 0x0100:  # 1 << 8
        props['expiration'] = buffer.read_string('!B')
    if flags & 0x0080:  # 1 << 7
        props['message_id'] = buffer.read_string('!B')
    if flags & 0x0040:  # 1 << 6
        props['timestamp'], = buffer.read('!Q')
    if flags & 0x0020:  # 1 << 5
        props['type'] = buffer.read_string('!B')
    if flags & 0x0010:  # 1 << 4
        props['user_id'] = buffer.read_string('!B')
    if flags & 0x0008:  # 1 << 3
        props['app_id'] = buffer.read_string('!B')
    if flags & 0x0004:  # 1 << 2
        props['cluster_id'] = buffer.read_string('!B')
    return props
# Property decoders, keyed by content class id.
PROPS = {
    CLASS_BASIC: decode_basic_properties,  # 60
}

# Every property name defined for the basic content class.
BASIC_PROPS_SET = set((
    "content_type",      # shortstr
    "content_encoding",  # shortstr
    "headers",           # table
    "delivery_mode",     # octet
    "priority",          # octet
    "correlation_id",    # shortstr
    "reply_to",          # shortstr
    "expiration",        # shortstr
    "message_id",        # shortstr
    "timestamp",         # timestamp
    "type",              # shortstr
    "user_id",           # shortstr
    "app_id",            # shortstr
    "cluster_id",        # shortstr
))


def _encode_shortstr(val):
    """Pack ``val`` as a string preceded by its length in one octet."""
    return ''.join((struct.pack('!B', len(val)), val))


def _encode_octet(val):
    """Pack ``val`` as a single unsigned octet."""
    return struct.pack('!B', val)


def _encode_timestamp(val):
    """Pack ``val`` as an unsigned 64-bit big-endian integer."""
    return struct.pack('!Q', val)


def _encode_table(val):
    """Encode ``val`` with ``table.encode`` (resolved at call time)."""
    return table.encode(val)


# property name -> (position in the encoded property list, flag bit, encoder)
ENCODE_PROPS_BASIC = {
    'content_type': (0, 0x8000, _encode_shortstr),      # (1 << 15)
    'content_encoding': (1, 0x4000, _encode_shortstr),  # (1 << 14)
    'headers': (2, 0x2000, _encode_table),              # (1 << 13)
    'delivery_mode': (3, 0x1000, _encode_octet),        # (1 << 12)
    'priority': (4, 0x0800, _encode_octet),             # (1 << 11)
    'correlation_id': (5, 0x0400, _encode_shortstr),    # (1 << 10)
    'reply_to': (6, 0x0200, _encode_shortstr),          # (1 << 9)
    'expiration': (7, 0x0100, _encode_shortstr),        # (1 << 8)
    'message_id': (8, 0x0080, _encode_shortstr),        # (1 << 7)
    'timestamp': (9, 0x0040, _encode_timestamp),        # (1 << 6)
    'type': (10, 0x0020, _encode_shortstr),             # (1 << 5)
    'user_id': (11, 0x0010, _encode_shortstr),          # (1 << 4)
    'app_id': (12, 0x0008, _encode_shortstr),           # (1 << 3)
    'cluster_id': (13, 0x0004, _encode_shortstr),       # (1 << 2)
}
def encode_basic_properties(body_size, props):
    """Encode a ``basic`` content header.

    :param body_size: Body length written into the header (unsigned 64-bit).
    :param props: Mapping of property name to value. Keys that are not in
        ``BASIC_PROPS_SET`` are ignored.
    :returns: ``(0x02, payload)``. The payload packs ``CLASS_BASIC``, a zero
        16-bit field, ``body_size`` and the property flag word, followed by
        the encoded properties in their fixed positions.
    """
    flags = 0
    # One slot per known property; absent properties contribute nothing.
    slots = [''] * 14
    for name in BASIC_PROPS_SET & set(props):
        index, mask, encoder = ENCODE_PROPS_BASIC[name]
        flags |= mask
        slots[index] = encoder(props[name])
    header = struct.pack('!HHQH', CLASS_BASIC, 0, body_size, flags)
    return (0x02, header + ''.join(slots))
def syncmethod(*responses):
    """Decorator for assigning appropriate responses for an AMQP method.

    The decorated method is replaced by a wrapper that calls
    ``self._call_sync(bound_method, responses, ...)`` with the arguments it
    was given.

    :param responses: Names of the methods accepted as a reply, handed to
        ``_call_sync`` as a tuple.
    :returns: A decorator for methods whose first parameter is the instance.
    """
    def decorate(method):
        # The wrapper is generated from source text so that it carries the
        # same parameter names and defaults as ``method``; this is what
        # makes the arg spec show correctly in the auto-generated docs.
        #
        # Idea from
        # http://emptysquare.net/blog/copying-a-python-functions-signature/
        #
        # getfullargspec replaces getargspec on Python 3. Both return a
        # tuple starting (args, varargs, keywords, defaults).
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        spec = getspec(method)
        names = list(spec[0])[1:]  # drop the instance parameter
        varargs, keywords = spec[1], spec[2]
        defaults = spec[3] or ()

        # Defaults belong to the trailing parameters. The signature is
        # assembled piece by piece rather than by trimming parentheses off a
        # formatted string, which corrupted defaults such as ``()``.
        first_default = len(names) - len(defaults)
        params = ['self']
        call_args = ['method.__get__(self)', 'responses']
        for position, arg_name in enumerate(names):
            if position < first_default:
                params.append(arg_name)
            else:
                params.append(
                    '%s=%r' % (arg_name, defaults[position - first_default])
                )
            call_args.append(arg_name)
        # Forward variadic arguments too, so they are not silently dropped.
        if varargs:
            params.append('*' + varargs)
            call_args.append('*' + varargs)
        if keywords:
            params.append('**' + keywords)
            call_args.append('**' + keywords)
        # NOTE: keyword-only parameters (Python 3) are not reproduced.

        fndef = 'lambda %s: self._call_sync(%s)' % (
            ', '.join(params),
            ', '.join(call_args),
        )
        # eval() only ever sees text derived from the decorated method's own
        # signature; never feed it externally supplied strings.
        fake_fn = eval(fndef, {
            'method': method,
            'responses': responses,
        })
        return wraps(method)(fake_fn)
    return decorate
[docs]class FrameWriter(object): """Interface for converting AMQP client method calls to AMQP frames. The underlying transport is not defined here; subclasses can implement this by defining ._send() and ._send_message() to encode the frame and ultimately write it to the wire. """
def _send(self, frame):
    """Send one method frame; must be provided by a subclass.

    :param frame: The method frame to transmit.
    :raises NotImplementedError: always, in this base implementation.
    """
    message = "Subclasses must implement this method to send a method frame."
    raise NotImplementedError(message)
def _send_message(self, frame, headers, payload):
    """Send a method frame with a message; must be provided by a subclass.

    :param frame: The method frame that introduces the message.
    :param headers: The message headers.
    :param payload: The message body.
    :raises NotImplementedError: always, in this base implementation.
    """
    message = (
        "Subclasses must implement this method to send a method frame "
        "plus message headers and payload."
    )
    raise NotImplementedError(message)
def _call_sync(self, method, responses, *args, **kwargs):
    """Hook that turns a method call into a synchronous one.

    This base implementation simply invokes ``method`` and ignores
    ``responses``. Subclasses should override it so that the caller
    receives one of the methods named in ``responses`` as the reply.

    :param method: Bound method to invoke.
    :param responses: Names of the acceptable reply methods.
    :returns: Whatever ``method(*args, **kwargs)`` returns.
    """
    outcome = method(*args, **kwargs)
    return outcome
def connection_start_ok(self, client_properties=None, mechanism='PLAIN', response=None, locale='en_US'):
    """.. warning:: This is an asynchronous method.

    Select a SASL security mechanism.

    :param client_properties: Sent as the first field of the frame.
    :param mechanism: The one security mechanism chosen by the client; it
        must be among those offered by the server.
    :param response: Opaque data handed to the security mechanism. Its
        content is defined by the SASL mechanism in use.
    :param locale: The one message locale chosen by the client; it must be
        among those offered by the server.

    """
    frame = FrameConnectionStartOk(client_properties, mechanism, response, locale)
    self._send(frame)
def connection_secure_ok(self, response=None):
    """.. warning:: This is an asynchronous method.

    Attempt to authenticate by passing a block of SASL data to the
    security mechanism on the server side.

    :param response: Opaque data handed to the security mechanism. Its
        content is defined by the SASL mechanism in use.

    """
    frame = FrameConnectionSecureOk(response)
    self._send(frame)
def connection_tune_ok(self, channel_max=0, frame_max=0, heartbeat=0):
    """.. warning:: This is an asynchronous method.

    Send the client's connection tuning parameters to the server. Some
    fields are negotiated, others only provide capability information.

    :param channel_max: Maximum total number of channels the client will
        use per connection.
    :param frame_max: Largest frame size that client and server will use on
        the connection. Zero means the client imposes no specific limit,
        though it may reject very large frames it cannot allocate resources
        for. The limit applies principally to content frames, where large
        contents can be split into frames of arbitrary size.
    :param heartbeat: Delay, in seconds, of the connection heartbeat the
        client wants. Zero means the client does not want a heartbeat.

    """
    frame = FrameConnectionTuneOk(channel_max, frame_max, heartbeat)
    self._send(frame)
@syncmethod('connection.open-ok')
def connection_open(self, virtual_host='/'):
    """Open a connection to a virtual host.

    A virtual host is a collection of resources and serves to separate
    multiple application domains within a server. The server may apply
    arbitrary limits per virtual host, such as the number of each type of
    entity that may be used, per connection and/or in total.

    :param virtual_host: The name of the virtual host to work with.

    """
    # The two trailing fields are not exposed here and are always sent as
    # '' and 0.
    frame = FrameConnectionOpen(virtual_host, '', 0)
    self._send(frame)
@syncmethod('connection.close-ok')
def connection_close(self, reply_code=200, reply_text='', class_id=0, method_id=0):
    """Tell the peer that the sender wants to close the connection.

    The cause may be an internal condition (e.g. a forced shut-down) or an
    error while handling a specific method, i.e. an exception. For an
    exception, the sender supplies the class and method id of the method
    that caused it.

    :param reply_code: Sent as the frame's reply code.
    :param reply_text: Sent as the frame's reply text.
    :param class_id: When the close is provoked by a method exception, the
        class of that method.
    :param method_id: When the close is provoked by a method exception, the
        ID of that method.

    """
    frame = FrameConnectionClose(reply_code, reply_text, class_id, method_id)
    self._send(frame)
def connection_close_ok(self):
    """.. warning:: This is an asynchronous method.

    Confirm a Connection.Close method, telling the recipient that it is
    safe to release the connection's resources and close the socket.

    """
    frame = FrameConnectionCloseOk()
    self._send(frame)
@syncmethod('channel.open-ok')
def channel_open(self):
    """Open a channel to the server."""
    # The frame's single field is not exposed here and is sent empty.
    frame = FrameChannelOpen('')
    self._send(frame)
@syncmethod('channel.flow-ok')
def channel_flow(self, active=None):
    """Ask the peer to pause or restart the flow of content data.

    The content concerned is that sent by a consumer. This is a simple
    flow-control mechanism a peer can use to avoid overflowing its queues
    or otherwise receiving more messages than it can process. It is not
    intended for window control, and it does not affect contents returned
    by Basic.Get-Ok methods.

    :param active: If 1, the peer starts sending content frames. If 0, the
        peer stops sending content frames.

    """
    frame = FrameChannelFlow(active)
    self._send(frame)
def channel_flow_ok(self, active=None):
    """.. warning:: This is an asynchronous method.

    Confirm to the peer that a flow command was received and processed.

    :param active: Confirms the setting of the processed flow method: 1
        means the peer will start sending or continue to send content
        frames; 0 means it will not.

    """
    frame = FrameChannelFlowOk(active)
    self._send(frame)
@syncmethod('channel.close-ok')
def channel_close(self, reply_code=200, reply_text='', class_id=0, method_id=0):
    """Tell the peer that the sender wants to close the channel.

    The cause may be an internal condition (e.g. a forced shut-down) or an
    error while handling a specific method, i.e. an exception. For an
    exception, the sender supplies the class and method id of the method
    that caused it.

    :param reply_code: Sent as the frame's reply code.
    :param reply_text: Sent as the frame's reply text.
    :param class_id: When the close is provoked by a method exception, the
        class of that method.
    :param method_id: When the close is provoked by a method exception, the
        ID of that method.

    """
    frame = FrameChannelClose(reply_code, reply_text, class_id, method_id)
    self._send(frame)
def channel_close_ok(self):
    """.. warning:: This is an asynchronous method.

    Confirm a Channel.Close method, telling the recipient that it is safe
    to release the channel's resources.

    """
    frame = FrameChannelCloseOk()
    self._send(frame)
def access_request(self, realm='/data', exclusive=False, passive=True, active=True, write=True, read=True):
    """Send an ``access.request`` frame carrying the given fields.

    All six arguments are passed through unchanged, in order, as the
    fields of the frame.
    """
    self._send(FrameAccessRequest(realm, exclusive, passive, active, write, read))


def access_request_ok(self):
    """Send an ``access.request-ok`` frame whose single field is 0."""
    self._send(FrameAccessRequestOk(0))


@syncmethod('exchange.declare-ok')
def exchange_declare(self, exchange=None, type='direct', passive=False, durable=False, auto_delete=False, internal=False, arguments=None):
    """Create an exchange, or verify an existing one.

    The exchange is created if it does not already exist. If it exists,
    the server verifies that it is of the correct and expected class.

    :param exchange: Name of the exchange.
    :param type: Each exchange belongs to one of a set of exchange types
        implemented by the server. The type defines how messages are routed
        through the exchange. It is not valid or meaningful to attempt to
        change the type of an existing exchange.
    :param passive: If set, the server replies with Declare-Ok if the
        exchange already exists with the same name, and raises an error if
        not. This lets the client check whether an exchange exists without
        modifying the server state. When set, all other method fields
        except name and no-wait are ignored. A declare with both passive
        and no-wait has no effect. Arguments are compared for semantic
        equivalence.
    :param durable: If set when creating a new exchange, the exchange is
        marked as durable. Durable exchanges remain active when a server
        restarts; non-durable (transient) exchanges are purged if/when a
        server restarts.
    :param auto_delete: If set, the exchange is deleted when all queues
        have finished using it.
    :param internal: If set, the exchange may not be used directly by
        publishers, but only when bound to other exchanges. Internal
        exchanges are used to construct wiring that is not visible to
        applications.
    :param arguments: A set of arguments for the declaration; syntax and
        semantics depend on the server implementation. ``None`` (the
        default) means an empty table.

    """
    # A fresh dict per call, so no mutable default is shared between calls.
    if arguments is None:
        arguments = {}
    # The first and eighth fields are not exposed here and are sent as 0.
    self._send(FrameExchangeDeclare(0, exchange, type, passive, durable, auto_delete, internal, 0, arguments))
@syncmethod('exchange.delete-ok')
def exchange_delete(self, exchange=None, if_unused=False):
    """Delete an exchange.

    When an exchange is deleted, all queue bindings on it are cancelled.

    :param exchange: Name of the exchange to delete.
    :param if_unused: If set, the server only deletes the exchange if it
        has no queue bindings. If it has queue bindings, the server does
        not delete it and raises a channel exception instead.

    """
    # The first and last fields are not exposed here and are sent as 0.
    frame = FrameExchangeDelete(0, exchange, if_unused, 0)
    self._send(frame)
@syncmethod('exchange.bind-ok')
def exchange_bind(self, destination=None, source=None, routing_key='', arguments=None):
    """Bind an exchange to an exchange.

    :param destination: Name of the destination exchange to bind.
    :param source: Name of the source exchange to bind.
    :param routing_key: The routing key for the binding. It is used for
        routing messages depending on the exchange configuration. Not all
        exchanges use a routing key - refer to the specific exchange
        documentation.
    :param arguments: A set of arguments for the binding; syntax and
        semantics depend on the exchange class. ``None`` (the default)
        means an empty table.

    """
    # A fresh dict per call, so no mutable default is shared between calls.
    if arguments is None:
        arguments = {}
    # The first and fifth fields are not exposed here and are sent as 0.
    self._send(FrameExchangeBind(0, destination, source, routing_key, 0, arguments))
@syncmethod('exchange.unbind-ok')
def exchange_unbind(self, destination=None, source=None, routing_key='', arguments=None):
    """Unbind an exchange from an exchange.

    :param destination: Name of the destination exchange to unbind.
    :param source: Name of the source exchange to unbind.
    :param routing_key: The routing key of the binding to unbind.
    :param arguments: The arguments of the binding to unbind. ``None`` (the
        default) means an empty table.

    """
    # A fresh dict per call, so no mutable default is shared between calls.
    if arguments is None:
        arguments = {}
    # The first and fifth fields are not exposed here and are sent as 0.
    self._send(FrameExchangeUnbind(0, destination, source, routing_key, 0, arguments))
@syncmethod('queue.declare-ok')
def queue_declare(self, queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None):
    """Create or check a queue.

    When creating a new queue the client can specify various properties
    that control the durability of the queue and its contents, and the
    level of sharing for the queue.

    :param queue: Name of the queue.
    :param passive: If set, the server replies with Declare-Ok if the queue
        already exists with the same name, and raises an error if not. This
        lets the client check whether a queue exists without modifying the
        server state. When set, all other method fields except name and
        no-wait are ignored. A declare with both passive and no-wait has no
        effect. Arguments are compared for semantic equivalence.
    :param durable: If set when creating a new queue, the queue is marked
        as durable. Durable queues remain active when a server restarts;
        non-durable (transient) queues are purged if/when a server
        restarts. Durable queues do not necessarily hold persistent
        messages, although it does not make sense to send persistent
        messages to a transient queue.
    :param exclusive: Exclusive queues may only be accessed by the current
        connection, and are deleted when that connection closes. Passive
        declaration of an exclusive queue by other connections is not
        allowed.
    :param auto_delete: If set, the queue is deleted when all consumers
        have finished using it. The last consumer can be cancelled either
        explicitly or because its channel is closed. If there was never a
        consumer on the queue, it won't be deleted. Applications can
        explicitly delete auto-delete queues using the Delete method as
        normal.
    :param arguments: A set of arguments for the declaration; syntax and
        semantics depend on the server implementation. ``None`` (the
        default) means an empty table.

    """
    # A fresh dict per call, so no mutable default is shared between calls.
    if arguments is None:
        arguments = {}
    # The first and seventh fields are not exposed here and are sent as 0.
    self._send(FrameQueueDeclare(0, queue, passive, durable, exclusive, auto_delete, 0, arguments))
@syncmethod('queue.bind-ok')
def queue_bind(self, queue='', exchange=None, routing_key='', arguments=None):
    """Bind a queue to an exchange.

    Until a queue is bound it will not receive any messages. In a classic
    messaging model, store-and-forward queues are bound to a direct
    exchange and subscription queues are bound to a topic exchange.

    :param queue: Name of the queue to bind.
    :param exchange: Name of the exchange to bind to.
    :param routing_key: The routing key for the binding. It is used for
        routing messages depending on the exchange configuration. Not all
        exchanges use a routing key - refer to the specific exchange
        documentation. If the queue name is empty, the server uses the last
        queue declared on the channel. If the routing key is also empty,
        the server uses this queue name for the routing key as well. If the
        queue name is provided but the routing key is empty, the server
        does the binding with that empty routing key. The meaning of empty
        routing keys depends on the exchange implementation.
    :param arguments: A set of arguments for the binding; syntax and
        semantics depend on the exchange class. ``None`` (the default)
        means an empty table.

    """
    # A fresh dict per call, so no mutable default is shared between calls.
    if arguments is None:
        arguments = {}
    # The first and fifth fields are not exposed here and are sent as 0.
    self._send(FrameQueueBind(0, queue, exchange, routing_key, 0, arguments))
@syncmethod('queue.purge-ok')
def queue_purge(self, queue=''):
    """Remove all messages from a queue that are not awaiting acknowledgment.

    :param queue: Name of the queue to purge.

    """
    # The first and last fields are not exposed here and are sent as 0.
    frame = FrameQueuePurge(0, queue, 0)
    self._send(frame)
@syncmethod('queue.delete-ok')
def queue_delete(self, queue='', if_unused=False, if_empty=False):
    """Delete a queue.

    When a queue is deleted, any pending messages are sent to a dead-letter
    queue if one is defined in the server configuration, and all consumers
    on the queue are cancelled.

    :param queue: Name of the queue to delete.
    :param if_unused: If set, the server only deletes the queue if it has
        no consumers. If the queue has consumers, the server does not
        delete it and raises a channel exception instead.
    :param if_empty: If set, the server only deletes the queue if it has no
        messages.

    """
    # The first and last fields are not exposed here and are sent as 0.
    frame = FrameQueueDelete(0, queue, if_unused, if_empty, 0)
    self._send(frame)
@syncmethod('queue.unbind-ok')
def queue_unbind(self, queue='', exchange=None, routing_key='', arguments=None):
    """Unbind a queue from an exchange.

    :param queue: Name of the queue to unbind.
    :param exchange: Name of the exchange to unbind from.
    :param routing_key: The routing key of the binding to unbind.
    :param arguments: The arguments of the binding to unbind. ``None`` (the
        default) means an empty table.

    """
    # A fresh dict per call, so no mutable default is shared between calls.
    if arguments is None:
        arguments = {}
    # The first field is not exposed here and is sent as 0.
    self._send(FrameQueueUnbind(0, queue, exchange, routing_key, arguments))
@syncmethod('basic.qos-ok')
def basic_qos(self, prefetch_size=0, prefetch_count=0, global_=False):
    """Request a specific quality of service.

    The QoS can be specified for the current channel or for all channels
    on the connection. The particular properties and semantics of a qos
    method always depend on the content class semantics. Though the qos
    method could in principle apply to both peers, it is currently
    meaningful only for the server.

    :param prefetch_size: The client can request that messages be sent in
        advance, so that when it finishes processing a message the next one
        is already held locally rather than needing to be sent down the
        channel. Prefetching gives a performance improvement. This field
        gives the prefetch window size in octets. The server sends a
        message in advance if it is equal to or smaller in size than the
        available prefetch size (and also falls into other prefetch
        limits). May be zero, meaning "no specific limit", although other
        prefetch limits may still apply. Ignored if the no-ack option is
        set.
    :param prefetch_count: A prefetch window in terms of whole messages.
        May be combined with the prefetch-size field; a message is only
        sent in advance if both prefetch windows (and those at the channel
        and connection level) allow it. Ignored if the no-ack option is
        set.
    :param global_: By default the QoS settings apply to the current
        channel only. If this field is set, they are applied to the entire
        connection.

    """
    frame = FrameBasicQos(prefetch_size, prefetch_count, global_)
    self._send(frame)
@syncmethod('basic.consume-ok')
def basic_consume(self, queue='', consumer_tag='', no_local=False, no_ack=False, exclusive=False, arguments=None):
    """Ask the server to start a "consumer".

    A consumer is a transient request for messages from a specific queue.
    Consumers last as long as the channel they were declared on, or until
    the client cancels them.

    :param queue: Name of the queue to consume from.
    :param consumer_tag: Identifier for the consumer. The consumer tag is
        local to a channel, so two clients can use the same consumer tags.
        If this field is empty the server will generate a unique tag.
    :param no_local: Sent as the frame's no-local flag.
    :param no_ack: Sent as the frame's no-ack flag.
    :param exclusive: Request exclusive consumer access, meaning only this
        consumer can access the queue.
    :param arguments: A set of arguments for the consume; syntax and
        semantics depend on the server implementation. ``None`` (the
        default) means an empty table.

    """
    # A fresh dict per call, so no mutable default is shared between calls.
    if arguments is None:
        arguments = {}
    # The first and seventh fields are not exposed here and are sent as 0.
    self._send(FrameBasicConsume(0, queue, consumer_tag, no_local, no_ack, exclusive, 0, arguments))
@syncmethod('basic.cancel-ok')
def basic_cancel(self, consumer_tag=None):
    """Cancel a consumer.

    This does not affect already delivered messages, but the server will
    not send any more messages for that consumer. The client may receive
    an arbitrary number of messages between sending the cancel method and
    receiving the cancel-ok reply.

    The method may also be sent from the server to the client when the
    consumer is unexpectedly cancelled (i.e. for any reason other than the
    server receiving the corresponding basic.cancel from the client). This
    lets clients be notified of the loss of consumers due to events such as
    queue deletion. Since clients are not required to accept this method,
    it is advisable for the broker to identify, through some means of
    capability negotiation, the clients that can.

    :param consumer_tag: Tag of the consumer to cancel.

    """
    # The last field is not exposed here and is sent as 0.
    frame = FrameBasicCancel(consumer_tag, 0)
    self._send(frame)
def basic_publish(self, exchange='', routing_key='', mandatory=False, immediate=False, headers=None, body=''):
    """.. warning:: This is an asynchronous method.

    Publish a message to a specific exchange. The message is routed to
    queues as defined by the exchange configuration and distributed to any
    active consumers when the transaction, if any, is committed.

    :param exchange: Name of the exchange to publish to. The name can be
        empty, meaning the default exchange. If a name is given and that
        exchange does not exist, the server will raise a channel exception.
    :param routing_key: The routing key for the message. It is used for
        routing messages depending on the exchange configuration.
    :param mandatory: Tells the server how to react if the message cannot
        be routed to a queue. If set, the server returns an unroutable
        message with a Return method. If zero, the server silently drops
        the message.
    :param immediate: Tells the server how to react if the message cannot
        be routed to a queue consumer immediately. If set, the server
        returns an undeliverable message with a Return method. If zero, the
        server queues the message, but with no guarantee that it will ever
        be consumed.
    :param headers: Message headers handed to ``_send_message``. ``None``
        (the default) means an empty dict.
    :param body: Message payload handed to ``_send_message``.

    """
    # A fresh dict per call: a shared default would leak state between
    # publishes if a transport ever modified the headers it is given.
    if headers is None:
        headers = {}
    # The first field is not exposed here and is sent as 0.
    self._send_message(FrameBasicPublish(0, exchange, routing_key, mandatory, immediate), headers, body)
@syncmethod('basic.get-ok', 'basic.get-empty')
def basic_get(self, queue='', no_ack=False):
    """Fetch a message from a queue with a synchronous dialogue.

    This gives direct access to the messages in a queue, and is designed
    for specific types of application where synchronous functionality is
    more important than performance.

    :param queue: Name of the queue to get a message from.
    :param no_ack: Sent as the frame's no-ack flag.

    """
    # The first field of the frame is always sent as 0.
    frame = FrameBasicGet(0, queue, no_ack)
    self._send(frame)
def basic_ack(self, delivery_tag=0, multiple=False):
    """.. warning:: This is an asynchronous method.

    When sent by the client, acknowledge one or more messages delivered
    via the Deliver or Get-Ok methods. When sent by the server, the method
    acknowledges one or more messages published with the Publish method on
    a channel in confirm mode. The acknowledgement can cover a single
    message or a set of messages up to and including a specific message.

    :param delivery_tag: Tag of the message being acknowledged.
    :param multiple: If set to 1, the delivery tag is treated as "up to and
        including", so that multiple messages can be acknowledged with a
        single method. If set to zero, the delivery tag refers to a single
        message. If the multiple field is 1 and the delivery tag is zero,
        this indicates acknowledgement of all outstanding messages.

    """
    frame = FrameBasicAck(delivery_tag, multiple)
    self._send(frame)
def basic_reject(self, delivery_tag=0, requeue=True):
    """.. warning:: This is an asynchronous method.

    Reject a message. This can be used to interrupt and cancel large
    incoming messages, or to return untreatable messages to their original
    queue.

    :param delivery_tag: Tag of the message being rejected.
    :param requeue: If true, the server will attempt to requeue the
        message. If false, or if the requeue attempt fails, the messages
        are discarded or dead-lettered.

    """
    frame = FrameBasicReject(delivery_tag, requeue)
    self._send(frame)
def basic_recover_async(self, requeue=False):
    """.. warning:: This is an asynchronous method.

    Ask the server to redeliver all unacknowledged messages on a specified
    channel. Zero or more messages may be redelivered. This method is
    deprecated in favour of the synchronous Recover/Recover-Ok.

    :param requeue: If zero, the message will be redelivered to the
        original recipient. If 1, the server will attempt to requeue the
        message, potentially then delivering it to an alternative
        subscriber.

    """
    frame = FrameBasicRecoverAsync(requeue)
    self._send(frame)
@syncmethod('basic.recover-ok')
def basic_recover(self, requeue=False):
    """Ask the server to redeliver all unacknowledged messages on a
    specified channel.

    Zero or more messages may be redelivered. This method replaces the
    asynchronous Recover.

    :param requeue: If zero, the message will be redelivered to the
        original recipient. If 1, the server will attempt to requeue
        the message, potentially then delivering it to an alternative
        subscriber.

    """
    # NOTE(review): the syncmethod decorator presumably pairs this
    # request with the 'basic.recover-ok' reply -- confirm there.
    frame = FrameBasicRecover(requeue)
    self._send(frame)
def basic_nack(self, delivery_tag=0, multiple=False, requeue=True):
    """.. warning:: This is an asynchronous method.

    Reject one or more incoming messages.

    Can be used to interrupt and cancel large incoming messages, or to
    return untreatable messages to their original queue. The server
    also uses this method to inform publishers on channels in confirm
    mode of unhandled messages; a publisher receiving it probably
    needs to republish the offending messages.

    :param delivery_tag: Tag of the message being rejected; forwarded
        unchanged as the first frame field.
    :param multiple: If set to 1, the delivery tag is treated as "up to
        and including", so several messages can be rejected with a
        single method. If set to zero, the delivery tag refers to a
        single message. If multiple is 1 and the delivery tag is zero,
        all outstanding messages are rejected.
    :param requeue: If true, the server will attempt to requeue the
        message. If false, or if the requeue attempt fails, the
        messages are discarded or dead-lettered. Clients receiving the
        Nack methods should ignore this flag.

    """
    frame = FrameBasicNack(delivery_tag, multiple, requeue)
    self._send(frame)
@syncmethod('tx.select-ok')
def tx_select(self):
    """Set the channel to use standard transactions.

    The client must use this method at least once on a channel before
    using the Commit or Rollback methods.

    """
    # NOTE(review): the syncmethod decorator presumably pairs this
    # request with the 'tx.select-ok' reply -- confirm there.
    frame = FrameTxSelect()
    self._send(frame)
@syncmethod('tx.commit-ok')
def tx_commit(self):
    """Commit the current transaction.

    All message publications and acknowledgments performed in the
    current transaction are committed. A new transaction starts
    immediately after a commit.

    """
    # NOTE(review): the syncmethod decorator presumably pairs this
    # request with the 'tx.commit-ok' reply -- confirm there.
    frame = FrameTxCommit()
    self._send(frame)
@syncmethod('tx.rollback-ok')
def tx_rollback(self):
    """Abandon the current transaction.

    All message publications and acknowledgments performed in the
    current transaction are abandoned. A new transaction starts
    immediately after a rollback.

    Note that unacked messages will not be automatically redelivered
    by rollback; if that is required an explicit recover call should
    be issued.

    """
    # NOTE(review): the syncmethod decorator presumably pairs this
    # request with the 'tx.rollback-ok' reply -- confirm there.
    frame = FrameTxRollback()
    self._send(frame)
@syncmethod('confirm.select-ok')
def confirm_select(self):
    """Set the channel to use publisher acknowledgements.

    The client can only use this method on a non-transactional
    channel.

    This method takes no arguments. The single field of the
    ``FrameConfirmSelect`` frame is always sent as 0.

    """
    # The 0 is presumably the protocol's "nowait" flag; confirm against
    # FrameConfirmSelect. It is not exposed as a parameter here.
    frame = FrameConfirmSelect(0)
    self._send(frame)