The purpose of this section is to gather code showing PyMQI in action or code that’s related to common WebSphere MQ-related tasks in general. Some of the examples are Python ports of IBM’s examples that WebSphere MQ ships with.
The samples are self-contained and ready to use in your own PyMQI applications. All contributions are very much welcome, see here for more information. Don’t hesitate to send a question if you’d like to see a specific example added. Thanks!
Code:
import pymqi
# Name of the queue manager and of the channel to connect through.
queue_manager = "QM01"
channel = "SVRCONN.1"

# The listener's address is passed to PyMQI in "host(port)" form.
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# Connect and then release the connection straight away.
qmgr = pymqi.connect(queue_manager, channel, conn_info)
qmgr.disconnect()
Notes:
Code:
import pymqi
# Only the queue manager's name is supplied here -- no channel and no
# host/port, unlike the samples that pass a "host(port)" address.
queue_manager = "QM01"
qmgr = pymqi.connect(queue_manager)
# Release the connection once done.
qmgr.disconnect()
Notes:
Code:
import pymqi
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# What to send and where to.
queue_name = "TEST.1"
message = "Hello from Python!"

qmgr = pymqi.connect(queue_manager, channel, conn_info)

# Put a single message on the queue, then close the queue again.
queue = pymqi.Queue(qmgr, queue_name)
queue.put(message)
queue.close()

qmgr.disconnect()
Code:
import pymqi
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# Queue to read from.
queue_name = "TEST.1"

qmgr = pymqi.connect(queue_manager, channel, conn_info)

# Get a single message using the default descriptor and options.
queue = pymqi.Queue(qmgr, queue_name)
message = queue.get()
queue.close()

qmgr.disconnect()
Notes:
Code:
import CMQC
import pymqi
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# Queue to read from.
queue_name = "TEST.1"

# Message Descriptor
md = pymqi.MD()

# Get Message Options: wait for a message instead of returning at once.
gmo = pymqi.GMO(Options=CMQC.MQGMO_WAIT | CMQC.MQGMO_FAIL_IF_QUIESCING)
gmo["WaitInterval"] = 5000 # 5 seconds

qmgr = pymqi.connect(queue_manager, channel, conn_info)

queue = pymqi.Queue(qmgr, queue_name)
message = queue.get(None, md, gmo)
queue.close()

qmgr.disconnect()
Notes:
Code:
import CMQC
import pymqi
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host = "192.168.1.135"
port = "1434"
queue_name = "TEST.1"
conn_info = "%s(%s)" % (host, port)

# Message Descriptor
md = pymqi.MD()

# Get Message Options
gmo = pymqi.GMO()
gmo.Options = CMQC.MQGMO_WAIT | CMQC.MQGMO_FAIL_IF_QUIESCING
gmo.WaitInterval = 5000 # 5 seconds

qmgr = pymqi.connect(queue_manager, channel, conn_info)
queue = pymqi.Queue(qmgr, queue_name)

# Set this to False from within the processing code to leave the loop.
keep_running = True

try:
    while keep_running:
        try:
            # Wait up to gmo.WaitInterval for a new message.
            message = queue.get(None, md, gmo)

            # Process the message here..

            # Reset the MsgId, CorrelId & GroupId so that we can reuse
            # the same 'md' object again.
            md.MsgId = CMQC.MQMI_NONE
            md.CorrelId = CMQC.MQCI_NONE
            md.GroupId = CMQC.MQGI_NONE

        except pymqi.MQMIError as e:
            if e.comp == CMQC.MQCC_FAILED and e.reason == CMQC.MQRC_NO_MSG_AVAILABLE:
                # No messages, that's OK, we can ignore it.
                pass
            else:
                # Some other error condition.
                raise
finally:
    # Release the queue and the connection however the loop ends, be it
    # normally, through an MQ error or through Ctrl-C.
    queue.close()
    qmgr.disconnect()
Notes:
Code:
import CMQC
import pymqi
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# The request and the queue it is sent to.
request_queue = "TEST.1"
message = "Please reply to a dynamic queue, thanks."

# Name pattern for the dynamic reply queue.
dynamic_queue_prefix = "MY.REPLIES.*"

