#!/usr/bin/env python
###############################################################################
# SKA South Africa (http://ska.ac.za/) #
# Author: cam@ska.ac.za #
# Copyright @ 2013 SKA SA. All rights reserved. #
# #
# THIS SOFTWARE MAY NOT BE COPIED OR DISTRIBUTED IN ANY FORM WITHOUT THE #
# WRITTEN PERMISSION OF SKA SA. #
###############################################################################
"""
Simlib library generic simulator generator utility to be used to generate an actual
TANGO device that exhibits the behaviour defined in the data description file.
@author MeerKAT CAM team <cam@ska.ac.za>
"""
import os
import weakref
import logging
import argparse
import time
from PyTango import Attr, AttrWriteType, UserDefaultAttrProp, AttrQuality, Database
from PyTango.server import Device, DeviceMeta, command, attribute
from PyTango import DevState, AttrDataFormat, CmdArgType
from functools import partial
from tango_simlib.model import Model
from tango_simlib.sim_xmi_parser import XmiParser
from tango_simlib.simdd_json_parser import SimddParser
from tango_simlib.sim_sdd_xml_parser import SDDParser
from tango_simlib.sim_test_interface import TangoTestDeviceServerBase
from tango_simlib.model import PopulateModelQuantities, PopulateModelActions
from tango_simlib import helper_module
MODULE_LOGGER = logging.getLogger(__name__)
POGO_USER_DEFAULT_CMD_PROP_MAP = {
'name': 'name',
'arginDescription': 'doc_in',
'arginType': 'dtype_in',
'argoutDescription': 'doc_out',
'argoutType': 'dtype_out'}
class TangoDeviceServerBase(Device):
instances = weakref.WeakValueDictionary()
def init_device(self):
super(TangoDeviceServerBase, self).init_device()
name = self.get_name()
self.model = None
self.instances[name] = self
self.set_state(DevState.ON)
def always_executed_hook(self):
self.model.update()
def read_attributes(self, attr):
"""Method reading an attribute value
Parameters
----------
attr : PyTango.DevAttr
The attribute to read from.
"""
if self.get_state() != DevState.OFF:
name = attr.get_name()
value, update_time = self.model.quantity_state[name]
quality = AttrQuality.ATTR_VALID
self.info_stream("Reading attribute %s", name)
attr.set_value_date_quality(value, update_time, quality)
[docs]def get_tango_device_server(model, sim_data_files):
"""Declares a tango device class that inherits the Device class and then
adds tango commands.
Parameters
----------
model: model.Model instance
Device model instance
sim_data_files: list
A list of direct paths to either xmi/xml/json data files.
Returns
-------
TangoDeviceServer : PyTango.Device
Tango device that has the results of the translated KATCP server
"""
# Declare a Tango Device class for specifically adding commands prior
# running the device server
class TangoDeviceServerCommands(object):
pass
class TangoTestDeviceServerCommands(object):
pass
# Declare a Tango Device class for specifically adding static
# attributes prior running the device server and controller
class TangoDeviceServerStaticAttrs(object):
pass
class TangoTestDeviceServerStaticAttrs(object):
pass
def read_fn(tango_device_instance):
return tango_device_instance._attribute_name_index
def write_fn(tango_device_instance, val):
tango_device_instance._attribute_name_index = val
tango_device_instance.model_quantity = tango_device_instance.model.sim_quantities[
sorted(tango_device_instance.model.sim_quantities.keys())[val]]
def generate_cmd_handler(action_name, action_handler):
def cmd_handler(tango_device, input_parameters=None):
return action_handler(tango_dev=tango_device, data_input=input_parameters)
cmd_handler.__name__ = action_name
cmd_info_copy = model.sim_actions_meta[action_name].copy()
# Delete all the keys that are not part of the Tango command parameters.
cmd_info_copy.pop('name')
tango_cmd_prop = POGO_USER_DEFAULT_CMD_PROP_MAP.values()
for prop_key in model.sim_actions_meta[action_name]:
if prop_key not in tango_cmd_prop:
MODULE_LOGGER.info(
"Warning! Property %s is not a tango command prop", prop_key)
cmd_info_copy.pop(prop_key)
"""
The command method signature:
command(f=None, dtype_in=None, dformat_in=None, doc_in="",
dtype_out=None, dformat_out=None, doc_out="", green_mode=None)
"""
return command(f=cmd_handler, **cmd_info_copy)
def add_static_attribute(tango_device_class, attr_name, attr_meta):
"""Add any TANGO attribute of to the device server before start-up.
Parameters
----------
cls: class
class object that the device server will inherit from
attr_name: str
Tango attribute name
attr_meta: dict
Meta data that enables the creation of a well configured attribute
Note
====
This is needed for DevEnum and spectrum type attribues
"""
attr = attribute(label=attr_meta['label'], dtype=attr_meta['data_type'],
enum_labels=attr_meta['enum_labels']
if 'enum_labels' in attr_meta else '',
doc=attr_meta['description'],
dformat=attr_meta['data_format'],
max_dim_x=attr_meta['max_dim_x'],
max_dim_y=attr_meta['max_dim_y'],
access=getattr(AttrWriteType, attr_meta['writable']))
attr.__name__ = attr_name
# Attribute read method
def read_meth(tango_device_instance, attr):
name = attr.get_name()
value, update_time = tango_device_instance.model.quantity_state[name]
quality = AttrQuality.ATTR_VALID
tango_device_instance.info_stream("Reading attribute %s", name)
# For attributes that have a SPECTRUM data format, there is no need to
# type cast them to an integer data type. we need assign the list of values
# to the attribute value parameter.
if type(value) == list:
attr.set_value_date_quality(value, update_time, quality)
else:
attr.set_value_date_quality(int(value), update_time, quality)
# Attribute write method for writable attributes
if str(attr_meta['writable']) == 'READ_WRITE':
@attr.write
def attr(tango_device_instance, new_val):
# When selecting a model quantity we use the enum labels list indexing
# to return the string value corresponding to the respective enum value
# since an integer value is returned by device server when
# attribute value is read
tango_device_instance.model_quantity = (
tango_device_instance.model.sim_quantities[attr_name])
tango_device_instance.model_quantity.set_val(
new_val, tango_device_instance.model.time_func())
read_meth.__name__ = 'read_{}'.format(attr_name)
# Add the read method and the attribute to the class object
setattr(tango_device_class, read_meth.__name__, read_meth)
setattr(tango_device_class, attr.__name__, attr)
MODULE_LOGGER.info("Adding static attribute {} to the device.".format(attr_name))
# Sim test interface static attribute `attribute_name` info
controllable_attribute_names = model.sim_quantities.keys()
attr_control_meta = dict()
attr_control_meta['enum_labels'] = sorted(controllable_attribute_names)
attr_control_meta['data_format'] = AttrDataFormat.SCALAR
attr_control_meta['data_type'] = CmdArgType.DevEnum
attr_control_meta['label'] = 'Attribute name'
attr_control_meta['description'] = 'Attribute name to control'
attr_control_meta['max_dim_x'] = 1
attr_control_meta['max_dim_y'] = 0
attr_control_meta['writable'] = 'READ_WRITE'
TangoTestDeviceServerStaticAttrs.read_fn = read_fn
TangoTestDeviceServerStaticAttrs.write_fn = write_fn
attr = attribute(
label=attr_control_meta['label'], dtype=attr_control_meta['data_type'],
enum_labels=attr_control_meta['enum_labels']
if 'enum_labels' in attr_control_meta else '',
doc=attr_control_meta['description'],
dformat=attr_control_meta['data_format'],
max_dim_x=attr_control_meta['max_dim_x'],
max_dim_y=attr_control_meta['max_dim_y'],
access=getattr(AttrWriteType, attr_control_meta['writable']),
fget=TangoTestDeviceServerStaticAttrs.read_fn,
fset=TangoTestDeviceServerStaticAttrs.write_fn)
TangoTestDeviceServerStaticAttrs.attribute_name = attr
# We use the `add_static_attribute` method to add DevEnum and Spectrum type
# attributes statically to the tango device before start-up since the
# cannot be well configured when added dynamically. This is suspected
# to be a bug.
# TODO(AR 02-03-2017): Ask the tango community on the upcoming Stack
# Exchange community (AskTango) and also make follow ups on the next tango
# releases.
for quantity_name, quantity in model.sim_quantities.items():
d_type = quantity.meta['data_type']
d_type = str(quantity.meta['data_type'])
d_format = str(quantity.meta['data_format'])
if d_type == 'DevEnum' or d_format == 'SPECTRUM':
add_static_attribute(TangoDeviceServerStaticAttrs, quantity_name,
quantity.meta)
for action_name, action_handler in model.sim_actions.items():
cmd_handler = generate_cmd_handler(action_name, action_handler)
# You might need to turn cmd_handler into an unbound method before you add
# it to the class
setattr(TangoDeviceServerCommands, action_name, cmd_handler)
for action_name, action_handler in model.test_sim_actions.items():
cmd_handler = generate_cmd_handler(action_name, action_handler)
# You might need to turn cmd_handler into an unbound method before you add
# it to the class
setattr(TangoTestDeviceServerCommands, action_name, cmd_handler)
class TangoDeviceServer(TangoDeviceServerBase, TangoDeviceServerCommands,
TangoDeviceServerStaticAttrs):
__metaclass__ = DeviceMeta
def init_device(self):
super(TangoDeviceServer, self).init_device()
self.model = model
self._reset_to_default_state()
def _reset_to_default_state(self):
"""Reset the model's quantities' adjustable attributes to their default
values.
"""
simulated_quantities = self.model.sim_quantities.values()
for simulated_quantity in simulated_quantities:
sim_quantity_meta_info = simulated_quantity.meta
adjustable_attrs = simulated_quantity.adjustable_attributes
for attr in adjustable_attrs:
try:
adjustable_val = float(sim_quantity_meta_info[attr])
except KeyError:
adjustable_val = 0.0
setattr(simulated_quantity, attr, adjustable_val)
def initialize_dynamic_attributes(self):
model_sim_quants = self.model.sim_quantities
attribute_list = set([attr for attr in model_sim_quants.keys()])
for attribute_name in attribute_list:
meta_data = model_sim_quants[attribute_name].meta
attr_dtype = meta_data['data_type']
d_format = meta_data['data_format']
# Dynamically add all attributes except those with DevEnum data type
# and SPECTRUM data format since they are added statically to the
# device class prior to start-up.
if str(attr_dtype) != 'DevEnum' and str(d_format) != 'SPECTRUM':
# The return value of rwType is a string and it is required as a
# PyTango data type when passed to the Attr function.
# e.g. 'READ' -> PyTango.AttrWriteType.READ
rw_type = meta_data['writable']
rw_type = getattr(AttrWriteType, rw_type)
attr = Attr(attribute_name, attr_dtype, rw_type)
attr_props = UserDefaultAttrProp()
for prop in meta_data.keys():
attr_prop_setter = getattr(attr_props, 'set_' + prop, None)
if attr_prop_setter:
attr_prop_setter(str(meta_data[prop]))
else:
MODULE_LOGGER.info(
"No setter function for " + prop + " property")
attr.set_default_properties(attr_props)
self.add_attribute(attr, self.read_attributes)
MODULE_LOGGER.info("Added dynamic {} attribute"
.format(attribute_name))
class SimControl(TangoTestDeviceServerBase, TangoTestDeviceServerCommands,
TangoTestDeviceServerStaticAttrs):
__metaclass__ = DeviceMeta
instances = weakref.WeakValueDictionary()
def init_device(self):
super(SimControl, self).init_device()
name = self.get_name()
self.instances[name] = self
klass_name = get_device_class(sim_data_files)
TangoDeviceServer.TangoClassName = klass_name
TangoDeviceServer.__name__ = klass_name
SimControl.TangoClassName = '%sSimControl' % klass_name
SimControl.__name__ = '%sSimControl' % klass_name
return [TangoDeviceServer, SimControl]
[docs]def get_parser_instance(sim_datafile):
"""This method returns an appropriate parser instance to generate a Tango device
Parameters
----------
sim_datafile : str
A direct path to the xmi/xml/json file.
Returns
------
parser_instance: Parser instance
The Parser object which reads an xmi/xml/json file and parses it into device
attributes, commands, and properties.
"""
extension = os.path.splitext(sim_datafile)[-1]
extension = extension.lower()
parser_instance = None
if extension in [".xmi"]:
parser_instance = XmiParser()
parser_instance.parse(sim_datafile)
elif extension in [".json"]:
parser_instance = SimddParser()
parser_instance.parse(sim_datafile)
elif extension in [".xml"]:
parser_instance = SDDParser()
parser_instance.parse(sim_datafile)
return parser_instance
[docs]def generate_device_server(server_name, sim_data_files, directory=''):
"""Create a tango device server python file
Parameters
---------
server_name: str
Tango device server name
sim_data_files: list
A list of direct paths to either xmi/xml/json data files.
"""
lines = ['#!/usr/bin/env python',
'from PyTango.server import server_run',
('from tango_simlib.tango_sim_generator import ('
'configure_device_model, get_tango_device_server)'),
'\n\n# File generated on {} by tango-simlib-tango-simulator-generator'.format(time.ctime()),
'\n\ndef main():',
' sim_data_files = %s' % sim_data_files,
' model = configure_device_model(sim_data_files)',
' TangoDeviceServers = get_tango_device_server(model, sim_data_files)',
' server_run(TangoDeviceServers)',
'\nif __name__ == "__main__":',
' main()\n']
with open(os.path.join(directory, "%s" % server_name), 'w') as dserver:
dserver.write('\n'.join(lines))
# Make the script executable
os.chmod(os.path.join(directory, "%s" % server_name), 477)
[docs]def get_device_class(sim_data_files):
"""Get device class name from specified xmi/simdd description file
Parameters
----------
sim_data_files: list
A list of direct paths to either xmi/xml/json data files.
Returns
-------
klass_name: str
Tango device class name
"""
if len(sim_data_files) < 1:
raise Exception('No simulator data file specified.')
parser_instance = None
klass_name = ''
for data_file in sim_data_files:
extension = os.path.splitext(data_file)[-1]
extension = extension.lower()
if extension in [".xmi"]:
parser_instance = get_parser_instance(data_file)
elif extension in [".json"] and len(sim_data_files) < 2:
parser_instance = get_parser_instance(data_file)
# Since at the current moment the class name of the tango simulator to be
# generated must be specified in the xmi data file, if no xmi if provided
# the simulator will be given a default name.
if parser_instance:
klass_name = parser_instance.device_class_name
else:
klass_name = 'TangoDeviceServer'
return klass_name
[docs]def get_argparser():
parser = argparse.ArgumentParser(
description="Generate a tango data driven simulator, handling"
"registration as needed. Supports multiple device per process.")
required_argument = partial(parser.add_argument, required=True)
required_argument('--sim-data-file', action='append',
help='Simulator description data files(s) '
'.i.e. can specify multiple files')
required_argument('--directory', help='TANGO server executable path', default='')
required_argument('--dserver-name', help='TANGO server executable command')
return parser
[docs]def main():
arg_parser = get_argparser()
opts = arg_parser.parse_args()
generate_device_server(opts.dserver_name, opts.sim_data_file, directory=opts.directory)
if __name__ == "__main__":
main()