Tutorial¶
Installing the Python Katcp Library¶
You can install the latest version of the KATCP library by running pip install katcp if pip is installed. Alternatively, easy_install katcp; requires the setuptools Python package to be installed.
Using the Blocking Client¶
The blocking client is the most straight-forward way of querying a KATCP device. It is used as follows:
from katcp import BlockingClient, Message
device_host = "www.example.com"
device_port = 5000
client = BlockingClient(device_host, device_port)
client.start()
client.wait_protocol() # Optional
reply, informs = client.blocking_request(
Message.request("help"))
print reply
for msg in informs:
print msg
client.stop()
client.join()
After creating the BlockingClient
instance, the
start()
method is called to launch the client thread. The
wait_protocol()
method waits until katcp version information has been
received from the server, allowing the KATCP version spoken by the server to be
known; server protocol iformation is stores in client.protocol_flags. Once you
have finished with the client, stop()
can be called to request that the
thread shutdown. Finally, join()
is used to wait for the client thread to
finish.
While the client is active the blocking_request()
method can be used to send messages to
the KATCP server and wait for replies. If a reply is not received within the
allowed time, a RuntimeError
is raised.
If a reply is received blocking_request()
returns two values. The first is the
Message
containing the reply. The second is a list of
messages containing any KATCP informs associated with the reply.
Using the Callback Client¶
For situations where one wants to communicate with a server
but doesn’t want to wait for a reply, the
CallbackClient
is provided:
from katcp import CallbackClient, Message
device_host = "www.example.com"
device_port = 5000
def reply_cb(msg):
print "Reply:", msg
def inform_cb(msg):
print "Inform:", msg
client = CallbackClient(device_host, device_port)
client.start()
reply, informs = client.callback_request(
Message.request("help"),
reply_cb=reply_cb,
inform_cb=inform_cb,
)
client.stop()
client.join()
Note that the reply_cb()
and inform_cb()
callback functions are both
called inside the client’s event-loop thread so should not perform any
operations that block. If needed, pass the data out from the callback
function to another thread using a Queue.Queue
or similar
structure.
Writing your own Client¶
If neither the BlockingClient
nor
the CallbackClient
provide the
functionality you need then you can sub-class
DeviceClient
which is the base class
from which both are derived.
DeviceClient
has two methods for sending messages:
request()
for sending requestMessages
send_message
for sending arbitraryMessages
Internally request
calls
send_message
to pass messages to the
server.
Note
The send_message()
method does not
return an error code or raise an exception if sending the message
fails. Since the underlying protocol is entirely asynchronous, the only
means to check that a request was successful is receive a reply message. One
can check that the client is connected before sending a message using
is_connected()
.
When the DeviceClient
thread receives a completed message,
handle_message()
is called. The default handle_message()
implementation calls one of handle_reply()
, handle_inform()
or handle_request()
depending on the type of message received.
Note
Sending requests to clients is discouraged. The handle_request()
is provided mostly for completeness and to deal with unforseen
circumstances.
Each of handle_reply()
, handle_inform()
and handle_request()
dispatches messages to methods based on the message name. For example,
a reply message named foo
will be dispatched to reply_foo()
.
Similarly an inform message named bar
will be dispatched to
inform_bar()
. If no corresponding method is found then one of
unhandled_reply()
, unhandled_inform()
or unhandled_request()
is called.
Your own client may hook into this dispatch tree at any point by implementing or overriding the appropriate methods.
An example of a simple client that only handles replies to help
messages is presented below:
from katcp import DeviceClient, Message
import time
device_host = "www.example.com"
device_port = 5000
class MyClient(DeviceClient):
def reply_help(self, msg):
"""Print out help replies."""
print msg.name, msg.arguments
def inform_help(self, msg):
"""Print out help inform messages."""
meth, desc = msg.arguments[:2]
print "---------", meth, "---------"
print
print desc
print "----------------------------"
def unhandled_reply(self, msg):
"""Print out unhandled replies."""
print "Unhandled reply", msg.name
def unhandled_inform(self, msg):
"Print out unhandled informs."""
print "Unhandled inform", msg.name
client = MyClient(device_host, device_port)
client.start()
client.request(Message.request("help"))
client.request(Message.request("watchdog"))
time.sleep(0.5)
client.stop()
client.join()
Client handler functions can use the unpack_message()
decorator from kattypes module to unpack
messages into function arguments in the same way the request()
decorator is used in the server example below, except
that the req parameter is omitted.
Using the high-level client API¶
The high level client API inspects a KATCP device server and presents requests as method calls and sensors as objects.
A high level client for the example server presented in the following section:
import tornado
from tornado.ioloop import IOLoop
from katcp import resource_client
ioloop = IOLoop.current()
client = resource_client.KATCPClientResource(dict(
name='demo-client',
address=('localhost', 5000),
controlled=True))
@tornado.gen.coroutine
def demo():
# Wait until the client has finished inspecting the device
yield client.until_synced()
help_response = yield client.req.help()
print "device help:\n ", help_response
add_response = yield client.req.add(3, 6)
print "3 + 6 response:\n", add_response
# By not yielding we are not waiting for the response
pick_response_future = client.req.pick_fruit()
# Instead we wait for the fruit.result sensor status to change to
# nominal. Before we can wait on a sensor, a strategy must be set:
client.sensor.fruit_result.set_strategy('event')
# If the condition does not occur within the timeout (default 5s), we will
# get a TimeoutException
yield client.sensor.fruit_result.wait(
lambda reading: reading.status == 'nominal')
fruit = yield client.sensor.fruit_result.get_value()
print 'Fruit picked: ', fruit
# And see how the ?pick-fruit request responded by yielding on its future
pick_response = yield pick_response_future
print 'pick response: \n', pick_response
# Finally stop the ioloop so that the program exits
ioloop.stop()
# Note, katcp.resource_client.ThreadSafeKATCPClientResourceWrapper can be used to
# turn the client into a 'blocking' client for use in e.g. ipython. It will turn
# all functions that return tornado futures into blocking calls, and will bounce
# all method calls through the ioloop. In this case the ioloop must be started
# in a separate thread. katcp.ioloop_manager.IOLoopManager can be used to manage
# the ioloop thread.
ioloop.add_callback(client.start)
ioloop.add_callback(demo)
ioloop.start()
Writing your own Server¶
Creating a server requires sub-classing DeviceServer
. This class already provides all the requests and inform
messages required by the KATCP protocol. However, its implementation requires a
little assistance from the subclass in order to function.
A very simple server example looks like:
import threading
import time
import random
from katcp import DeviceServer, Sensor, ProtocolFlags, AsyncReply
from katcp.kattypes import (Str, Float, Timestamp, Discrete,
request, return_reply)
server_host = ""
server_port = 5000
class MyServer(DeviceServer):
VERSION_INFO = ("example-api", 1, 0)
BUILD_INFO = ("example-implementation", 0, 1, "")
# Optionally set the KATCP protocol version and features. Defaults to
# the latest implemented version of KATCP, with all supported optional
# features
PROTOCOL_INFO = ProtocolFlags(5, 0, set([
ProtocolFlags.MULTI_CLIENT,
ProtocolFlags.MESSAGE_IDS,
]))
FRUIT = [
"apple", "banana", "pear", "kiwi",
]
def setup_sensors(self):
"""Setup some server sensors."""
self._add_result = Sensor.float("add.result",
"Last ?add result.", "", [-10000, 10000])
self._time_result = Sensor.timestamp("time.result",
"Last ?time result.", "")
self._eval_result = Sensor.string("eval.result",
"Last ?eval result.", "")
self._fruit_result = Sensor.discrete("fruit.result",
"Last ?pick-fruit result.", "", self.FRUIT)
self.add_sensor(self._add_result)
self.add_sensor(self._time_result)
self.add_sensor(self._eval_result)
self.add_sensor(self._fruit_result)
@request(Float(), Float())
@return_reply(Float())
def request_add(self, req, x, y):
"""Add two numbers"""
r = x + y
self._add_result.set_value(r)
return ("ok", r)
@request()
@return_reply(Timestamp())
def request_time(self, req):
"""Return the current time in seconds since the Unix Epoch."""
r = time.time()
self._time_result.set_value(r)
return ("ok", r)
@request(Str())
@return_reply(Str())
def request_eval(self, req, expression):
"""Evaluate a Python expression."""
r = str(eval(expression))
self._eval_result.set_value(r)
return ("ok", r)
@request()
@return_reply(Discrete(FRUIT))
def request_pick_fruit(self, req):
"""Pick a random fruit."""
r = random.choice(self.FRUIT + [None])
if r is None:
return ("fail", "No fruit.")
delay = random.randrange(1,5)
req.inform("Picking will take %d seconds" % delay)
def pick_handler():
self._fruit_result.set_value(r)
req.reply("ok", r)
self.ioloop.add_callback(
self.ioloop.call_later, delay, pick_handler)
raise AsyncReply
def request_raw_reverse(self, req, msg):
"""
A raw request handler to demonstrate the calling convention if
@request decoraters are not used. Reverses the message arguments.
"""
# msg is a katcp.Message.request object
reversed_args = msg.arguments[::-1]
# req.make_reply() makes a katcp.Message.reply using the correct request
# name and message ID
return req.make_reply('ok', *reversed_args)
if __name__ == "__main__":
server = MyServer(server_host, server_port)
server.start()
server.join()
Notice that MyServer
has three special class attributes
VERSION_INFO
, BUILD_INFO
and
PROTOCOL_INFO
. VERSION_INFO
gives the version of the server
API. Many implementations might use the same
VERSION_INFO
. BUILD_INFO
gives the version of the software
that provides the device. Each device implementation should have a unique
BUILD_INFO
. PROTOCOL_INFO
is an instance of
ProtocolFlags
that describes the KATCP dialect spoken by the server. If
not specified, it defaults to the latest implemented version of KATCP, with all
supported optional features. Using a version different from the default may
change server behaviour; furthermore version info may need to be passed to the
@request
and @return_reply
decorators.
The setup_sensors()
method registers Sensor
objects with the device server. The base class uses this information to
implement the ?sensor-list
, ?sensor-value
and
?sensor-sampling
requests. add_sensor()
should be called once for each sensor the
device should contain. You may create the sensor objects inside
setup_sensors()
(as done in the example) or elsewhere if you wish.
Request handlers are added to the server by creating methods whose names start
with “request_”. These methods take two arguments – the client-request object
(abstracts the client socket and the request context) that the request came from,
and the request message. Notice that the message argument is missing from the
methods in the example. This is a result of the request()
decorator that has been applied to the methods.
The request()
decorator takes a list of
KatcpType
objects describing the request
arguments. Once the arguments have been checked they are passed in to the
underlying request method as additional parameters instead of the request
message.
The return_reply
decorator performs a
similar operation for replies. Once the request method returns a tuple (or list)
of reply arguments, the decorator checks the values of the arguments and
constructs a suitable reply message.
Use of the request()
and return_reply()
decorators is encouraged but entirely optional.
Message dispatch is handled in much the same way as described in the client
example, with the exception that there are no unhandled_request()
,
unhandled_reply()
or unhandled_request()
methods. Instead, the
server will log an exception.
Writing your own Async Server¶
To write a server in the typical tornado async style, modify the example above by adding the following imports
import signal
import tornado
from katcp import AsyncDeviceServer
Also replace class MyServer(DeviceServer) with class MyServer(AsyncDeviceServer) and replace the if __name__ == “__main__”: block with
@tornado.gen.coroutine
def on_shutdown(ioloop, server):
print('Shutting down')
yield server.stop()
ioloop.stop()
if __name__ == "__main__":
ioloop = tornado.ioloop.IOLoop.current()
server = MyServer(server_host, server_port)
# Hook up to SIGINT so that ctrl-C results in a clean shutdown
signal.signal(signal.SIGINT, lambda sig, frame: ioloop.add_callback_from_signal(
on_shutdown, ioloop, server))
ioloop.add_callback(server.start)
ioloop.start()
If multiple servers are started in a single ioloop, on_shutdown()
should
be modified to call stop()
on each server. This is needed to allow a clean
shutdown that adheres to the KATCP spec requirement that a #disconnect inform
is sent when a server shuts down.
Event Loops and Thread Safety¶
As of version 0.6.0, katcp-python was completely reworked to use Tornado as an event- and network library. A typical Tornado application would only use a single tornado.ioloop.IOLoop event-loop instance. Logically independent parts of the application would all share the same ioloop using e.g. coroutines to allow concurrent tasks.
However, to maintain backwards compatiblity with the thread-semantics of older
versions of this library, it supports starting a tornado.ioloop.IOLoop
instance in a new thread for each client or server. Instantiating the
BlockingClient
or CallbackClient
client classes or the
DeviceServer
server class will implement the backward compatible
behaviour by default, while using AsyncClient
or
AsyncDeviceServer
will by default use tornado.ioloop.IOLoop.current()
as the ioloop (can be overidden using their set_ioloop methods), and won’t
enable thread safety by default (can be overridden using
AsyncDeviceServer.set_concurrency_options()
and
AsyncClient.enable_thread_safety()
)
Note that any message (request, reply, iform) handling methods should not
block. A blocking handler will block the ioloop, causing all timed operations
(e.g. sensor strategies), network io, etc. to block. This is particularly
important when multiple servers/clients share a single ioloop. A good solution
for handlers that need to wait on other tasks is to implement them as Tornado
couroutines. A DeviceServer
will not accept another request message
from a client connection until the request handler has completed / resolved its
future. Multiple outstanding requests can be handled concurrently by raising the
AsyncReply
exception in a request handler. It is then the
responsibility of the user to ensure that a reply is eventually sent using the
req object.
If DeviceServer.set_concurrency_options()
has handler_thread=True (the
default for DeviceServer
, AsyncDeviceServer
defaults to
False), all the requests to a server is serialised and handled in a separate
request handing thread. This allows request handlers to block without preventing
sensor strategy updates, providing backwards-compatible concurrency
semantics.
In the case of a purely network-event driven server or client, all user code would execute in the thread context of the server or client event loop. Therefore all handler functions must be non-blocking to prevent unresponsiveness. Unhandled exceptions raised by handlers running in the network event-thread are caught and logged; in the case of servers, an error reply including the traceback is sent over the network interface. Slow operations (such as picking fruit) may be delegated to another thread (if a threadsafe server is used), a callback (as shown in the request_pick_fruit handler in the server example) or tornado coroutine.
If a device is linked to processing that occurs independently of network events,
one approach would be a model thread running in the background. The KATCP
handler code would then defer requests to the model. The model must provide a
thread-safe interface to the KATCP code. If using an async server
(e.g. AsyncDeviceServer
or DeviceServer.set_concurrency_options()
called with thread_safe=False), all interaction with the device server needs
to be through the tornado.ioloop.Ioloop.add_callback()
method of the
server’s ioloop. The server’s ioloop instance can be accessed through its
ioloop attribute. If a threadsafe server (e.g. DeviceServer
with
default concurrency options) or client (e.g. CallbackClient
) is used,
all the public methods provided by this katcp library for sending !replies or
#informs are thread safe.
Updates to Sensor
objects using the public setter methods are always
thread-safe, provided that the same is true for all the observers attached to
the sensor. The server observers used to implement sampling strategies are
threadsafe, even if an asyc server is used.
Backwards Compatibility¶
Server Protocol Backwards Compatibility¶
A minor modification of the first several lines of the example in Writing your own Server suffices to create a KATCP v4 server:
from katcp import DeviceServer, Sensor, ProtocolFlags, AsyncReply
from katcp.kattypes import (Str, Float, Timestamp, Discrete,
request, return_reply)
from functools import partial
import threading
import time
import random
server_host = ""
server_port = 5000
# Bind the KATCP major version of the request and return_reply decorators
# to version 4
request = partial(request, major=4)
return_reply = partial(return_reply, major=4)
class MyServer(DeviceServer):
VERSION_INFO = ("example-api", 1, 0)
BUILD_INFO = ("example-implementation", 0, 1, "")
# Optionally set the KATCP protocol version as 4.
PROTOCOL_INFO = ProtocolFlags(4, 0, set([
ProtocolFlags.MULTI_CLIENT,
]))
The rest of the example follows as before.
Client Protocol Backwards Compatibility¶
The DeviceClient
client automatically detects the
version of the server if it can, see
Server KATCP Version Auto-detection. For a simple client
this means that no changes are required to support different KATCP
versions. However, the semantics of the messages might be different for
different protocl versions. Using the unpack_message
decorator with major=4 for reply or inform
handlers might help here, although it could use some improvement.
In the case of version auto-dection failing for a given server,
preset_protocol_flags
can be
used to set the KATCP version before calling the client’s start()
method.