qmgr = pymqi.connect(queue_manager, channel, conn_info)

# Object descriptor of the dynamic queue: a model queue to open plus
# the name pattern to use for the queue being created.
dyn_od = pymqi.OD()
dyn_od.ObjectName = "SYSTEM.DEFAULT.MODEL.QUEUE"
dyn_od.DynamicQName = dynamic_queue_prefix

# Open the dynamic queue for exclusive input, then read the queue's name
# back from the descriptor.
dyn_queue = pymqi.Queue(qmgr, dyn_od, CMQC.MQOO_INPUT_EXCLUSIVE)
reply_queue_name = dyn_od.ObjectName.strip()

# The request's message descriptor tells the receiver where to reply to.
md = pymqi.MD()
md.ReplyToQ = reply_queue_name

# Send the message.
queue = pymqi.Queue(qmgr, request_queue)
queue.put(message, md)

# Get and process the response here..

dyn_queue.close()
queue.close()
qmgr.disconnect()
Notes:
Code:
import pymqi
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host = "192.168.1.135"
port = "1434"
queue_name = "TEST.1"
conn_info = "%s(%s)" % (host, port)

# Text of the reply to send back.
message = "Here's a reply"

qmgr = pymqi.connect(queue_manager, channel, conn_info)

# Receive the request. It is kept under its own name so that the reply
# text above isn't overwritten and the request echoed back by mistake.
md = pymqi.MD()
queue = pymqi.Queue(qmgr, queue_name)
request = queue.get(None, md)

# The request's message descriptor names the queue to reply to.
reply_to_queue_name = md.ReplyToQ.strip()
reply_to_queue = pymqi.Queue(qmgr, reply_to_queue_name)
reply_to_queue.put(message)

# Close both queues before disconnecting.
reply_to_queue.close()
queue.close()
qmgr.disconnect()
Notes:
Code:
import pymqi, CMQC
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# Topic to publish on and the data to publish.
topic_string = "/currency/rate/EUR/USD"
msg = "1.3961"

# Connect through a fresh channel definition.
channel_definition = pymqi.CD()
qmgr = pymqi.QueueManager(None)
qmgr.connect_tcp_client(queue_manager, channel_definition, channel, conn_info)

# Open the topic for output, publish once and close it again.
topic = pymqi.Topic(qmgr, topic_string=topic_string)
topic.open(open_opts=CMQC.MQOO_OUTPUT)
topic.pub(msg)
topic.close()

qmgr.disconnect()
Notes:
Code:
import logging
import pymqi, CMQC
logging.basicConfig(level=logging.INFO)

# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host = "192.168.1.135"
port = "1434"
topic_string = "/currency/rate/EUR/USD"
msg = "1.3961"
conn_info = "%s(%s)" % (host, port)

qmgr = pymqi.QueueManager(None)
qmgr.connect_tcp_client(queue_manager, pymqi.CD(), channel, conn_info)

# Subscription descriptor: a named subscription to the topic string.
sub_desc = pymqi.SD()
sub_desc["Options"] = CMQC.MQSO_CREATE + CMQC.MQSO_RESUME + CMQC.MQSO_DURABLE + CMQC.MQSO_MANAGED
sub_desc.set_vs("SubName", "MySub")
sub_desc.set_vs("ObjectString", topic_string)

sub = pymqi.Subscription(qmgr)
sub.sub(sub_desc=sub_desc)

# Wait up to 15 seconds for a publication to arrive.
get_opts = pymqi.GMO(Options=CMQC.MQGMO_NO_SYNCPOINT + CMQC.MQGMO_FAIL_IF_QUIESCING + CMQC.MQGMO_WAIT)
get_opts["WaitInterval"] = 15000

# The message descriptor class is pymqi.MD (upper case), as used in the
# other samples; there is no lower-case 'md' to call.
data = sub.get(None, pymqi.MD(), get_opts)
logging.info("Here's the received data: [%s]" % data)

# Close the handle but keep the subscription itself (MQCO_KEEP_SUB).
sub.close(sub_close_options=CMQC.MQCO_KEEP_SUB, close_sub_queue=True)
qmgr.disconnect()
Notes:
Code:
import logging
import pymqi
import CMQC
logging.basicConfig(level=logging.INFO)

