Source code for ixnetwork.ixn_topology

"""
@author yoram@ignissoft.com
"""

import os
import time

from trafficgenerator.tgn_utils import TgnError

from ixnetwork.ixn_object import IxnObject


class IxnTopologyBaseClass(IxnObject):
    """ Base class for IXN objects that can be started and stopped. """

    # Maps an action name to the value of the 'status' attribute that signals
    # the action has completed.
    action_2_status = {'start': 'started', 'stop': 'notStarted'}

    def get_device_groups(self):
        """
        :return: dictionary {name: object} of all device groups.
        """

        device_groups = self.get_objects_or_children_by_type('deviceGroup')
        return {group.obj_name(): group for group in device_groups}

    def action(self, action, timeout=64, *arguments):
        """ Execute an action and wait until the object reports the matching status.

        The 'status' attribute is polled once per second.

        :param action: action name, must be a key of action_2_status.
        :param timeout: maximum time, in seconds, to wait for the expected status.
        :param arguments: additional positional arguments passed to the API execute call.
        :raise TgnError: if the expected status is not reached within timeout seconds.
        """

        self.api.execute(action, self.obj_ref(), *arguments)
        expected_status = self.action_2_status[action]
        started_at = time.time()
        # Success is decided by the status alone: once the expected status is
        # observed the method returns, even if that observation happened
        # slightly after the timeout expired. The timeout is only an error
        # when the status still does not match.
        while self.get_attribute('status') != expected_status:
            elapsed = time.time() - started_at
            if elapsed > timeout:
                raise TgnError('Failed to {} protocols after {} seconds'.format(action, elapsed))
            time.sleep(1)

    def start(self):
        """ Start the protocol.

        The function will return after the protocol is started.
        """

        self.action('start')

    def stop(self):
        """ Stop the protocol.

        The function will return after the protocol is stopped.
        """

        self.action('stop')
class IxnTopology(IxnTopologyBaseClass):
    """ Represents IXN topology.

    Currently supports topology under single port only.
    """

    def __init__(self, **data):
        """ Create a topology object of type 'topology' whose parent is self.root.

        :param data: object attributes; 'objType' and 'parent' are overwritten.
        """

        data['objType'] = 'topology'
        data['parent'] = self.root
        # Name the class explicitly: super(self.__class__, self) resolves to
        # the subclass when this __init__ is inherited, which makes the call
        # recurse into itself forever.
        super(IxnTopology, self).__init__(**data)
class IxnMultivalueBase(IxnTopologyBaseClass):
    """ Base class for IXN multivalue objects (role inferred from the class name - confirm).

    Defines no attributes or methods of its own; all behavior is inherited
    from IxnTopologyBaseClass.
    """
class IxnDeviceGroup(IxnTopologyBaseClass):
    """ Represents IXN device group. """

    def __init__(self, **data):
        """ Create a device group object; the object type is always 'deviceGroup'.

        :param data: object attributes; 'objType' is overwritten.
        """

        # Any caller-supplied objType is replaced before delegating to the base class.
        super(IxnDeviceGroup, self).__init__(**dict(data, objType='deviceGroup'))
class IxnNextGenProtocol(IxnTopologyBaseClass):
    """ Base class for IXN next generation protocol objects.

    The ``protocol`` class attribute, which is not defined on this class, is
    used as the object type when an instance is created.
    """

    def __init__(self, **data):
        """ Create a protocol object whose object type is self.protocol.

        :param data: object attributes; 'objType' is overwritten.
        """

        data.update(objType=self.protocol)
        super(IxnNextGenProtocol, self).__init__(**data)

    def getProtocolItems(self):
        """ Build an item object for every 'item' child of this protocol.

        :return: dictionary {name: IxnNextGenProtocolItem} where name is this object's name, a space, and
            the second ':'-separated field of the child reference's base name.
        """

        # NOTE(review): getChildren, getName and getSubType are not defined in
        # this module - presumably inherited; confirm they exist on the base class.
        # NOTE(review): the item is constructed with positional arguments while
        # the other classes here are constructed from keyword data - verify.
        protocol_items = {}
        for child_ref in self.getChildren('item'):
            own_name = self.getName()
            child_suffix = os.path.basename(child_ref).split(':')[1]
            full_name = own_name + ' ' + child_suffix
            attributes = {
                '-name': full_name,
                '-objRef': child_ref,
                '-type': 'ProtocolNgpfItem',
                '-subType': self.getSubType(),
            }
            protocol_items[full_name] = IxnNextGenProtocolItem(self.api, attributes)
        return protocol_items
class IxnNgpfEthernet(IxnNextGenProtocol):
    """ Next generation protocol object with protocol type 'ethernet'. """

    protocol = 'ethernet'
class IxnNgpfIpv4(IxnNextGenProtocol):
    """ Next generation protocol object with protocol type 'ipv4'. """

    protocol = 'ipv4'
class IxnNextGenProtocolItem(IxnTopologyBaseClass):
    """ Item of a next generation protocol.

    Defines no attributes or methods of its own; all behavior is inherited
    from IxnTopologyBaseClass.
    """