Request Reply MQ messages

The system is based on the Majordomo Protocol (MDP, http://rfc.zeromq.org/spec:7).

Frame format

  • null frame

  • protocol (MDPC01 or MDPW01) = what protocol to use (Client or Worker)

  • service = the service the request is sent to (rinor, dbmanager, host.pluginname, host.manager)
    • for services that can only run once (rest, dbmanager, scenario manager) it is just their name
    • for services that can run multiple times (on multiple hosts) it is in the format <host>.<service>
  • action = the action to perform (config.get, config.set, plugin.start, plugin.stop, plugin.install, ...)

  • data = the data (json format) for the action
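
As a rough illustration of the frame layout above, the following sketch assembles the five frames of a client request as a list of byte strings, which is the shape pyzmq's send_multipart accepts. The helper names service_name and build_client_request are made up for this example and are not part of the Domogik API; the data frame is assumed to be a JSON-encoded dict.

```python
import json

MDP_CLIENT = b"MDPC01"  # protocol frame used by clients


def service_name(service, host=None):
    """Return '<host>.<service>' for multi-host services, else the bare name."""
    return service if host is None else "{0}.{1}".format(host, service)


def build_client_request(service, action, data):
    """Assemble the frames of a client request in the order listed above."""
    return [
        b"",                               # null frame
        MDP_CLIENT,                        # protocol
        service.encode("utf-8"),           # service
        action.encode("utf-8"),            # action
        json.dumps(data).encode("utf-8"),  # data (JSON)
    ]


# Hypothetical usage: a config request addressed to the single dbmanager service.
frames = build_client_request(
    service_name("dbmanager"),
    "config.get",
    {"host": "igor", "plugin": "velbus", "key": "protocol"},
)
```

A service that runs on several hosts would be addressed with service_name("velbus", host="igor"), which yields igor.velbus.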

Clients

Two clients are provided: an asynchronous one and a synchronous one.

Async

The async client is meant to be used as a base class of another class; the on_mdp_message method can then be overridden to handle incoming messages.
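
The subclass-and-override pattern can be sketched without the real library. AsyncClientStub below is a hypothetical stand-in for the async client class (its real name and constructor are not shown here); only the on_mdp_message hook name comes from the text, and the deliver method is an assumption standing in for whatever the event loop does when a message arrives.

```python
class AsyncClientStub(object):
    """Hypothetical stand-in for the async client; NOT the Domogik class."""

    def deliver(self, msg):
        # Assumption: the real client calls the hook when a message arrives.
        self.on_mdp_message(msg)

    def on_mdp_message(self, msg):
        # Default hook does nothing; subclasses override it.
        pass


class ConfigReader(AsyncClientStub):
    """Example component that collects the messages it receives."""

    def __init__(self):
        self.replies = []

    def on_mdp_message(self, msg):
        # Overridden hook: store the message for later use.
        self.replies.append(msg)


reader = ConfigReader()
reader.deliver({"action": "config.result", "data": {}})
```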

Sync

import zmq
from zmq.eventloop.ioloop import IOLoop
from domogik.mq.reqrep.client import MQSyncReq
from domogik.mq.message import MQMessage

cli = MQSyncReq(zmq.Context())
msg = MQMessage()
msg.set_action('plugin.start.do')
msg.add_data('id', 'diskfree')
print(cli.request('manager', msg.get(), timeout=10).get())

Workers

  • all plugins => for helper commands
  • dbmanager
  • packagemanager
  • manager => start/stop a plugin for example

Anything that needs to act as a req/rep worker can extend the MQRep class.

The MDP worker needs to be initialized with MQRep.__init__(self, zmq.Context(), 'dbmgr'): the first argument is the zmq context, the second is the worker's name.

You can then override the on_mdp_request method, def on_mdp_request(self, msg), where the msg argument is an object of type MQMessage.

An MQMessage has 2 main items:

  • the action field
  • the data field (Python dict)

How to Use

from domogik.mq.reqrep.worker import MQRep
from domogik.mq.message import MQMessage
from zmq.eventloop.ioloop import IOLoop
import zmq


class Rep(MQRep):

    def __init__(self):
        MQRep.__init__(self, zmq.Context(), 'test_rep')
        IOLoop.instance().start()

    def on_mdp_request(self, msg):
        # display the req message
        print(msg)
        # call a function to reply to the message depending on the header
        if msg.get_action() == "foo.this.do":
            self._mdp_reply_foo_this(msg)

    def _mdp_reply_foo_this(self, msg_received):
        # get the value for a key of the received message
        the_value = msg_received.get_data('a_key')
        # send the reply
        msg = MQMessage()
        msg.set_action('foo.this.result')
        msg.add_data('another_key', 'another_value')
        print(msg.get())
        self.reply(msg.get())
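
When a worker handles more than a couple of actions, the if-chain in on_mdp_request can be replaced by a lookup table. The sketch below uses a tiny stand-in message class so that it runs without Domogik or ZeroMQ installed; FakeMessage, Dispatcher and the foo.that.do action are hypothetical names, and a real worker would subclass MQRep and call self.reply(...) instead of returning a value.

```python
class FakeMessage(object):
    """Hypothetical stand-in exposing only get_action(), as used above."""

    def __init__(self, action):
        self._action = action

    def get_action(self):
        return self._action


class Dispatcher(object):
    """Routes each request to a handler chosen by its action field."""

    def __init__(self):
        # Map each action name to the method that answers it.
        self._handlers = {
            "foo.this.do": self._reply_foo_this,
            "foo.that.do": self._reply_foo_that,
        }

    def on_mdp_request(self, msg):
        handler = self._handlers.get(msg.get_action())
        if handler is None:
            return None  # unknown action: ignored, like the if-chain above
        return handler(msg)

    def _reply_foo_this(self, msg):
        return "foo.this.result"

    def _reply_foo_that(self, msg):
        return "foo.that.result"


worker = Dispatcher()
result = worker.on_mdp_request(FakeMessage("foo.this.do"))
```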

Examples

Example: a plugin requests config from dbmanager

=> only the bold parts are seen by the Domogik components

The client sends the following message:

empty
MDPC01
dbmanager
**config.get**
**{ host = igor, plugin = velbus, key = protocol}**

The dbmanager then receives the following packet:

Null
MDPW01
0x02
envelope (used to generate the reply)
empty
**config.get**
**{ host = igor, plugin = velbus, key = protocol}**

The dbmanager then performs its action and sends the following packet:

Null
MDPW01
0x03
envelope (copied from the received packet)
empty
**config.result**
**{ data }**
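
The way the reply reuses the request's envelope can be sketched as follows. This is only an illustration of the frame layout shown in the two worker packets above: build_worker_reply is a hypothetical helper, not a Domogik function, the envelope is assumed to be a single frame, and the addresses and data values are placeholders.

```python
import json

MDP_WORKER = b"MDPW01"
CMD_REQUEST = b"\x02"  # command frame of a request delivered to a worker
CMD_REPLY = b"\x03"    # command frame of a reply sent by a worker


def build_worker_reply(request_frames, action, data):
    """Build reply frames, copying the envelope from the received request."""
    null, protocol, command, envelope, delimiter = request_frames[:5]
    if protocol != MDP_WORKER or command != CMD_REQUEST:
        raise ValueError("not an MDP worker request")
    return [
        b"", MDP_WORKER, CMD_REPLY,
        envelope, b"",                     # envelope copied, then empty delimiter
        action.encode("utf-8"),            # e.g. config.result
        json.dumps(data).encode("utf-8"),  # reply data (JSON)
    ]


# Placeholder request, shaped like the packet the dbmanager receives.
request = [
    b"", b"MDPW01", b"\x02", b"client-address", b"",
    b"config.get",
    b'{"host": "igor", "plugin": "velbus", "key": "protocol"}',
]
reply = build_worker_reply(request, "config.result", {"value": "placeholder"})
```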

The plugin in turn receives the following reply:

empty
MDPC01
dbmanager
**config.result**
**{ data }**
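
On the client side, the reply above can be taken apart again in the same frame order. The sketch below assumes the data frame is JSON; parse_client_reply is a hypothetical helper and the data value is a placeholder.

```python
import json


def parse_client_reply(frames):
    """Split a client-side reply into (service, action, data)."""
    null, protocol, service, action, data = frames
    if protocol != b"MDPC01":
        raise ValueError("not an MDP client reply")
    return (
        service.decode("utf-8"),
        action.decode("utf-8"),
        json.loads(data.decode("utf-8")),
    )


# Placeholder reply, shaped like the packet the plugin receives.
reply_frames = [
    b"", b"MDPC01", b"dbmanager", b"config.result",
    b'{"value": "placeholder"}',
]
service, action, data = parse_client_reply(reply_frames)
```

Only the last two values, the action and the data, correspond to the bold parts seen by the Domogik components.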