# Connection settings.
queue_manager = "QM01"
channel = "SSL.SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# SSL/TLS settings: the cipher spec to use and the client key repository.
ssl_cipher_spec = "TLS_RSA_WITH_AES_256_CBC_SHA"
key_repo_location = "/var/mqm/ssl-db/client/KeyringClient"

# Queue used for the round trip and the message to send.
queue_name = "TEST.1"
message = "Hello from Python!"

# SSL configuration options carry the key repository's location ..
sco = pymqi.SCO()
sco.KeyRepository = key_repo_location

# .. whereas the cipher spec is part of the channel definition.
cd = pymqi.CD()
cd.ChannelName = channel
cd.ConnectionName = conn_info
cd.ChannelType = CMQC.MQCHT_CLNTCONN
cd.TransportType = CMQC.MQXPT_TCP
cd.SSLCipherSpec = ssl_cipher_spec

qmgr = pymqi.QueueManager(None)
qmgr.connect_with_options(queue_manager, cd, sco)

# Put a message ..
put_queue = pymqi.Queue(qmgr, queue_name)
put_queue.put(message)

# .. and read one back from the same queue.
get_queue = pymqi.Queue(qmgr, queue_name)
received = get_queue.get()
logging.info("Here's the message again: [%s]" % received)

put_queue.close()
get_queue.close()
qmgr.disconnect()
Notes:
Queue manager has been assigned a key repository (SSLKEYR attribute) and the repository contains the client’s certificate,
There is an SVRCONN channel with the following properties set:
DIS CHANNEL(SSL.SVRCONN.1) SSLCAUTH SSLCIPH
     1 : DIS CHANNEL(SSL.SVRCONN.1) SSLCAUTH SSLCIPH
AMQ8414: Display Channel details.
   CHANNEL(SSL.SVRCONN.1)   CHLTYPE(SVRCONN)
   SSLCAUTH(REQUIRED)
   SSLCIPH(TLS_RSA_WITH_AES_256_CBC_SHA)
You can access a client key database of type CMS - one which can be created with the gsk6cmd/gsk7cmd tools - and the following files are in the /var/mqm/ssl-db/client/ directory (the directory name may be arbitrary, /var/mqm/ssl-db/client/ is only an example):
$ ls -a /var/mqm/ssl-db/client/
.  ..  KeyringClient.crl  KeyringClient.kdb  KeyringClient.rdb  KeyringClient.sth
$
The client key database contains a certificate labeled ibmwebspheremqmy_user and you are running the code under the operating system account my_user,
The client key database contains the queue manager’s certificate.
- The queue manager certificate’s label is prefixed with ibmwebspheremq and ends with the name of the queue manager, lowercased. If the name of a queue manager is QM01 then the label will be ibmwebspheremqqm01,
- The client certificate’s label is prefixed with ibmwebspheremq and ends with the name of the operating system’s account under which the code will be executed; so if the account name is user01 then the label will be ibmwebspheremquser01,
- The value of a cd.SSLCipherSpec parameter matches the value of a channel’s SSLCIPH attribute.
Code:
import logging
import pymqi
logging.basicConfig(level=logging.INFO)

# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# Queue to use, the message to send and the priority to send it with.
queue_name = "TEST.1"
message = "Hello from Python!"
priority = 2

qmgr = pymqi.connect(queue_manager, channel, conn_info)

# Sending side: the priority is set in the put's message descriptor.
put_md = pymqi.MD()
put_md["Priority"] = priority
put_queue = pymqi.Queue(qmgr, queue_name)
put_queue.put(message, put_md)

# Receiving side: the priority is read from the get's message descriptor.
get_md = pymqi.MD()
get_queue = pymqi.Queue(qmgr, queue_name)
message_body = get_queue.get(None, get_md)
logging.info("Received a message, priority [%s]." % get_md.Priority)

put_queue.close()
get_queue.close()
qmgr.disconnect()
Notes:
Code:
import pymqi
import CMQXC
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host = "192.168.1.135"
port = "1434"
queue_name = "TEST.1"
conn_info = "%s(%s)" % (host, port)

