Source code for ixnetwork.ixn_topology
"""
@author yoram@ignissoft.com
"""
import os
import time
from trafficgenerator.tgn_utils import TgnError
from ixnetwork.ixn_object import IxnObject
[docs]class IxnTopologyBaseClass(IxnObject):
action_2_status = {'start': 'started',
'stop': 'notStarted'}
[docs] def get_device_groups(self):
"""
:return: dictionary {name: object} of all device groups.
"""
return {o.obj_name(): o for o in self.get_objects_or_children_by_type('deviceGroup')}
[docs] def action(self, action, timeout=64, *arguments):
self.api.execute(action, self.obj_ref(), *arguments)
now = time.time()
while self.get_attribute('status') != self.action_2_status[action] and time.time() - now <= timeout:
time.sleep(1)
if time.time() - now > timeout:
raise TgnError('Failed to {} protocols after {} seconds'.format(action, time.time() - now))
[docs] def start(self):
""" Start the protocol.
The function will return after the protocol is started.
"""
self.action('start')
[docs] def stop(self):
""" Stop the protocol.
The function will return after the protocol is stopped.
"""
self.action('stop')
[docs]class IxnTopology(IxnTopologyBaseClass):
""" Represents IXN topology.
Currently supports topology under single port only.
"""
def __init__(self, **data):
data['objType'] = 'topology'
data['parent'] = self.root
super(self.__class__, self).__init__(**data)
[docs]class IxnMultivalueBase(IxnTopologyBaseClass):
""" Represents IXN device group. """
[docs]class IxnDeviceGroup(IxnTopologyBaseClass):
""" Represents IXN device group. """
def __init__(self, **data):
data['objType'] = 'deviceGroup'
super(IxnDeviceGroup, self).__init__(**data)
[docs]class IxnNextGenProtocol(IxnTopologyBaseClass):
""" Represents IXN device group. """
def __init__(self, **data):
data['objType'] = self.protocol
super(IxnNextGenProtocol, self).__init__(**data)
[docs] def getProtocolItems(self):
items = {}
for item in self.getChildren('item'):
name = self.getName() + ' ' + os.path.basename(item).split(':')[1]
objectDict = {'-name': name,
'-objRef': item,
'-type': 'ProtocolNgpfItem',
'-subType': self.getSubType()}
items[name] = IxnNextGenProtocolItem(self.api, objectDict)
return items
[docs]class IxnNgpfEthernet(IxnNextGenProtocol):
protocol = 'ethernet'
[docs]class IxnNgpfIpv4(IxnNextGenProtocol):
protocol = 'ipv4'
[docs]class IxnNextGenProtocolItem(IxnTopologyBaseClass):
pass