import unittest
from netmiko import ConnectHandler
from testset import Testset
import time
import re

# NOTE(review): this shadows the builtin ``globals``. It is presumably replaced
# by the test runner with an object carrying the NE1_* / L3_Testset_IP settings
# before setUpClass runs -- confirm against the runner.
globals = None
report = unittest.Report()


class TRA884_AUTOMATE_103(unittest.SystemTestCase):
    """System test: single-tagged traffic across an ELAN EVC on NE1.

    The tests configure both NE1 UNI ports, create an ELAN EVC, a C-VLAN
    flowpoint template and one flowpoint per UNI port, then send a burst of
    single-tagged frames from the first testset port and validate what is
    captured on the second one.

    The test methods depend on each other and are meant to be run in the
    order defined by the module-level ``suite()`` function.
    """

    @classmethod
    def setUpClass(cls):
        """Connect to NE1, reserve the testset and store shared test data.

        Each ``globals.NE1_UNIx`` value is split on ``"#"``: the part before
        it is the device port, the part after it is the testset port.  The
        first two comma-separated fields of the device port are stored as
        ``ch`` and ``si``.

        The device handle and the testset handle are published as the module
        globals ``device_ne1`` and ``testset``.
        """
        global device_ne1
        global testset

        ne1_dict = {
            'host': globals.NE1_IP,
            'http_username': globals.NE1_httpUsername,
            'http_password': globals.NE1_httpPassword,
            'username': globals.NE1_userName,
            'password': globals.NE1_password,
            'port': globals.NE1_socketNo,
            'device_type': globals.NE1_productType,
            'verbose': True,
        }
        device_ne1 = ConnectHandler(**ne1_dict)

        cls.ne1_uni_ports = [
            globals.NE1_UNI1.split("#")[0],
            globals.NE1_UNI2.split("#")[0],
        ]
        cls.ne1_ch1 = cls.ne1_uni_ports[0].split(",")[0]
        cls.ne1_si1 = cls.ne1_uni_ports[0].split(",")[1]
        cls.ne1_ch2 = cls.ne1_uni_ports[1].split(",")[0]
        cls.ne1_si2 = cls.ne1_uni_ports[1].split(",")[1]

        cls.port_details = {
            "NE1": {
                'handle': device_ne1,
                'uni_ports': cls.ne1_uni_ports,
                'ont_ports': [],
                'nni_ports': [],
            }
        }
        # get_config_details() is not defined in this class (presumably
        # inherited); its result replaces port_details and must contain a
        # "packet_rate" entry.
        cls.port_details = cls.get_config_details(**cls.port_details)
        cls.packetRate = int(cls.port_details["packet_rate"])

        cls.ts_ports = [
            globals.NE1_UNI1.split("#")[1],
            globals.NE1_UNI2.split("#")[1],
        ]
        ts_dict = {
            'ip': globals.L3_Testset_IP,
            'port_list': cls.ts_ports,
        }
        testset = Testset.initialize_testset(**ts_dict)
        testset.reserve()

        cls.tc_details = {
            'cvlan1': '230',
            'evc_name': 'TRA884_MT_ELAN_EVC_101',
            'cvlan_fpt_name': 'TRA884_MT_FPT_101',
            'total_frames_sent': '100',
            'total_frames_received': '100',
        }

    @classmethod
    def tearDownClass(cls):
        """Undo the device configuration and release all external resources.

        The testset reservation and the device session are released in
        ``finally`` clauses so that a failure while cleaning up the
        configuration does not leave them held.  Any exception raised during
        cleanup still propagates.
        """
        print(" TearDown ")
        try:
            device_ne1.tear_down()
            cls.default_port_switching_params(**cls.port_details)
            for ts_port in cls.ts_ports:
                testset.ixDeleteDevice(
                    port=ts_port,
                    device_name='all'
                )
        finally:
            try:
                testset.release()
            finally:
                device_ne1.disconnect()

    # ------------------------------------------------------------------
    # Definitions for the tests listed in suite()
    # ------------------------------------------------------------------

    def _ne1_uni_endpoints(self):
        """Return ``(ch, si, port)`` for both NE1 UNI ports, in port order."""
        return [
            (self.ne1_ch1, self.ne1_si1, self.ne1_uni_ports[0]),
            (self.ne1_ch2, self.ne1_si2, self.ne1_uni_ports[1]),
        ]

    def _configure_phycl_param_on_uni_ports(self, **param_dict):
        """Apply the same physical parameters to both NE1 UNI ports."""
        for ne1_ch, ne1_si, device_port in self._ne1_uni_endpoints():
            self.configure_phycl_param_ne1(
                ne1_ch=ne1_ch,
                ne1_si=ne1_si,
                device_port_ne1=device_port,
                **param_dict
            )

    def configure_phycl_param_ne1(self, ne1_ch, ne1_si, device_port_ne1,
                                  **param_dict):
        """Configure the physical parameters of a port.

        Args:
            ne1_ch: Value passed to the device as ``ch``.
            ne1_si: Value passed to the device as ``si``.
            device_port_ne1: Port passed to the device as ``interface``.
            **param_dict: Physical parameters forwarded unchanged to
                ``device_ne1.edit_port_physical_parameters``.
        """
        device_ne1.edit_port_physical_parameters(
            ch=ne1_ch,
            si=ne1_si,
            interface=device_port_ne1,
            **param_dict
        )

    def testConfigure_phycl_param_admin_up_ne1(self):
        """Set ``admin_status`` to ``enable`` on both NE1 UNI ports."""
        self._configure_phycl_param_on_uni_ports(admin_status='enable')

    def testConfigure_phycl_param_admin_down_ne1(self):
        """Set ``admin_status`` to ``disable`` on both NE1 UNI ports."""
        self._configure_phycl_param_on_uni_ports(admin_status='disable')

    def configure_switch_param_ne1(self, ne1_ch, ne1_si, intf_type, interface,
                                   **param_dict):
        """Configure the switching parameters of a port.

        ``intf_type`` and ``interface`` are also stored on the instance, as
        the original implementation did.

        Args:
            ne1_ch: Value passed to the device as ``ch``.
            ne1_si: Value passed to the device as ``si``.
            intf_type: Interface type passed to the device.
            interface: Port passed to the device as ``interface``.
            **param_dict: Switching parameters forwarded unchanged to
                ``device_ne1.edit_port_switching_params``.
        """
        self.intf_type = intf_type
        self.interface = interface
        device_ne1.edit_port_switching_params(
            ch=ne1_ch,
            si=ne1_si,
            intf_type=self.intf_type,
            interface=self.interface,
            **param_dict
        )

    def testConfigure_switch_param_case1_ne1(self):
        """Apply the dot1q switching profile to both NE1 UNI ports."""
        param_dict = {
            'port_type': 'dot1q',
            'port_mode': 'regular',
            'pvid': '230',
            'pvid_egress_untag': 'disable',
            'acceptable_frame_policy': 'acceptall',
            'fpcr_priority_source': 'vlan_only',
        }
        for ne1_ch, ne1_si, device_port in self._ne1_uni_endpoints():
            self.configure_switch_param_ne1(
                ne1_ch=ne1_ch,
                ne1_si=ne1_si,
                intf_type='interface',
                interface=device_port,
                **param_dict
            )

    def create_evc_ne1(self, ne1_ch, ne1_si, evc_name, evc_type,
                       forwarding_type, unknown_multicast_action,
                       customer_name, **param_dict):
        """Create an EVC (of type eline or elan).

        Args:
            ne1_ch: Value passed to the device as ``ch``.
            ne1_si: Value passed to the device as ``si``.
            evc_name: Name of the EVC to create.
            evc_type: EVC type passed to the device.
            forwarding_type: Forwarding type passed to the device.
            unknown_multicast_action: Action passed to the device.
            customer_name: Customer name passed to the device.
            **param_dict: Extra EVC parameters; they override the defaults
                defined below.
        """
        evc_param_dict = {
            'admin_status': 'up',
            'evc_mode': 'regular',
            'trail_identifier': '0',
            'mvr': 'disable',
        }
        evc_param_dict.update(param_dict)
        device_ne1.create_evc(
            ch=ne1_ch,
            si=ne1_si,
            evc_name=evc_name,
            evc_type=evc_type,
            forwarding_type=forwarding_type,
            unknown_multicast_action=unknown_multicast_action,
            customer_name=customer_name,
            **evc_param_dict
        )

    def testCreate_elan_evc_case1_ne1(self):
        """Create the ELAN EVC named in ``tc_details['evc_name']``."""
        self.create_evc_ne1(
            ne1_ch=self.ne1_ch1,
            ne1_si=self.ne1_si1,
            evc_name=self.tc_details['evc_name'],
            evc_type='elan',
            forwarding_type='cvlan',
            unknown_multicast_action='drop',
            customer_name='tejas'
        )

    def create_fpt_ne1(self, ne1_ch, ne1_si, fpt_name, fpt_type, **param_dict):
        """Create a flowpoint template.

        Args:
            ne1_ch: Value passed to the device as ``ch``.
            ne1_si: Value passed to the device as ``si``.
            fpt_name: Name of the flowpoint template.
            fpt_type: Template type passed to the device.
            **param_dict: Extra template parameters; they override the
                ``delete_if_exists=True`` default.
        """
        fpt_param_dict = {
            'delete_if_exists': True,
        }
        fpt_param_dict.update(param_dict)
        device_ne1.create_fpt(
            ch=ne1_ch,
            si=ne1_si,
            fpt_name=fpt_name,
            fpt_type=fpt_type,
            **fpt_param_dict
        )

    def testCreate_fpt_case1_ne1(self):
        """Create the C-VLAN flowpoint template for the range 200-300."""
        fpt_config_dict = {
            'cvlan_low': '200',
            'cvlan_high': '300',
        }
        self.create_fpt_ne1(
            ne1_ch=self.ne1_ch1,
            ne1_si=self.ne1_si1,
            fpt_name=self.tc_details['cvlan_fpt_name'],
            fpt_type='cvlan',
            **fpt_config_dict
        )

    def create_flowpoint_ne1(self, ne1_ch, ne1_si, evc_name, intf_type,
                             interface, fpt_name, fp_mode, **param_dict):
        """Create a flowpoint.

        Args:
            ne1_ch: Value passed to the device as ``ch``.
            ne1_si: Value passed to the device as ``si``.
            evc_name: EVC the flowpoint is attached to.
            intf_type: Interface type passed to the device.
            interface: Port passed to the device as ``interface``.
            fpt_name: Flowpoint template to use.
            fp_mode: Flowpoint mode passed to the device.
            **param_dict: Extra flowpoint parameters; they override the
                ``traffic_type='data'`` default.
        """
        fp_param_dict = {
            'traffic_type': 'data',
        }
        fp_param_dict.update(param_dict)
        device_ne1.create_flowpoint(
            ch=ne1_ch,
            si=ne1_si,
            evc_name=evc_name,
            intf_type=intf_type,
            interface=interface,
            fpt_name=fpt_name,
            fp_mode=fp_mode,
            **fp_param_dict
        )

    def testCreate_flowpoint_case1_ne1(self):
        """Create one dot1q flowpoint on each NE1 UNI port."""
        for ne1_ch, ne1_si, device_port in self._ne1_uni_endpoints():
            self.create_flowpoint_ne1(
                ne1_ch=ne1_ch,
                ne1_si=ne1_si,
                evc_name=self.tc_details['evc_name'],
                intf_type='interface',
                interface=device_port,
                fpt_name=self.tc_details['cvlan_fpt_name'],
                fp_mode='dot1q'
            )

    def configure_rw_stream_block_TS1(self, stream_id):
        """Configure a RAW stream block on the first testset port.

        The burst length is taken from ``tc_details['total_frames_sent']``.
        The testset configuration is saved afterwards.

        Args:
            stream_id: Stream identifier; converted to ``str`` before use.
        """
        req_dict = {
            'port': self.ts_ports[0],
            'streamID': str(stream_id),
            'FrameLengthMode': 'fixedSize',
            'FrameSize': '1000',
            'DurationMode': 'Bursts',
            'Duration': self.tc_details['total_frames_sent'],
        }
        testset.ixConfStreamBlock(**req_dict)
        testset.save_testset_config()

    def configure_traffic_single_tagged_TS1(self, spirent_port, streamID,
                                            **param_dict):
        """Configure a single VLAN tagged frame.

        The stream block is created first (always on the first testset
        port), then the frame fields are configured on ``spirent_port``.

        Args:
            spirent_port: Testset port the frame is configured on.
            streamID: Stream identifier; converted to ``str`` before use.
            **param_dict: Frame fields overriding the defaults below.
        """
        self.configure_rw_stream_block_TS1(streamID)
        frame_config_param_dict = {
            'port': spirent_port,
            'streamID': str(streamID),
            'dstMAC': '00:ce:00:01:02:03',
            'srcMAC': '00:ab:01:00:00:02',
            'etherType': '88b5',
            'cvid': '256',
        }
        # The original loop iterated over the dict itself (keys only) and
        # tried to unpack each key into (param, value), which fails for any
        # non-empty override. Merge the key/value pairs instead.
        frame_config_param_dict.update(param_dict)
        testset.ixConfDot1ADFrame(**frame_config_param_dict)

    def testConfigure_traffic_single_tagged_case1_TS1(self):
        """Configure stream 1 with the default single-tagged frame."""
        self.configure_traffic_single_tagged_TS1(
            spirent_port=self.ts_ports[0],
            streamID=1
        )

    def testValidate_traffic_TS1_to_TS2(self):
        """Check ``FrameCount`` of stream 1 from the first to the second port."""
        testset.ixCheckStreamParams(
            txPort=self.ts_ports[0],
            rxPort=self.ts_ports[1],
            paramList=['FrameCount'],
            streamID=str(1)
        )

    def testStart_traffic_TS1(self):
        """Start traffic on both testset ports, clearing stats, then wait 5 s."""
        testset.ixStartTraffic(
            portList=[self.ts_ports[0], self.ts_ports[1]],
            clearStats=True
        )
        time.sleep(5)

    def testStart_capture_TS2(self):
        """Start packet capture on the second testset port."""
        testset.ixStartStopPacketCapture(
            portList=[self.ts_ports[1]],
            capture='start'
        )

    def testStop_capture_TS2(self):
        """Wait 10 s, then stop packet capture on the second testset port."""
        time.sleep(10)
        testset.ixStartStopPacketCapture(
            portList=[self.ts_ports[1]],
            capture='stop'
        )

    def testCheck_single_tagged_packet_TS1_to_TS_2(self):
        """Run the traffic sanity check on frames from the test source MAC.

        The expected VLAN ID and frame count come from ``tc_details``.
        """
        param_dict = {
            'txPortList': [self.ts_ports[0]],
            'rxPortList': [self.ts_ports[1]],
            'filter': 'eth.src==00:ab:01:00:00:02',
            'checkRxVID': self.tc_details['cvlan1'],
            'checkEthType': '88 b5',
            'checkRxFrames': self.tc_details['total_frames_received'],
        }
        testset.ixCheckTrafficSanity(**param_dict)


def suite():
    """Build the test suite with the tests in their required order."""
    # Add your tests into the suite here; they are added in list order.
    test_names = [
        "testConfigure_phycl_param_admin_down_ne1",
        "testConfigure_phycl_param_admin_up_ne1",
        "testConfigure_switch_param_case1_ne1",
        "testCreate_elan_evc_case1_ne1",
        "testCreate_fpt_case1_ne1",
        "testCreate_flowpoint_case1_ne1",
        "testConfigure_traffic_single_tagged_case1_TS1",
        "testStart_capture_TS2",
        "testStart_traffic_TS1",
        "testStop_capture_TS2",
        "testValidate_traffic_TS1_to_TS2",
        "testCheck_single_tagged_packet_TS1_to_TS_2",
    ]
    test_suite = unittest.TestSuite()
    for test_name in test_names:
        test_suite.addTest(TRA884_AUTOMATE_103(test_name))
    return test_suite


def main():
    """Run the suite with a text runner and return the runner's result."""
    runner = unittest.TextTestRunner()
    return runner.run(suite())