# A large, highly repetitive payload.
message = "Hello from Python!" * 10000

# Ask for message compression in the channel definition ..
cd = pymqi.CD()
cd.MsgCompList[1] = CMQXC.MQCOMPRESS_ZLIBHIGH

# .. and connect using that very channel definition. A plain
# pymqi.connect() call would not receive 'cd', leaving the compression
# setting unused.
qmgr = pymqi.QueueManager(None)
qmgr.connect_tcp_client(queue_manager, cd, channel, conn_info)

queue = pymqi.Queue(qmgr, queue_name)
queue.put(message)
queue.close()

qmgr.disconnect()
Notes:
- Note that the compression level to use is the second element of the cd.MsgCompList list, not the first one,
- The above assumes the channel’s been configured using the following MQSC command: ALTER CHANNEL(SVRCONN.1) CHLTYPE(SVRCONN) COMPMSG(ZLIBHIGH)
Code:
import logging
import CMQC
import pymqi
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host = "foo.bar" # Note the made up host name
port = "1434"
conn_info = "%s(%s)" % (host, port)

try:
    qmgr = pymqi.connect(queue_manager, channel, conn_info)
except pymqi.MQMIError as e:
    if e.comp == CMQC.MQCC_FAILED and e.reason == CMQC.MQRC_HOST_NOT_AVAILABLE:
        logging.error("Such a host [%s] does not exist." % host)
    else:
        # Any other MQ error is unexpected here, don't hide it.
        raise
else:
    # The connection did succeed after all, so release it.
    qmgr.disconnect()
Notes:
Code:
import logging
import rpm
logging.basicConfig(level=logging.INFO)

# Name of the RPM package to look up.
package_name = "MQSeriesClient"

# Query the RPM database for packages of that name.
ts = rpm.TransactionSet()
mi = ts.dbMatch("name", package_name)

if mi.count():
    # Report the version of each match.
    for header in mi:
        version = header["version"]
        msg = "Found package [%s], version [%s]." % (package_name, version)
        logging.info(msg)
else:
    logging.info("Did not find package [%s] in RPM database." % package_name)
Notes:
Code:
import logging
import _winreg
logging.basicConfig(level=logging.INFO)

# Registry key, below HKEY_LOCAL_MACHINE, that is looked up.
key_name = "Software\\IBM\\MQSeries\\CurrentVersion"

try:
    key = _winreg.OpenKey(_winreg.HKEY_LOCAL_MACHINE, key_name)
except WindowsError:
    logging.info("Could not find WebSphere MQ-related information in Windows registry.")
else:
    # QueryValueEx returns a (value, type) pair, only the value is needed.
    value_and_type = _winreg.QueryValueEx(key, "VRMF")
    version = value_and_type[0]
    logging.info("WebSphere MQ version is [%s]." % version)
Code:
import pymqi
import CMQC
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# Queue to put to, the message, and the user ID to open the queue with.
queue_name = "TEST.1"
message = "Hello from Python!"
alternate_user_id = "myuser"

qmgr = pymqi.connect(queue_manager, channel, conn_info)

# The alternate user ID is carried in the queue's object descriptor ..
od = pymqi.OD()
od.ObjectName = queue_name
od.AlternateUserId = alternate_user_id

# .. and takes part in the open through MQOO_ALTERNATE_USER_AUTHORITY.
open_options = CMQC.MQOO_OUTPUT | CMQC.MQOO_ALTERNATE_USER_AUTHORITY
queue = pymqi.Queue(qmgr)
queue.open(od, open_options)

queue.put(message)
queue.close()

