Source code for ixnetwork.ixn_traffic
"""
@author yoram@ignissoft.com
"""
import os
from ixnetwork.ixn_object import IxnObject
[docs]class IxnTrafficItem(IxnObject):
""" Base class for all traffic items """
def __init__(self, **data):
""" Create new traffic item object in the API. """
data['objType'] = 'trafficItem'
data['parent'] = self.root.get_child_static('traffic')
super(IxnTrafficItem, self).__init__(**data)
if 'objRef' not in data:
# Set traffic item type.
self.set_attributes(commit=True, trafficItemType=type_2_obj[type(self)])
# Change new class to one of its sub-classes based on traffic item type.
self.__class__ = type_2_obj[self.get_attribute('trafficItemType')]
def _create(self):
""" Create new object on IxNetwork.
:return: IXN object reference.
"""
if 'name' in self._data:
obj_ref = self.api.add(self.obj_parent(), self.obj_type(), name=self.obj_name())
else:
obj_ref = self.api.add(self.obj_parent(), self.obj_type())
self.api.commit()
return self.api.remapIds(obj_ref)
[docs] def get_flow_groups(self):
"""
:return: dictionary {name: object} of all flow groups.
"""
return {o.obj_name(): o for o in self.get_objects_or_children_by_type('highLevelStream')}
[docs] def generate(self):
self.api.execute('generate', self.getObjRef())
[docs] def start(self):
self.api.execute('startStatelessTraffic', self.getObjRef())
[docs] def stop(self):
self.api.execute('stopStatelessTraffic', self.getObjRef())
[docs]class IxnL23TrafficItem(IxnTrafficItem):
""" L23 traffic item. """
pass
[docs]class IxnQuickTrafficItem(IxnTrafficItem):
""" L23 traffic item. """
pass
[docs]class IxnFlowGroup(IxnObject):
""" Base class for all IXN flow groups. """
pass
[docs]class IxnL23FlowGroup(IxnFlowGroup):
""" L23 flow group """
pass
[docs]class IxnL23QuickFlowGroup(IxnFlowGroup):
""" L23 quick flow group """
[docs] def generate(self):
self.api.execute('generate', os.path.dirname(self.obj_ref()))
type_2_obj = {'l2L3': IxnL23TrafficItem,
'quick': IxnQuickTrafficItem,
IxnL23TrafficItem: 'l2L3',
IxnQuickTrafficItem: 'quick'}