Commit 002f5c3e authored by manuel's avatar manuel Committed by Benoit Rat

test: Add test files

configuration.cfg include all configuration of tests, testNIC.py
varifies the Network Interface Core, testWR.py tries of correct
operation White rabbit, testDIO,py verifies all ports of boards,
testAdvDIO.py verify the correct reception and sending of packages,
testAll.py is to execute all tests. The class ssh.py is used to execute
commands by ssh, dio.py is to interact with the ports and vuart.py
execute commands by virtual uart.
parent d815d716
[config]
hostName1 = 172.17.5.155
userName1 = test
password1 = test
busIdSpec1 = 0x01
interfaceName1 = wr0
ipWR1 = 192.168.2.100
hostName2 = 172.17.5.238
userName2 = test
password2 =test
busIdSpec2 = 0x01
ipWR2 = 192.168.2.200
interfaceName2 = wr0
# @file dio.py
#
# @brief
#
# @author Manuel Castilla
# @ingroup
# @date 23/04/2019
#
###########################################################################
from ssh import SshCmd,SshCmdException
## Class DIO
#
class DIO ():
def __init__(self,hostName,userName,password,interfaceName):
"""
Construcctor
Args:
hostName: ip or hostname
userName: username
password : password
interfaceName: interface name
"""
self.hostName = hostName
self.userName = userName
self.password = password
self.interfaceName = interfaceName
self.command = "sudo wr-dio-cmd " + interfaceName
self.sshCmd = SshCmd()
def configurePort(self,channel,mode):
"""
Configure the channel
Args:
channel: number 0 to 4
mode: I/i(Input), 0/1(Output, steady state fixed at 0 low or 1 high),D/d(DIO core output),
P/p(Channel 0 Output PPS),C/c(Channel 4 Clock Input to PTP core)
Raises:
error function
"""
try:
confCommand = self.command + " mode " + str(channel) + " " + mode
self.execCommand(confCommand)
except:
print("Error configure port, channel:",channel,"mode:",mode)
raise
def configurePorts(self,modechannels):
"""
Configure all channels
Args:
modechannels: <modech0><modech1><modech2><modech3><modech4>
Raises:
error function
"""
try:
confCommand = self.command + " mode " + modechannels
self.execCommand(confCommand)
except:
print("Error configure ports, mode channels:",modechannels)
raise
def armPulse(self,channel,period,count):
"""
generate a pulse in a channel
Args:
channel: number 0 to 4
period: pulse period
count: string whih number of instances to run
Raises:
error function
"""
try:
armCommand = self.command + " pulse " + str(channel) + " " + str(period) + " " + count
self.execCommand(armCommand)
except:
print("Error arm pulse, channel:",channel,"period:",period,"count:",count)
raise
def getTimestamp(self,channel):
"""
Get timestamp of channel
Args:
channel: number 0 to 4
Raises:
error function
"""
try:
ret = []
getCommand = self.command + " stamp " + str(channel) + " | cut -f2 -d ,"
stamp = self.execCommand(getCommand)
stampSplit = stamp.split()
for i in stampSplit:
ret.append(float(i))
return ret
except:
print("Error get timestamp, channel:",channel)
raise
def clearTimestamps(self):
"""
Clear all timestamps
Raises:
error function
"""
try:
command = self.command + " stamp &> /dev/null"
self.execCommand(command)
except:
print("Error clear timestamps")
raise
def execCommand(self,command):
"""
Execute a command
Args:
command: command to execute
Raises:
error function
"""
try:
return self.sshCmd.exec(self.hostName,command,self.userName,self.password)
except SshCmdException as e:
self.sshCmd.printSshErrorCommand(e.strError) #print error command
cleanup()
raise
#!/usr/bin/env python3
# @file loadDriver.py
#
# @brief
#
# @author Manuel Castilla
# @ingroup
# @date 16/04/2019
#
###########################################################################
from ssh import SshCmd,SshCmdException
import sys
import getpass
error = -1
def loadDriver(hostName,userName,password):
"""
This function load the driver on the PC
Args:
hostName: ip remote host
username: user name
password: password
Return:
True if the driver was loaded correctly, False otherwise
"""
ret = 0
try:
sshCmd = SshCmd()
command = "ifconfig | grep wr | wc -l"
value = sshCmd.exec(hostName,command,userName,password)
if (int(value) == 0):
command = "sudo modprobe -r spec"
sshCmd.exec(hostName,command,userName,password)
command = "sudo modprobe -r wr-nic"
sshCmd.exec(hostName,command,userName,password)
command = "sudo modprobe spec"
sshCmd.exec(hostName,command,userName,password)
command = "sudo insmod /lib/modules/$(uname -r)/extra/wr-nic.ko wrc=1" #esto es temporal
sshCmd.exec(hostName,command,userName,password)
command = "sudo modprobe wr-nic"
sshCmd.exec(hostName,command,userName,password)
command = "ifconfig | grep wr | wc -l"
value = sshCmd.exec(hostName,command,userName,password)
if (int(value) == 0):
ret = error
else:
print("The driver was previously loaded")
except SshCmdException as e:
sshCmd.printSshErrorCommand(e.strError)
ret = error
return ret
#--------------------------------------------------#
#--------------------------------------------------#
# MAIN
#--------------------------------------------------#
#--------------------------------------------------#
if len(sys.argv) < 2:
print("usage: app hostaname")
sys.exit(1)
hostName = sys.argv[1]
userName = input("Please enter username: ")
#password = getpass("Please enter password: ")
password = getpass.getpass()
if (loadDriver(hostName,userName,password) == 0):
print("Load driver: ok")
else:
print("Load driver: error")
\ No newline at end of file
# @file ssh.py
#
# @brief Command ssh
#
# @author Manuel Castilla
# @ingroup
# @date 05/04/2019
#
###########################################################################
import paramiko
import os
import time
import warnings
warnings.filterwarnings(action='ignore',module='.*paramiko.*')
class SshCmdException (ValueError):
def __init__(self, arg):
self.strError = arg
self.args = {arg}
## Class SshCmd
# This class implements the snmpcmd command, used to send an
# SSH command to the target device.
class SshCmd ():
def __init__(self,timeWait=10):
"""
Constructor
Args:
timeWait: max time wait to execute commands
"""
self.timeWaitResponse = timeWait
self.ssh = None
def __del__(self):
"""
Destructor
"""
if (self.ssh != None):
self.ssh.close()
def exec(self,dev,command,user,passw=None,withResponse=True):
"""
Execute a command
Args:
dev: ip or hostname
command: command to execute
user: username
passw : password
withResponse: indicates if the command has a response
Raises:
SshCmdException
Return:
command result
"""
resp = ""
try:
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(dev, username=user, password=passw,timeout=5)
except paramiko.AuthenticationException as error:
self.ssh.close()
raise SshCmdException("ssh failed autentication")
except:
self.ssh.close()
raise SshCmdException("ssh not connection")
# Send the command (non-blocking)
stdin, stdout, stderr = self.ssh.exec_command(command,get_pty=True)
stdin.close()
stderr.close()
if (withResponse):
# Wait for the command to terminate
i = 0
while not stdout.channel.exit_status_ready():
if i == self.timeWaitResponse:
stdout.channel.close()
stdout.close()
self.ssh.close()
raise SshCmdException("Not return command")
i = i + 1
time.sleep(1)
# Only print data if there is data to read in the channel
if stdout.channel.recv_exit_status() == 0:
resp = stdout.read().decode('utf-8')
else:
resp = stdout.read().decode('utf-8')
stdout.channel.close()
stdout.close()
self.ssh.close()
raise SshCmdException(resp)
#Close fds
stdout.channel.close()
stdout.close()
self.ssh.close()
return resp
def printSshErrorCommand(self,sshError,command = None):
"""
Print error command
Args:
sshError: error message
command: cammand executed
"""
if command != None:
print("Ssh error:",sshError,"Command:",command)
else:
print("Ssh error:",sshError)
#!/usr/bin/env python3
# @file testAdvDIO.py
#
# @brief
#
# @author Manuel Castilla
# @ingroup
# @date 25/04/2019
#
###########################################################################
import configparser
from dio import DIO
import time
from ssh import SshCmd,SshCmdException
delay = 0.001
maxOffset = 0.000000008
def findTimestamp(listStamp1,listStamp2,offset):
"""
This function compare two list and search if two timestamps match
Args:
listStamp1: first list of timestamps
listStamp2: second list of timestamps
offset: maximum offset between each comparison
Return:
True if it find a match in the two lists, False otherwise
"""
for i in listStamp1:
for j in listStamp2:
value = i - j
if (value <= offset and value >= -offset):
return True
return False
def testAdvDIO(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2):
"""
Check the pulses generated on a remote board
Args:
hostName1: ip remote host 1
username1: user name host 1
password1: password
interfaceName1 : interface name
hostName2: ip remote host 2
username2: user name host 2
password2: password
interfaceName2 : interface name
Raises:
error test
"""
try:
sshCmd1 = SshCmd()
sshCmd2 = SshCmd()
dio1 = DIO(hostName1,userName1,password1,interfaceName1)
dio2 = DIO(hostName2,userName2,password2,interfaceName2)
dio1.configurePorts("pdiii")
dio2.configurePorts("iiiii")
dio1.clearTimestamps()
dio2.clearTimestamps()
command = "sudo wr-dio-agent " + interfaceName1
sshCmd1.exec(hostName1,command,userName1,password1,False)
command = "sudo wr-dio-ruler " + interfaceName1 + " IN0 R1+" + str(delay)
sshCmd2.exec(hostName2,command,userName2,password2,False)
totalOffset = delay + maxOffset
time.sleep(2)
if(not findTimestamp(dio1.getTimestamp(0),dio1.getTimestamp(1),totalOffset)):
raise
except SshCmdException as e:
sshCmd.printSshErrorCommand(e.strError,command)
print("Test advanced error")
raise
except:
print("Test advanced error")
raise
#--------------------------------------------------#
#--------------------------------------------------#
# MAIN
#--------------------------------------------------#
#--------------------------------------------------#
def main(configuration):
hostName1 = configuration["hostName1"]
userName1 = configuration["userName1"]
password1 = configuration["password1"]
interfaceName1 = configuration["interfaceName1"]
hostName2 = configuration["hostName2"]
userName2 = configuration["userName2"]
password2 = configuration["password2"]
interfaceName2 = configuration["interfaceName2"]
try:
print("Executing advanced DIO test")
testAdvDIO(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2)
print("Test advanced DIO: successful")
except:
print("Test advanced DIO: error")
if __name__ == '__main__':
try:
configuration = {}
nameConfig = "configuration.cfg"
config = configparser.ConfigParser()
config.readfp(open(nameConfig))
configuration["hostName1"] = config.get('config',"hostName1")
configuration["userName1"] = config.get('config',"userName1")
configuration["password1"] = config.get('config',"password1")
configuration["interfaceName1"] = config.get('config',"interfaceName1")
configuration["hostName2"] = config.get('config',"hostName2")
configuration["userName2"] = config.get('config',"userName2")
configuration["password2"] = config.get('config',"password2")
configuration["interfaceName2"] = config.get('config',"interfaceName2")
main(configuration)
except configparser.Error as e:
print("Error load configuration:",e.message)
\ No newline at end of file
#!/usr/bin/env python3
# @file testAll.py
#
# @brief
#
# @author Manuel Castilla
# @ingroup
# @date 29/04/2019
#
###########################################################################
import configparser
tests = ["testNIC","testWR","testDIO","testAdvDIO"]
#--------------------------------------------------#
#--------------------------------------------------#
# MAIN
#--------------------------------------------------#
#--------------------------------------------------#
def main(configuration):
print("Executing all tests")
for i in tests:
test = __import__(i)
test.main(configuration)
print("Test All: completed")
if __name__ == '__main__':
try:
configuration = {}
nameConfig = "configuration.cfg"
config = configparser.ConfigParser()
config.readfp(open(nameConfig))
configuration["hostName1"] = config.get('config',"hostName1")
configuration["userName1"] = config.get('config',"userName1")
configuration["password1"] = config.get('config',"password1")
configuration["busIdSpec1"] = config.get('config',"busIdSpec1")
configuration["ipWR1"] = config.get('config',"ipWR1")
configuration["interfaceName1"] = config.get('config',"interfaceName1")
configuration["hostName2"] = config.get('config',"hostName2")
configuration["userName2"] = config.get('config',"userName2")
configuration["password2"] = config.get('config',"password2")
configuration["busIdSpec2"] = config.get('config',"busIdSpec2")
configuration["ipWR2"] = config.get('config',"ipWR2")
configuration["interfaceName2"] = config.get('config',"interfaceName2")
main(configuration)
except configparser.Error as e:
print("Error load configuration:",e.message)
\ No newline at end of file
#!/usr/bin/env python3
# @file testDIO.py
#
# @brief
#
# @author Manuel Castilla
# @ingroup
# @date 25/04/2019
#
###########################################################################
import configparser
from dio import DIO
import time
maxOffset = 0.000000017 #offset 17 nanoseconds
def findTimestamp(listStamp1,listStamp2,offset):
"""
This function compare two list and search if two timestamps match
Args:
listStamp1: first list of timestamps
listStamp2: second list of timestamps
offset: maximum offset between each comparison
Return:
True if it find a match in the two lists, False otherwise
"""
for i in listStamp1:
for j in listStamp2:
value = i - j
if (value <= offset and value >= -offset):
return True
return False
def tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,channel):
"""
Try the input and output of channel
Args:
hostName1: ip remote host 1
username1: user name host 1
password1: password
interfaceName1 : interface name
hostName2: ip remote host 2
username2: user name host 2
password2: password
interfaceName2 : interface name
channel: number of channel to try(0 to 4)
Raises:
error channel
"""
try:
dio1 = DIO(hostName1,userName1,password1,interfaceName1)
dio2 = DIO(hostName2,userName2,password2,interfaceName2)
dio1.configurePorts("pdddd")
dio2.configurePorts("iiiii")
dio1.clearTimestamps()
dio2.clearTimestamps()
if (channel != 0):
dio1.armPulse(channel,0.1,"now")
else:
time.sleep(2) #wait 2 seconds to ensure some timestamp
if(findTimestamp(dio1.getTimestamp(channel),dio2.getTimestamp(channel),maxOffset)):
dio1.configurePorts("iiiii")
dio2.configurePorts("pdddd")
dio1.clearTimestamps()
dio2.clearTimestamps()
if (channel != 0):
dio2.armPulse(channel,0.1,"now")
else:
time.sleep(2) #wait 2 seconds to ensure some timestamp
if(findTimestamp(dio1.getTimestamp(channel),dio2.getTimestamp(channel),maxOffset)):
print("Channel",channel,": ok in host:",hostName1,"and host:",hostName2)
else:
raise
else:
raise
except:
print("Channel",channel,": error in host:",hostName1,"or host:",hostName2)
raise
#--------------------------------------------------#
#--------------------------------------------------#
# MAIN
#--------------------------------------------------#
#--------------------------------------------------#
def main(configuration):
hostName1 = configuration["hostName1"]
userName1 = configuration["userName1"]
password1 = configuration["password1"]
interfaceName1 = configuration["interfaceName1"]
hostName2 = configuration["hostName2"]
userName2 = configuration["userName2"]
password2 = configuration["password2"]
interfaceName2 = configuration["interfaceName2"]
try:
print("Executing DIO test")
tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,0)
tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,1)
tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,2)
tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,3)
tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,4)
print("Test DIO: successful")
except:
print("Test DIO: error")
if __name__ == '__main__':
try:
configuration = {}
nameConfig = "configuration.cfg"
config = configparser.ConfigParser()
config.readfp(open(nameConfig))
configuration["hostName1"] = config.get('config',"hostName1")
configuration["userName1"] = config.get('config',"userName1")
configuration["password1"] = config.get('config',"password1")
configuration["interfaceName1"] = config.get('config',"interfaceName1")
configuration["hostName2"] = config.get('config',"hostName2")
configuration["userName2"] = config.get('config',"userName2")
configuration["password2"] = config.get('config',"password2")
configuration["interfaceName2"] = config.get('config',"interfaceName2")
main(configuration)
except configparser.Error as e:
print("Error load configuration:",e.message)
\ No newline at end of file
#!/usr/bin/env python3
# @file testNIC.py
#
# @brief
#
# @author Manuel Castilla
# @ingroup
# @date 16/04/2019
#
###########################################################################
from ssh import SshCmd,SshCmdException
from vuart import Vuart
import configparser
def configurationIpInterface(hostName,userName,password,busId,interfaceName,ip):
"""
Set ip of white rabbit interface
Args:
hostName: ip remote host
username: user name
password: password
busId: number PCIe bus
interfaceName : interface name
ip : ip set interface
Raises:
error function
"""
try:
sshCmd = SshCmd()
command = "sudo ifconfig " + interfaceName + " " + ip
sshCmd.exec(hostName,command,userName,password)
command = "ifconfig | grep " + ip + " | wc -l"
value = sshCmd.exec(hostName,command,userName,password)
if (int(value) == 1):
vuart = Vuart(hostName,userName,password,busId)
value = vuart.setIp(ip)
if (("IP-address: " + str(ip)) in value):
print("Configuration ip interface: ok in host:",hostName)
else:
raise
except SshCmdException as e:
sshCmd.printSshErrorCommand(e.strError,command)
print("Configuration ip interface: error in host:",hostName)
raise
except:
print("Configuration ip interface2: error in host:",hostName)
raise
def testIperf(hostName,userName,password,ip,hostName2,userName2,password2):
"""
Test iperf by white rabbit interfaces
Args:
hostName: ip remote host client
username: user name client
password: password
ip : ip server
hostName2: ip remote host server
username2: user name server
password2: password