qmgr.disconnect()
Notes:
(contributed by Hannes Wagener)
Code:
# stdlib
import logging, threading, time, traceback, uuid
# PyMQI
import pymqi
import CMQC
# Emit INFO-level records so that this example's logging.info() calls are shown.
logging.basicConfig(level=logging.INFO)
# Queue manager name
qm_name = "QM01"
# Listener host and port, in "host(port)" form
listener = "192.168.1.135(1434)"
# Channel to transfer data through
channel = "SVRCONN.1"
# Request Queue
request_queue_name = "REQUEST.QUEUE.1"
# ReplyTo Queue
replyto_queue_name = "REPLYTO.QUEUE.1"
# Prefix of each request's payload
message_prefix = "Test Data. "
class Producer(threading.Thread):
    """ Common base of the threads used in this example. Each instance is
    a daemon thread with a queue manager connection of its own ('qm') and
    queue objects for the request and reply-to queues.
    """
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True

        # Channel definition describing a TCP client connection.
        cd = pymqi.CD()
        cd["ChannelName"] = channel
        cd["ConnectionName"] = listener
        cd["ChannelType"] = CMQC.MQCHT_CLNTCONN
        cd["TransportType"] = CMQC.MQXPT_TCP

        # Connect with the MQCNO_HANDLE_SHARE_NO_BLOCK option set.
        queue_manager = pymqi.QueueManager(None)
        queue_manager.connect_with_options(
            qm_name, cd=cd, opts=CMQC.MQCNO_HANDLE_SHARE_NO_BLOCK)
        self.qm = queue_manager

        self.req_queue = pymqi.Queue(queue_manager, request_queue_name)
        self.replyto_queue = pymqi.Queue(queue_manager, replyto_queue_name)
class RequestProducer(Producer):
    """ Instances of this class produce an infinite stream of request messages
    and wait for appropriate responses on reply-to queues.
    """
    def run(self):
        # Open the reply-to queue for input once, up front. Opening it anew
        # on each iteration would leave one more open queue behind per request.
        replyto_queue = pymqi.Queue(self.qm, replyto_queue_name, CMQC.MQOO_INPUT_SHARED)

        while True:
            # Put the request message.
            put_mqmd = pymqi.MD()

            # Set the MsgType to request.
            put_mqmd["MsgType"] = CMQC.MQMT_REQUEST

            # Set up the ReplyTo Queue/Queue Manager (Queue Manager is automatically
            # set by MQ).
            put_mqmd["ReplyToQ"] = replyto_queue_name
            put_mqmd["ReplyToQMgr"] = qm_name

            # Set up the put options - must do with NO_SYNCPOINT so that the request
            # message is committed immediately.
            put_opts = pymqi.PMO(Options=CMQC.MQPMO_NO_SYNCPOINT + CMQC.MQPMO_FAIL_IF_QUIESCING)

            # Create a random message.
            message = message_prefix + uuid.uuid4().hex

            self.req_queue.put(message, put_mqmd, put_opts)
            logging.info("Put request message. Message: [%s]" % message)

            # Set up message descriptor for get.
            get_mqmd = pymqi.MD()

            # Set the get CorrelId to the put MsgId (which was set by MQ on the put).
            get_mqmd["CorrelId"] = put_mqmd["MsgId"]

            # Set up the get options.
            get_opts = pymqi.GMO(Options=CMQC.MQGMO_NO_SYNCPOINT +
                                 CMQC.MQGMO_FAIL_IF_QUIESCING +
                                 CMQC.MQGMO_WAIT)

            # Version must be set to 2 to correlate.
            get_opts["Version"] = CMQC.MQGMO_VERSION_2

            # Tell MQ that we are matching on CorrelId.
            get_opts["MatchOptions"] = CMQC.MQMO_MATCH_CORREL_ID

            # Set the wait timeout of half a second.
            get_opts["WaitInterval"] = 500

            # Get the response message. A response that doesn't arrive in time
            # is logged instead of being allowed to terminate the thread.
            try:
                response_message = replyto_queue.get(None, get_mqmd, get_opts)
            except pymqi.MQMIError as e:
                if e.comp == CMQC.MQCC_FAILED and e.reason == CMQC.MQRC_NO_MSG_AVAILABLE:
                    logging.info("No response to message [%s] within the wait interval." % message)
                else:
                    # Some other error condition.
                    raise
            else:
                logging.info("Got response message [%s]" % response_message)

            time.sleep(1)
