Source code for tango_simlib.model

#!/usr/bin/env python
###############################################################################
# SKA South Africa (http://ska.ac.za/)                                        #
# Author: cam@ska.ac.za                                                       #
# Copyright @ 2013 SKA SA. All rights reserved.                               #
#                                                                             #
# THIS SOFTWARE MAY NOT BE COPIED OR DISTRIBUTED IN ANY FORM WITHOUT THE      #
# WRITTEN PERMISSION OF SKA SA.                                               #
###############################################################################

import logging
import time
import weakref
import sys
import importlib

from functools import partial
from tango_simlib import quantities

from PyTango import (DevBoolean, DevString, DevEnum,
                     DevDouble, DevFloat, DevLong, DevVoid)


MODULE_LOGGER = logging.getLogger(__name__)

model_registry = weakref.WeakValueDictionary()

DEFAULT_TANGO_COMMANDS = frozenset(['State', 'Status', 'Init'])
MAX_NUM_OF_CLASS_ATTR_OCCURENCE = 1
ARBITRARY_DATA_TYPE_RETURN_VALUES = {
    DevString: 'Ok!',
    DevBoolean: True,
    DevDouble: 4.05,
    DevFloat: 8.1,
    DevLong: 3,
    DevVoid: None}

# In the case where an attribute with contant quantity simulation type is
# specified, this dict is used to convert the initial value if specified to
# the data-type corresponding to the attribute data-type.
INITIAL_CONSTANT_VALUE_TYPES = {
    DevString: (str, ""),
    DevFloat: (float, 0.0),
    DevDouble: (float, 0.0),
    DevBoolean: (bool, False),
    DevEnum: (int, 0)}


