"""
Classes and utilities to manage IxLoad application.
@author yoram@ignissoft.com
"""
import time
from trafficgenerator.tgn_utils import is_true, is_false
from trafficgenerator.trafficgenerator import TrafficGenerator
from ixload.ixl_object import IxlObject, IxlList
[docs]class IxlApp(TrafficGenerator):
""" IxLoad driver. Equivalent to IxLoad Application. """
controller = None
def __init__(self, logger, api_wrapper=None):
""" Set all kinds of application level objects - logger, api, etc.
:param logger: python logger (e.g. logging.getLogger('log'))
:param api_wrapper: api wrapper object inheriting and implementing IxlApi base class.
"""
super(self.__class__, self).__init__()
self.logger = logger
self.api = api_wrapper
IxlObject.logger = self.logger
IxlObject.api = self.api
IxlObject.str_2_class = TYPE_2_OBJECT
[docs] def connect(self, ip='localhost', port=8080):
""" Connect to IxTcl/REST server.
:param server: TxTcl/REST server.
:param port: REST port, ignored for IxTcl server.
"""
self.api.connect(ip, port)
IxlApp.controller = IxlController()
[docs] def disconnect(self):
""" Disconnect from chassis and server. """
self.api.disconnect()
[docs] def load_config(self, config_file_name, test_name='Test1'):
self.repository = IxlRepository(name=config_file_name.replace('\\', '/'), test=test_name)
IxlObject.repository = self.repository
[docs] def save_config(self, config_file_name):
self.repository.save_config(config_file_name.replace('\\', '/'))
#
# IxLoad GUI commands.
#
[docs] def start_test(self, blocking=True):
self.controller.start_test(self.repository.test, blocking)
[docs] def stop_test(self):
self.controller.stop_test()
[docs]class IxlController(IxlObject):
def __init__(self, **data):
data['objType'] = 'ixTestController'
super(self.__class__, self).__init__(**data)
self.set_results_dir(data.get('resultsDir', 'c:/temp/IxLoad'))
[docs] def set_results_dir(self, results_dir):
self.results_dir = results_dir
self.command('setResultDir', self.results_dir)
[docs] def start_test(self, test, blocking=True):
self.api.eval('set ::ixTestControllerMonitor {}')
self.command('run', test.obj_ref())
while is_false(self.command('isBusy')):
time.sleep(1)
rc = self.api.eval('set dummy $::ixTestControllerMonitor')
if rc:
raise Exception(rc)
if blocking:
self.wait_for_test_finish()
self.release_test()
[docs] def stop_test(self):
self.command('stopRun')
self.release_test()
[docs] def wait_for_test_finish(self):
while is_true(self.command('isBusy')):
time.sleep(1)
rc = self.api.eval('set dummy $::ixTestControllerMonitor')
if 'status ok' not in rc.lower() and 'test stopped by the user' not in rc.lower():
raise Exception(rc)
[docs] def release_test(self):
self.command('releaseConfigWaitFinish')
[docs]class IxlRepository(IxlObject):
def __init__(self, **data):
data['objType'] = 'ixRepository'
super(self.__class__, self).__init__(**data)
self.cc = self.get_child('chassisChain')
self.cc.clear()
self.load_test(data.get('test', 'Test1'))
def _create(self, **attributes):
return super(self.__class__, self)._create(name=self._data['name'])
[docs] def load_test(self, name='Test1'):
for test in self.get_children('testList'):
if test.obj_name() == name:
self.test = test
break
for scenario in self.test.get_children('scenarioList'):
for column in scenario.get_children('columnList'):
column.get_children('elementList')
[docs] def save_config(self, name):
self.command('write', destination=name, overwrite=True)
[docs]class IxlChassisChain(IxlObject):
[docs] def clear(self):
for chassis in self.command('getChassisNames').split():
self.command('deleteChassisByName', chassis)
[docs] def append(self, chassis):
chassis_id = 0
for index, name in enumerate(self.command('getChassisNames').split()):
if name == chassis:
chassis_id = index + 1
if not chassis_id:
self.command('addChassis', chassis)
self.command('refresh')
chassis_id = len(self.command('getChassisNames').split())
return chassis_id
[docs]class IxlElement(IxlObject):
def __init__(self, **data):
super(self.__class__, self).__init__(**data)
self.network = self.get_child('network')
[docs] def reserve(self, location):
port_list = IxlList(self.network, 'port')
port_list.clear()
chassis, cardId, portId = location.split('/')
repository = IxlObject.repository
chassisId = repository.cc.append(chassis)
port_list.append(chassisId=chassisId, cardId=cardId, portId=portId)
TYPE_2_OBJECT = {'chassischain': IxlChassisChain,
'element': IxlElement}