class ResponseProducer(Producer):
    """ Instances of this class wait for request messages and produce responses.
    """
    def run(self):
        # Request message descriptor, will be reset after processing each
        # request message.
        request_md = pymqi.MD()

        # Get Message Options
        gmo = pymqi.GMO()
        gmo.Options = CMQC.MQGMO_WAIT | CMQC.MQGMO_FAIL_IF_QUIESCING
        gmo.WaitInterval = 500 # Half a second

        queue = pymqi.Queue(self.qm, request_queue_name)

        while True:
            try:
                # Wait up to gmo.WaitInterval for a new message.
                request_message = queue.get(None, request_md, gmo)

                # Create a response message descriptor with the CorrelId
                # set to the value of MsgId of the original request message.
                response_md = pymqi.MD()
                response_md.CorrelId = request_md.MsgId

                response_message = "Response to message %s" % request_message
                self.replyto_queue.put(response_message, response_md)

                # Reset the MsgId, CorrelId & GroupId so that we can reuse
                # the same 'md' object again.
                request_md.MsgId = CMQC.MQMI_NONE
                request_md.CorrelId = CMQC.MQCI_NONE
                request_md.GroupId = CMQC.MQGI_NONE

            except pymqi.MQMIError as e:
                if e.comp == CMQC.MQCC_FAILED and e.reason == CMQC.MQRC_NO_MSG_AVAILABLE:
                    # No messages, that's OK, we can ignore it.
                    pass
                else:
                    # Some other error condition.
                    raise
# Start one requester and one responder; both are daemon threads.
req = RequestProducer()
resp = ResponseProducer()

req.start()
resp.start()

try:
    # Keep the main thread alive until interrupted with Ctrl-C.
    while True:
        time.sleep(0.1)
except KeyboardInterrupt:
    # Each thread owns a connection of its own, release both of them.
    req.qm.disconnect()
    resp.qm.disconnect()
Notes:
Code:
import CMQC, pymqi
# Connection and message settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host = "192.168.1.135"
port = "1434"
queue_name = "TEST.1"
message = "Hello from Python!"
conn_info = "%s(%s)" % (host, port)
# Channel definition describing a TCP client connection.
cd = pymqi.CD()
cd.ChannelName = channel
cd.ConnectionName = conn_info
cd.ChannelType = CMQC.MQCHT_CLNTCONN
cd.TransportType = CMQC.MQXPT_TCP
# NOTE(review): presumably this connect option is what makes the repeated
# connect calls below acceptable -- confirm against the MQCONNX documentation.
connect_options = CMQC.MQCNO_HANDLE_SHARE_BLOCK
qmgr = pymqi.QueueManager(None)
# The same QueueManager object is connected over and over again.
# NOTE(review): indentation was lost in this listing, the connect call(s)
# below presumably form the loop's body -- confirm against the original sample.
for x in range(10):
qmgr.connect_with_options(queue_manager, cd=cd, opts=connect_options)
qmgr.connect_with_options(queue_manager, cd=cd, opts=connect_options)
# Put one message, then close the queue and disconnect.
queue = pymqi.Queue(qmgr, queue_name)
queue.put(message)
queue.close()
qmgr.disconnect()
import CMQC, pymqi
# Connection and message settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host = "192.168.1.135"
port = "1434"
queue_name = "TEST.1"
message = "Hello from Python!"
conn_info = "%s(%s)" % (host, port)

qmgr = pymqi.QueueManager(None)
qmgr.connect_tcp_client(queue_manager, pymqi.CD(), channel, conn_info)

# Connect a second time and ignore the resulting warning.
try:
    qmgr.connect_tcp_client(queue_manager, pymqi.CD(), channel, conn_info)
except pymqi.MQMIError as e:
    if e.comp == CMQC.MQCC_WARNING and e.reason == CMQC.MQRC_ALREADY_CONNECTED:
        # Move along, nothing to see here..
        pass
    else:
        # Only the 'already connected' warning may be ignored safely.
        raise

queue = pymqi.Queue(qmgr, queue_name)
queue.put(message)
queue.close()

qmgr.disconnect()
Notes:
Code:
import pymqi
import CMQC, CMQXC, CMQCFC
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# Name and type of the channel to create.
channel_name = "MYCHANNEL.1"
channel_type = CMQXC.MQCHT_SVRCONN