[docs]class Model(object): """Tango Device main model with quantities and actions Parameters ---------- name : str Model name identifier start_time : float Time at instantiation of the model min_update_period : float Minimun update period of the quantites in the model time_func : time function Function that return current time i.e. time.time """ def __init__(self, name, start_time=None, min_update_period=0.99, time_func=time.time): self.name = name model_registry[self.name] = self self.min_update_period = min_update_period self.time_func = time_func self.start_time = start_time or time_func() self.last_update_time = self.start_time self.sim_quantities = {} self.sim_actions = {} self.test_sim_actions = {} self.sim_actions_meta = {} self._sim_state = {} self.setup_sim_quantities() self.override_pre_updates = [] self.override_post_updates = [] self.paused = False # Flag to pause updates # Making a public reference to _sim_state. Allows us to hook read-only views # or updates or whatever the future requires of this humble public attribute. self.quantity_state = self._sim_state
[docs] def setup_sim_quantities(self): """Set up self.sim_quantities with simulated quantities Subclasses should implement this method. Should place simulated quantities in self.sim_quantities dict. Keyed by name of quantity, value must be instances satisfying the :class:`quantites.Quantity` interface. Notes ===== - Must use self.start_time to set initial time values. - Must call super method after setting up `sim_quantities` """ self._sim_state.update( {var: (quant.last_val, quant.last_update_time) for var, quant in self.sim_quantities.items()})
[docs] def update(self): sim_time = self.time_func() dt = sim_time - self.last_update_time if dt < self.min_update_period or self.paused: # Updating the sim_state in case the test interface or external command # updated the quantities. for var, quant in self.sim_quantities.items(): self._sim_state[var] = (quant.last_val, quant.last_update_time) MODULE_LOGGER.debug( "Sim {} skipping update at {}, dt {} < {} and pause {}" .format(self.name, sim_time, dt, self.min_update_period, self.paused)) return for override_update in self.override_pre_updates: override_update(self, sim_time, dt) MODULE_LOGGER.info("Stepping at {}, dt: {}".format(sim_time, dt)) self.last_update_time = sim_time try: for var, quant in self.sim_quantities.items(): self._sim_state[var] = (quant.next_val(sim_time), sim_time) except Exception: MODULE_LOGGER.exception('Exception in update loop') for override_update in self.override_post_updates: override_update(self, sim_time, dt)
[docs] def set_sim_action(self, name, handler): """Add an action handler function Parameters ---------- name : str Name of the action handler : callable(model_instance, action_args) Callable that handles action (name). Is called with the model instance as the first parameter. """ self.sim_actions[name] = partial(handler, self)
[docs] def set_test_sim_action(self, name, handler): """Add an action handler function Parameters ---------- name : str Name of the action handler : callable(model_instance, action_args) Callable that handles action (name). Is called with the model instance as the first parameter. """ self.test_sim_actions[name] = partial(handler, self)
[docs]class PopulateModelQuantities(object): """Used to populate/update model quantities Populates the model quantities using the data from the TANGO device information captured in the POGO generated xmi file. Attributes ---------- parser_instance : Parser instance The Parser object which reads an xmi/xml/json file and parses it into device attributes, commands, and properties. sim_model : Model instance An instance of the Model class which is used for simulation of simple attributes. """ def __init__(self, parser_instance, tango_device_name, sim_model=None): self.parser_instance = parser_instance if sim_model: if isinstance(sim_model, Model): self.sim_model = sim_model else: raise SimModelException("The sim_model object passed is not an " "instance of the class mkat_tango.simlib.model.Model") else: self.sim_model = Model(tango_device_name) self.setup_sim_quantities()
[docs] def setup_sim_quantities(self): """Set up self.sim_quantities from Model with simulated quantities Places simulated quantities in sim_quantities dict. Keyed by name of quantity, value must be instances satifying the :class:`quantities.Quantity` interface Notes ===== - Must use self.start_time to set initial time values. - Must call super method after setting up `sim_quantities` """ start_time = self.sim_model.start_time attributes = self.parser_instance.get_reformatted_device_attr_metadata() for attr_name, attr_props in attributes.items(): # When using more than one config file, the attribute meta data can be # overwritten, so we need to update it instead of reassigning a different # object. try: model_attr_props = self.sim_model.sim_quantities[attr_name].meta except KeyError: MODULE_LOGGER.info( "Initializing '{}' quantity meta information using config file:" " '{}'.".format(attr_name, self.parser_instance.data_description_file_name)) model_attr_props = attr_props else: # Before the model attribute props dict is updated, the # parameter keys with no values specified from the attribute # props template are removed. # i.e. All optional parameters not provided in the SIMDD attr_props = dict((param_key, param_val) for param_key, param_val in attr_props.iteritems() if param_val) model_attr_props = dict(model_attr_props.items() + attr_props.items()) if model_attr_props.has_key('quantity_simulation_type'): if model_attr_props['quantity_simulation_type'] == 'ConstantQuantity': try: initial_value = model_attr_props['initial_value'] except KeyError: # `initial_value` is an optional parameter, thus if not # specified in the SIMDD datafile, an initial value of # default value of is assigned to the attribute # quantity initial value initial_value = None MODULE_LOGGER.info( "Parameter `initial_value` does not exist for" "attribute {}. Default will be used".format( model_attr_props['name'])) attr_data_type = model_attr_props['data_type'] init_val = (initial_value if initial_value not in [None, ""] else INITIAL_CONSTANT_VALUE_TYPES[attr_data_type][-1]) start_val = INITIAL_CONSTANT_VALUE_TYPES[attr_data_type][0](init_val) quantity_factory = ( quantities.registry[attr_props['quantity_simulation_type']]) self.sim_model.sim_quantities[attr_name] = quantity_factory( start_time=start_time, meta=model_attr_props, start_value=start_val) else: try: sim_attr_quantities = self.sim_attribute_quantities( float(model_attr_props['min_bound']), float(model_attr_props['max_bound']), float(model_attr_props['max_slew_rate']), float(model_attr_props['mean']), float(model_attr_props['std_dev'])) except KeyError: raise ValueError( "Attribute with name '{}' specified in the configuration" " file [{}] has no mininum or maximum values set".format( attr_name, self.parser_instance.data_description_file_name)) quantity_factory = ( quantities.registry[attr_props['quantity_simulation_type']]) self.sim_model.sim_quantities[attr_name] = quantity_factory( start_time=start_time, meta=model_attr_props, **sim_attr_quantities) else: self.sim_model.sim_quantities[attr_name] = quantities.ConstantQuantity( start_time=start_time, meta=model_attr_props, start_value=True) self.sim_model.setup_sim_quantities()
[docs] def sim_attribute_quantities(self, min_bound, max_bound, max_slew_rate, mean, std_dev): """Simulate attribute quantities with a Guassian value distribution Parameters ---------- min_value : float minimum attribute value to be simulated max_value : float maximum attribute value to be simulated max_slew_rate : float maximum changing rate of the simulated quantities between min and max values mean : float average value of the simulated quantity std_dev : float starndard deviation value of the simulated quantity Returns ------- sim_attribute_quantities : dict Dict of Gaussian simulated quantities """ sim_attribute_quantities = dict() sim_attribute_quantities['max_slew_rate'] = max_slew_rate sim_attribute_quantities['min_bound'] = min_bound sim_attribute_quantities['max_bound'] = max_bound sim_attribute_quantities['mean'] = mean sim_attribute_quantities['std_dev'] = std_dev return sim_attribute_quantities
[docs]class PopulateModelActions(object): """Used to populate/update model actions Populates the model actions using the data from the TANGO device information captured in the POGO generated xmi file. Attributes ---------- command_info : dict A dictionary of all the device commands together with their metadata specified in the POGO generated XMI file. The key represents the name of the command and the value is a dictionary of all the attribute's metadata. sim_model : Model instance An instance of the Model class which is used for simulation of simple attributes and/or commands. """ def __init__(self, parser_instance, tango_device_name, model_instance=None): self.parser_instance = parser_instance if model_instance is None: self.sim_model = Model(tango_device_name) else: self.sim_model = model_instance self.add_actions()
[docs] def add_actions(self): command_info = self.parser_instance.get_reformatted_cmd_metadata() override_info = self.parser_instance.get_reformatted_override_metadata() instances = {} if override_info != {}: instances = self._get_class_instances(override_info) # Need to override the model's update method if the override class provides one. instance = [] for instance_ in instances: if instance_.startswith('Sim'): instance.append(instances[instance_]) for inst in instance: try: pre_update_overwrite = getattr(inst, 'pre_update') except AttributeError: MODULE_LOGGER.info("No pre-update method defined in the '{}'" " override class.".format(type(inst).__name__)) else: self.sim_model.override_pre_updates.append(pre_update_overwrite) try: post_update_overwrite = getattr(inst, 'post_update') except AttributeError: MODULE_LOGGER.info("No pre-update method defined in the '{}'" " override class.".format(type(inst).__name__)) else: self.sim_model.override_post_updates.append(post_update_overwrite) for cmd_name, cmd_meta in command_info.items(): # Exclude the TANGO default commands as they have their own built in handlers # provided. if cmd_name in DEFAULT_TANGO_COMMANDS: continue # Every command is to be declared to have one or more action behaviour. # Example of a list of actions handle at this moment is as follows # [{'behaviour': 'input_transform', # 'destination_variable': 'temporary_variable'}, # {'behaviour': 'side_effect', # 'destination_quantity': 'temperature', # 'source_variable': 'temporary_variable'}, # {'behaviour': 'output_return', # 'source_variable': 'temporary_variable'}] actions = cmd_meta.get('actions', []) instance = None if cmd_name.startswith('test_'): cmd_name = cmd_name.split('test_')[1] for instance_ in instances: if instance_.startswith('SimControl'): instance = instances[instance_] self._check_override_action_presence(cmd_name, instance, 'test_action_{}') handler = getattr( instance, 'test_action_{}'.format(cmd_name.lower()), self.generate_action_handler(cmd_name, cmd_meta['dtype_out'], actions)) self.sim_model.set_test_sim_action(cmd_name, handler) else: for instance_ in instances: if instance_.startswith('Sim'): instance = instances[instance_] self._check_override_action_presence(cmd_name, instance, 'action_{}') handler = getattr(instance, 'action_{}'.format(cmd_name.lower()), self.generate_action_handler( cmd_name, cmd_meta['dtype_out'], actions)) self.sim_model.set_sim_action(cmd_name, handler) # Might store the action's metadata in the sim_actions dictionary # instead of creating a separate dict. try: self.sim_model.sim_actions_meta[cmd_name.split('test_')[1]] = cmd_meta except IndexError: self.sim_model.sim_actions_meta[cmd_name] = cmd_meta
def _get_class_instances(self, override_class_info): instances = {} for klass_info in override_class_info.values(): if klass_info['module_directory'] == 'None': module = importlib.import_module(klass_info['module_name']) else: sys.path.append(klass_info['module_directory']) module = importlib.import_module(klass_info['module_name']) sys.path.remove(klass_info['module_directory']) klass = getattr(module, klass_info['class_name']) instance = klass() instances[klass_info['name']] = instance return instances def _check_override_action_presence(self, cmd_name, instance, action_type): instance_attributes = dir(instance) instance_attributes_list = [attr.lower() for attr in instance_attributes] attr_occurences = instance_attributes_list.count( action_type.format(cmd_name.lower())) # Check if there is only one override class method defined for each command if attr_occurences > MAX_NUM_OF_CLASS_ATTR_OCCURENCE: raise Exception("The command '{}' has multiple override methods defined" " in the override class".format(cmd_name)) # Assuming that there is only one override method defined, now we check if it # is in the correct letter case. elif attr_occurences == MAX_NUM_OF_CLASS_ATTR_OCCURENCE: try: instance_attributes.index(action_type.format(cmd_name.lower())) except ValueError: raise Exception("Only lower-case overide method names are supported.")
[docs] def generate_action_handler(self, action_name, action_output_type, actions=None): """Generates and returns an action handler to manage tango commands Parameters ---------- action_name : str Name of action handler to generate action_output_type : PyTango._PyTango.CmdArgType Tango command argument type actions : list List of actions that the handler will provide Returns ------- action_handler : function action handler, taking command input argument in case of tango commands with input arguments. """ if actions is None: actions = [] def action_handler(model, data_input=None, tango_dev=None): """Action handler taking command input arguments Parameters ---------- model : model.Model Model instance data_in : float, string, int, etc. Input arguments of tango command Returns ------- return_value : float, string, int, etc. Output value of an executed tango command """ # TODO (KM 18-01-2016): Need to remove the tango_dev parameter from # action hanlder, currently used for testing functionality of the # override class actions. temp_variables = {} return_value = None for action in actions: if action['behaviour'] == 'input_transform': temp_variables[action['destination_variable']] = data_input if action['behaviour'] == 'side_effect': quantity = action['destination_quantity'] temp_variables[action['source_variable']] = data_input model_quantity = model.sim_quantities[quantity] model_quantity.set_val(data_input, model.time_func()) if action['behaviour'] == 'output_return': if 'source_variable' in action and 'source_quantity' in action: raise ValueError( "{}: Either 'source_variable' or 'source_quantity'" " for 'output_return' action, not both" .format(action_name)) elif 'source_variable' in action: source_variable = action['source_variable'] try: return_value = temp_variables[source_variable] except KeyError: raise ValueError( "{}: Source variable {} not defined" .format(action_name, source_variable)) elif 'source_quantity' in action: quantity = action['source_quantity'] try: model_quantity = model.sim_quantities[quantity] except KeyError: raise ValueError( "{}: Source quantity {} not defined" .format(action_name, quantity)) return_value = model_quantity.last_val else: raise ValueError( "{}: Need to specify one of 'source_variable' " "or 'source_quantity' for 'output_return' action" .format(action_name)) else: # Return a default value if output_return is not specified. return_value = ARBITRARY_DATA_TYPE_RETURN_VALUES[action_output_type] return return_value action_handler.__name__ = action_name return action_handler
[docs]class SimModelException(Exception): def __init__(self, message): super(SimModelException, self).__init__(message)