# PCF arguments are passed as a dictionary keyed by MQ attribute constants.
args = {}
args[CMQCFC.MQCACH_CHANNEL_NAME] = channel_name
args[CMQCFC.MQIACH_CHANNEL_TYPE] = channel_type

qmgr = pymqi.connect(queue_manager, channel, conn_info)

pcf = pymqi.PCFExecute(qmgr)
pcf.MQCMD_CREATE_CHANNEL(args)

qmgr.disconnect()
Notes:
Code:
import pymqi
import CMQC
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

# Name, type and maximum depth of the queue to create.
queue_name = "MYQUEUE.1"
queue_type = CMQC.MQQT_LOCAL
max_depth = 123456

# PCF arguments are passed as a dictionary keyed by MQ attribute constants.
args = {}
args[CMQC.MQCA_Q_NAME] = queue_name
args[CMQC.MQIA_Q_TYPE] = queue_type
args[CMQC.MQIA_MAX_Q_DEPTH] = max_depth

qmgr = pymqi.connect(queue_manager, channel, conn_info)

pcf = pymqi.PCFExecute(qmgr)
pcf.MQCMD_CREATE_Q(args)

qmgr.disconnect()
Notes:
Code:
import logging
import pymqi
import CMQC, CMQCFC
logging.basicConfig(level=logging.INFO)

# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host = "192.168.1.135"
port = "1434"
conn_info = "%s(%s)" % (host, port)

# Channel name pattern to inquire about.
prefix = "SYSTEM.*"

args = {CMQCFC.MQCACH_CHANNEL_NAME: prefix}

qmgr = pymqi.connect(queue_manager, channel, conn_info)
pcf = pymqi.PCFExecute(qmgr)

try:
    response = pcf.MQCMD_INQUIRE_CHANNEL(args)
except pymqi.MQMIError as e:
    if e.comp == CMQC.MQCC_FAILED and e.reason == CMQC.MQRC_UNKNOWN_OBJECT_NAME:
        logging.info("No channels matched prefix [%s]" % prefix)
    else:
        raise
else:
    # One entry per matching channel, keyed by MQ attribute constants.
    for channel_info in response:
        channel_name = channel_info[CMQCFC.MQCACH_CHANNEL_NAME]
        logging.info("Found channel [%s]" % channel_name)
finally:
    # Disconnect even when an unexpected error is re-raised above.
    qmgr.disconnect()
Notes:
Code:
import logging
import pymqi
import CMQC, CMQCFC, CMQXC
logging.basicConfig(level=logging.INFO)

# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host = "192.168.1.135"
port = "1434"
conn_info = "%s(%s)" % (host, port)

# Queue name pattern and queue type to inquire about.
prefix = "SYSTEM.*"
queue_type = CMQC.MQQT_MODEL

args = {CMQC.MQCA_Q_NAME: prefix,
        CMQC.MQIA_Q_TYPE: queue_type}

qmgr = pymqi.connect(queue_manager, channel, conn_info)
pcf = pymqi.PCFExecute(qmgr)

try:
    response = pcf.MQCMD_INQUIRE_Q(args)
except pymqi.MQMIError as e:
    if e.comp == CMQC.MQCC_FAILED and e.reason == CMQC.MQRC_UNKNOWN_OBJECT_NAME:
        logging.info("No queues matched given arguments.")
    else:
        raise
else:
    # One entry per matching queue, keyed by MQ attribute constants.
    for queue_info in response:
        queue_name = queue_info[CMQC.MQCA_Q_NAME]
        logging.info("Found queue [%s]" % queue_name)
finally:
    # Disconnect even when an unexpected error is re-raised above.
    qmgr.disconnect()
Notes:
Code:
import pymqi
import CMQC, CMQCFC, CMQXC
# Connection settings.
queue_manager = "QM01"
channel = "SVRCONN.1"
host, port = "192.168.1.135", "1434"
conn_info = "%s(%s)" % (host, port)

qmgr = pymqi.connect(queue_manager, channel, conn_info)

# Issue the MQCMD_PING_Q_MGR PCF command, which takes no arguments here.
pcf = pymqi.PCFExecute(qmgr)
pcf.MQCMD_PING_Q_MGR()

qmgr.disconnect()
Notes: