Commit 8269bb7c authored by Manuel Castilla's avatar Manuel Castilla Committed by Benoit Rat

Merged in manu-190506-tests-rebased (pull request #2)

Manu 190506 tests rebased
parents e6fbc7b4 b512e9de
# .gitignore file
# configuration file #
######################
configuration.cfg
Tests for WR-starting-kit.
This folder contains test files, documentation file and docker deployment files.
The folders are structured as following:
* `doc`: File documentation.
* `dockers`: Files Dockerfile to deploy WR-starting-kit in differents SO and kernerls.
* `validation`: Python files for tests.
\ No newline at end of file
Dokerfiles to deployment test
==========================
To build image
----------
You need to move to UbuntuXX folder and execute:
sudo docker build -t <name image> .
To run image
----------
You need to execute:
sudo docker run -it <name image>
Show images
----------
sudo docker images
Remove image
----------
sudo docker rmi -f <IMAGE ID>
\ No newline at end of file
# spec-sw Dockerfile ubuntu 16.04
#
#
# Pull base image.
FROM ubuntu:16.04
# Install.
RUN \
apt-get update &&\
apt-get upgrade -y &&\
apt-get install -y git \
build-essential \
linux-headers-4.4.0-21-generic \
linux-headers-4.8.0-34-generic \
linux-headers-4.10.0-14-generic \
linux-headers-4.13.0-16-generic \
linux-headers-4.15.0-13-generic
# Clone repository and detach HEAD
RUN \
git clone https://ohwr.org/project/spec-sw.git &&\
cd spec-sw &&\
git checkout 36f70fe4
#set enviroment variable for kernel version 4.4.0-21-generic
ENV LINUX /lib/modules/4.4.0-21-generic/build
#construction spec-sw for kernel version 4.4.0-21-generic
RUN \
cd spec-sw &&\
make &&\
make install
#set enviroment variable for kernel version 4.8.0-34-generic
ENV LINUX /lib/modules/4.8.0-34-generic/build
#construction spec-sw for kernel version 4.8.0-34-generic
RUN \
cd spec-sw &&\
make clean &&\
make &&\
make install
#set enviroment variable for kernel version 4.10.0-14-generic
ENV LINUX /lib/modules/4.10.0-14-generic/build
#construction spec-sw for kernel version 4.10.0-14-generic
RUN \
cd spec-sw &&\
make clean &&\
make &&\
make install
#set enviroment variable for kernel version 4.13.0-16-generic
ENV LINUX /lib/modules/4.13.0-16-generic/build
#construction spec-sw for kernel version 4.13.0-16-generic
RUN \
cd spec-sw &&\
make clean &&\
make &&\
make install
#set enviroment variable for kernel version 4.15.0-13-generic
ENV LINUX /lib/modules/4.15.0-13-generic/build
#construction spec-sw for kernel version 4.15.0-13-generic
RUN \
cd spec-sw &&\
make clean &&\
make &&\
make install
# Define default command.
CMD ["bash"]
#
# spec-sw Dockerfile ubuntu 18.04
#
#
# Pull base image.
FROM ubuntu:18.04
# Install.
RUN \
apt-get update &&\
apt-get upgrade -y &&\
apt-get install -y git \
build-essential \
linux-headers-4.15.0-47-generic \
linux-headers-4.18.0-17-generic
# Clone repository and detach HEAD
RUN \
git clone https://ohwr.org/project/spec-sw.git &&\
cd spec-sw &&\
git checkout 36f70fe4
#set enviroment variable for kernel version 4.15.0-47-generic
ENV LINUX /lib/modules/4.15.0-47-generic/build
#construction spec-sw for kerner version 4.15.0-47-generic
RUN \
cd spec-sw &&\
make &&\
make install
#set enviroment variable for kernel version 4.18.0-17-generic
ENV LINUX /lib/modules/4.18.0-17-generic/build
#construction spec-sw for kernel version 4.18.0-17-generic
RUN \
cd spec-sw &&\
make clean &&\
make &&\
make install
# Define default command.
CMD ["bash"]
The files in this folder are the following:
* `dio.py`: Class to interact with the ports of the board.
* `loadDriver.py`: Load the driver on the PC.
* `sample.cfg`: Sample configuration file of the tests, to execute the tests must be modified the parameters and renamed the file to `configuration.cfg`.
* `ssh.py`: Class used to execute commands by SSH.
* `testAdvDIO.py`: Verifies the correct reception and sending of packages.
* `testAll.py`: Execute all tests.
* `testDIO.py`: Check all ports of the boards.
* `testNIC.py`: Verifies the correct operation of the Network Interface Core.
* `testWR.py`: Check the correct operation of White Rabbit.
* `vuart.py`: Class to execute commands by virtual uart.
# @file dio.py
#
# @brief This class is to interact with the ports of the board
#
# @author Manuel Castilla
# @date 23/04/2019
#
###########################################################################
from ssh import SshCmd,SshCmdException
## Class DIO
#
class DIO ():
def __init__(self,hostName,userName,password,interfaceName):
"""
Construcctor
Args:
hostName: ip or hostname
userName: username
password : password
interfaceName: interface name
"""
self.hostName = hostName
self.userName = userName
self.password = password
self.interfaceName = interfaceName
self.command = "sudo wr-dio-cmd " + interfaceName
self.sshCmd = SshCmd()
def configurePort(self,channel,mode):
"""
Configure the channel
Args:
channel: number 0 to 4
mode: I/i(Input), 0/1(Output, steady state fixed at 0 low or 1 high),D/d(DIO core output),
P/p(Channel 0 Output PPS),C/c(Channel 4 Clock Input to PTP core)
Raises:
error function
"""
try:
confCommand = self.command + " mode " + str(channel) + " " + mode
self.execCommand(confCommand)
except:
print("Error configure port, channel:",channel,"mode:",mode)
raise
def configurePorts(self,modechannels):
"""
Configure all channels
Args:
modechannels: <modech0><modech1><modech2><modech3><modech4>
Raises:
error function
"""
try:
confCommand = self.command + " mode " + modechannels
self.execCommand(confCommand)
except:
print("Error configure ports, mode channels:",modechannels)
raise
def armPulse(self,channel,period,count):
"""
generate a pulse in a channel
Args:
channel: number 0 to 4
period: pulse period
count: string whih number of instances to run
Raises:
error function
"""
try:
armCommand = self.command + " pulse " + str(channel) + " " + str(period) + " " + count
self.execCommand(armCommand)
except:
print("Error arm pulse, channel:",channel,"period:",period,"count:",count)
raise
def getTimestamp(self,channel):
"""
Get timestamp of channel
Args:
channel: number 0 to 4
Raises:
error function
"""
try:
ret = []
getCommand = self.command + " stamp " + str(channel) + " | cut -f2 -d ,"
stamp = self.execCommand(getCommand)
stampSplit = stamp.split()
for i in stampSplit:
ret.append(float(i))
return ret
except:
print("Error get timestamp, channel:",channel)
raise
def clearTimestamps(self):
"""
Clear all timestamps
Raises:
error function
"""
try:
command = self.command + " stamp &> /dev/null"
self.execCommand(command)
except:
print("Error clear timestamps")
raise
def execCommand(self,command):
"""
Execute a command
Args:
command: command to execute
Raises:
error function
"""
try:
return self.sshCmd.exec(self.hostName,command,self.userName,self.password)
except SshCmdException as e:
self.sshCmd.printSshErrorCommand(e.strError) #print error command
cleanup()
raise
#!/usr/bin/env python3
# @file loadDriver.py
#
# @brief Load the driver on the PC
#
# @author Manuel Castilla
# @date 16/04/2019
#
###########################################################################
from ssh import SshCmd,SshCmdException
import sys
import getpass
error = -1
def loadDriver(hostName,userName,password):
"""
This function load the driver on the PC
Args:
hostName: ip remote host
username: user name
password: password
Return:
True if the driver was loaded correctly, False otherwise
"""
ret = 0
try:
sshCmd = SshCmd()
command = "ifconfig | grep wr | wc -l"
value = sshCmd.exec(hostName,command,userName,password)
if (int(value) == 0):
command = "sudo modprobe -r spec"
sshCmd.exec(hostName,command,userName,password)
command = "sudo modprobe -r wr-nic"
sshCmd.exec(hostName,command,userName,password)
command = "sudo modprobe spec"
sshCmd.exec(hostName,command,userName,password)
command = "sudo insmod /lib/modules/$(uname -r)/extra/wr-nic.ko wrc=1" #esto es temporal
sshCmd.exec(hostName,command,userName,password)
command = "sudo modprobe wr-nic"
sshCmd.exec(hostName,command,userName,password)
command = "ifconfig | grep wr | wc -l"
value = sshCmd.exec(hostName,command,userName,password)
if (int(value) == 0):
ret = error
else:
print("The driver was previously loaded")
except SshCmdException as e:
sshCmd.printSshErrorCommand(e.strError)
ret = error
return ret
#--------------------------------------------------#
#--------------------------------------------------#
# MAIN
#--------------------------------------------------#
#--------------------------------------------------#
if len(sys.argv) < 2:
print("usage: app hostaname")
sys.exit(1)
hostName = sys.argv[1]
userName = input("Please enter username: ")
#password = getpass("Please enter password: ")
password = getpass.getpass()
if (loadDriver(hostName,userName,password) == 0):
print("Load driver: ok")
else:
print("Load driver: error")
\ No newline at end of file
[pc1]
hostName = 172.17.5.196
userName = test
password = test
busIdSpec = 0x01
interfaceName = wr0
ipWR = 192.168.2.100
[pc2]
hostName = 172.17.5.238
userName = test
password =test
busIdSpec = 0x01
ipWR = 192.168.2.200
interfaceName = wr0
\ No newline at end of file
# @file ssh.py
#
# @brief This class is used to execute commands by SSH
#
# @author Manuel Castilla
# @date 05/04/2019
#
###########################################################################
import paramiko
import os
import time
import warnings
warnings.filterwarnings(action='ignore',module='.*paramiko.*')
class SshCmdException (ValueError):
def __init__(self, arg):
self.strError = arg
self.args = {arg}
## Class SshCmd
# This class implements the snmpcmd command, used to send an
# SSH command to the target device.
class SshCmd ():
def __init__(self,timeWait=10):
"""
Constructor
Args:
timeWait: max time wait to execute commands
"""
self.timeWaitResponse = timeWait
self.ssh = None
def __del__(self):
"""
Destructor
"""
if (self.ssh != None):
self.ssh.close()
def exec(self,dev,command,user,passw=None,withResponse=True):
"""
Execute a command
Args:
dev: ip or hostname
command: command to execute
user: username
passw : password
withResponse: indicates if the command has a response
Raises:
SshCmdException
Return:
command result
"""
resp = ""
try:
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(dev, username=user, password=passw,timeout=5)
except paramiko.AuthenticationException as error:
self.ssh.close()
raise SshCmdException("ssh failed autentication")
except:
self.ssh.close()
raise SshCmdException("ssh not connection")
# Send the command (non-blocking)
stdin, stdout, stderr = self.ssh.exec_command(command,get_pty=True)
stdin.close()
stderr.close()
if (withResponse):
# Wait for the command to terminate
i = 0
while not stdout.channel.exit_status_ready():
if i == self.timeWaitResponse:
stdout.channel.close()
stdout.close()
self.ssh.close()
raise SshCmdException("Not return command")
i = i + 1
time.sleep(1)
# Only print data if there is data to read in the channel
if stdout.channel.recv_exit_status() == 0:
resp = stdout.read().decode('utf-8')
else:
resp = stdout.read().decode('utf-8')
stdout.channel.close()
stdout.close()
self.ssh.close()
raise SshCmdException(resp)
#Close fds
stdout.channel.close()
stdout.close()
self.ssh.close()
return resp
def printSshErrorCommand(self,sshError,command = None):
"""
Print error command
Args:
sshError: error message
command: cammand executed
"""
if command != None:
print("Ssh error:",sshError,"Command:",command)
else:
print("Ssh error:",sshError)
#!/usr/bin/env python3
# @file testAdvDIO.py
#
# @brief Check the correct reception and sending of packages
#
# @author Manuel Castilla
# @date 25/04/2019
#
###########################################################################
import configparser
from dio import DIO
import time
from ssh import SshCmd,SshCmdException
delay = 0.001
maxOffset = 0.000000008
def findTimestamp(listStamp1,listStamp2,offset):
"""
This function compare two list and search if two timestamps match
Args:
listStamp1: first list of timestamps
listStamp2: second list of timestamps
offset: maximum offset between each comparison
Return:
True if it find a match in the two lists, False otherwise
"""
for i in listStamp1:
for j in listStamp2:
value = i - j
if (value <= offset and value >= -offset):
return True
return False
def testAdvDIO(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2):
"""
Check the pulses generated on a remote board
Args:
hostName1: ip remote host 1
username1: user name host 1
password1: password
interfaceName1 : interface name
hostName2: ip remote host 2
username2: user name host 2
password2: password
interfaceName2 : interface name
Raises:
error test
"""
try:
sshCmd1 = SshCmd()
sshCmd2 = SshCmd()
dio1 = DIO(hostName1,userName1,password1,interfaceName1)
dio2 = DIO(hostName2,userName2,password2,interfaceName2)
dio1.configurePorts("pdiii")
dio2.configurePorts("iiiii")
dio1.clearTimestamps()
dio2.clearTimestamps()
command = "sudo wr-dio-agent " + interfaceName1
sshCmd1.exec(hostName1,command,userName1,password1,False)
command = "sudo wr-dio-ruler " + interfaceName1 + " IN0 R1+" + str(delay)
sshCmd2.exec(hostName2,command,userName2,password2,False)
totalOffset = delay + maxOffset
time.sleep(2)
if(not findTimestamp(dio1.getTimestamp(0),dio1.getTimestamp(1),totalOffset)):
raise
except SshCmdException as e:
sshCmd.printSshErrorCommand(e.strError,command)
print("Test advanced error")
raise
except:
print("Test advanced error")
raise
#--------------------------------------------------#
#--------------------------------------------------#
# MAIN
#--------------------------------------------------#
#--------------------------------------------------#
def main(configuration):
hostName1 = configuration["hostName1"]
userName1 = configuration["userName1"]
password1 = configuration["password1"]
interfaceName1 = configuration["interfaceName1"]
hostName2 = configuration["hostName2"]
userName2 = configuration["userName2"]
password2 = configuration["password2"]
interfaceName2 = configuration["interfaceName2"]
try:
print("Executing advanced DIO test")
testAdvDIO(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2)
print("Test advanced DIO: successful")
except:
print("Test advanced DIO: error")
if __name__ == '__main__':
try:
configuration = {}
nameConfig = "configuration.cfg"
config = configparser.ConfigParser()
config.readfp(open(nameConfig))
configuration["hostName1"] = config.get('pc1',"hostName")
configuration["userName1"] = config.get('pc1',"userName")
configuration["password1"] = config.get('pc1',"password")
configuration["interfaceName1"] = config.get('pc1',"interfaceName")
configuration["hostName2"] = config.get('pc2',"hostName")
configuration["userName2"] = config.get('pc2',"userName")
configuration["password2"] = config.get('pc2',"password")
configuration["interfaceName2"] = config.get('pc2',"interfaceName")
main(configuration)
except configparser.Error as e:
print("Error load configuration:",e.message)
\ No newline at end of file
#!/usr/bin/env python3
# @file testAll.py
#
# @brief Execute all tests
#
# @author Manuel Castilla
# @date 29/04/2019
#
###########################################################################
import configparser
tests = ["testNIC","testWR","testDIO","testAdvDIO"]
#--------------------------------------------------#
#--------------------------------------------------#
# MAIN
#--------------------------------------------------#
#--------------------------------------------------#
def main(configuration):
print("Executing all tests")
for i in tests:
test = __import__(i)
test.main(configuration)
print("Test All: completed")
if __name__ == '__main__':
try:
configuration = {}
nameConfig = "configuration.cfg"
config = configparser.ConfigParser()
config.readfp(open(nameConfig))
configuration["hostName1"] = config.get('pc1',"hostName")
configuration["userName1"] = config.get('pc1',"userName")
configuration["password1"] = config.get('pc1',"password")
configuration["busIdSpec1"] = config.get('pc1',"busIdSpec")
configuration["ipWR1"] = config.get('pc1',"ipWR")
configuration["interfaceName1"] = config.get('pc1',"interfaceName")
configuration["hostName2"] = config.get('pc2',"hostName")
configuration["userName2"] = config.get('pc2',"userName")
configuration["password2"] = config.get('pc2',"password")
configuration["busIdSpec2"] = config.get('pc2',"busIdSpec")
configuration["ipWR2"] = config.get('pc2',"ipWR")
configuration["interfaceName2"] = config.get('pc2',"interfaceName")
main(configuration)
except configparser.Error as e:
print("Error load configuration:",e.message)
\ No newline at end of file
#!/usr/bin/env python3
# @file testDIO.py
#
# @brief Check all ports of the boards
#
# @author Manuel Castilla
# @date 25/04/2019
#
###########################################################################
import configparser
from dio import DIO
import time
maxOffset = 0.000000017 #offset 17 nanoseconds
def findTimestamp(listStamp1,listStamp2,offset):
"""
This function compare two list and search if two timestamps match
Args:
listStamp1: first list of timestamps
listStamp2: second list of timestamps
offset: maximum offset between each comparison
Return:
True if it find a match in the two lists, False otherwise
"""
for i in listStamp1:
for j in listStamp2:
value = i - j
if (value <= offset and value >= -offset):
return True
return False
def tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,channel):
"""
Try the input and output of channel
Args:
hostName1: ip remote host 1
username1: user name host 1
password1: password
interfaceName1 : interface name
hostName2: ip remote host 2
username2: user name host 2
password2: password
interfaceName2 : interface name
channel: number of channel to try(0 to 4)
Raises:
error channel
"""
try:
dio1 = DIO(hostName1,userName1,password1,interfaceName1)
dio2 = DIO(hostName2,userName2,password2,interfaceName2)
dio1.configurePorts("pdddd")
dio2.configurePorts("iiiii")
dio1.clearTimestamps()
dio2.clearTimestamps()
if (channel != 0):
dio1.armPulse(channel,0.1,"now")
else:
time.sleep(2) #wait 2 seconds to ensure some timestamp
if(findTimestamp(dio1.getTimestamp(channel),dio2.getTimestamp(channel),maxOffset)):
dio1.configurePorts("iiiii")
dio2.configurePorts("pdddd")
dio1.clearTimestamps()
dio2.clearTimestamps()
if (channel != 0):
dio2.armPulse(channel,0.1,"now")
else:
time.sleep(2) #wait 2 seconds to ensure some timestamp
if(findTimestamp(dio1.getTimestamp(channel),dio2.getTimestamp(channel),maxOffset)):
print("Channel",channel,": ok in host:",hostName1,"and host:",hostName2)
else:
raise
else:
raise
except:
print("Channel",channel,": error in host:",hostName1,"or host:",hostName2)
raise
#--------------------------------------------------#
#--------------------------------------------------#
# MAIN
#--------------------------------------------------#
#--------------------------------------------------#
def main(configuration):
hostName1 = configuration["hostName1"]
userName1 = configuration["userName1"]
password1 = configuration["password1"]
interfaceName1 = configuration["interfaceName1"]
hostName2 = configuration["hostName2"]
userName2 = configuration["userName2"]
password2 = configuration["password2"]
interfaceName2 = configuration["interfaceName2"]
try:
print("Executing DIO test")
tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,0)
tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,1)
tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,2)
tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,3)
tryChannel(hostName1,userName1,password1,interfaceName1,hostName2,userName2,password2,interfaceName2,4)
print("Test DIO: successful")
except:
print("Test DIO: error")
if __name__ == '__main__':
try:
configuration = {}
nameConfig = "configuration.cfg"
config = configparser.ConfigParser()
config.readfp(open(nameConfig))
configuration["hostName1"] = config.get('pc1',"hostName")
configuration["userName1"] = config.get('pc1',"userName")
configuration["password1"] = config.get('pc1',"password")
configuration["interfaceName1"] = config.get('pc1',"interfaceName")
configuration["hostName2"] = config.get('pc2',"hostName")
configuration["userName2"] = config.get('pc2',"userName")
configuration["password2"] = config.get('pc2',"password")
configuration["interfaceName2"] = config.get('pc2',"interfaceName")
main(configuration)
except configparser.Error as e:
print("Error load configuration:",e.message)
\ No newline at end of file
#!/usr/bin/env python3
# @file testNIC.py
#
# @brief Check the correct operation of the Network Interface Core
#
# @author Manuel Castilla
# @date 16/04/2019
#
###########################################################################
from ssh import SshCmd,SshCmdException
from vuart import Vuart
import configparser
def configurationIpInterface(hostName,userName,password,busId,interfaceName,ip):
"""
Set ip of white rabbit interface
Args:
hostName: ip remote host
username: user name
password: password
busId: number PCIe bus
interfaceName : interface name
ip : ip set interface
Raises:
error function
"""
try:
sshCmd = SshCmd()
command = "sudo ifconfig " + interfaceName + " " + ip
sshCmd.exec(hostName,command,userName,password)
command = "ifconfig | grep " + ip + " | wc -l"
value = sshCmd.exec(hostName,command,userName,password)
if (int(value) == 1):
vuart = Vuart(hostName,userName,password,busId)
value = vuart.setIp(ip)
if (("IP-address: " + str(ip)) in value):
print("Configuration ip interface: ok in host:",hostName)
else:
raise
except SshCmdException as e:
sshCmd.printSshErrorCommand(e.strError,command)
print("Configuration ip interface: error in host:",hostName)
raise
except:
print("Configuration ip interface2: error in host:",hostName)
raise
def testIperf(hostName,userName,password,ip,hostName2,userName2,password2):
"""
Test iperf by white rabbit interfaces
Args:
hostName: ip remote host client
username: user name client
password: password
ip : ip server
hostName2: ip remote host server
username2: user name server
password2: password
Raises:
error function
"""
try:
sshCmdServer = SshCmd()
sshCmdClient = SshCmd()
command = "iperf -s"
sshCmdServer.exec(hostName2,command,userName2,password2,withResponse=False)
command = "iperf -t 4 -c " + str(ip)
value = sshCmdClient.exec(hostName,command,userName,password)
if ("failed" in value):
print("Test iperf: error")
raise
else:
print(value)
print("Test iperf: ok")
except SshCmdException as e:
print("Test iperf: error")
print(e.strError)
raise
#--------------------------------------------------#
#--------------------------------------------------#
# MAIN
#--------------------------------------------------#
#--------------------------------------------------#
def main(configuration):
hostName1 = configuration["hostName1"]
userName1 = configuration["userName1"]
password1 = configuration["password1"]
busIdSpec1 = configuration["busIdSpec1"]
ipWR1 = configuration["ipWR1"]
interfaceName1 = configuration["interfaceName1"]
hostName2 = configuration["hostName2"]
userName2 = configuration["userName2"]
password2 = configuration["password2"]
busIdSpec2 = configuration["busIdSpec2"]
ipWR2 = configuration["ipWR2"]
interfaceName2 = configuration["interfaceName2"]
try:
print("Executing NIC test")
configurationIpInterface(hostName1,userName1,password1,busIdSpec1,interfaceName1,ipWR1)
configurationIpInterface(hostName2,userName2,password2,busIdSpec2,interfaceName2,ipWR2)
testIperf(hostName1,userName1,password1,ipWR2,hostName2,userName2,password2)
print("Test NIC: successful")
except:
print("Test NIC: error")
if __name__ == '__main__':
try:
configuration = {}
nameConfig = "configuration.cfg"
config = configparser.ConfigParser()
config.readfp(open(nameConfig))
configuration["hostName1"] = config.get('pc1',"hostName")
configuration["userName1"] = config.get('pc1',"userName")
configuration["password1"] = config.get('pc1',"password")
configuration["busIdSpec1"] = config.get('pc1',"busIdSpec")
configuration["ipWR1"] = config.get('pc1',"ipWR")
configuration["interfaceName1"] = config.get('pc1',"interfaceName")
configuration["hostName2"] = config.get('pc2',"hostName")
configuration["userName2"] = config.get('pc2',"userName")
configuration["password2"] = config.get('pc2',"password")
configuration["busIdSpec2"] = config.get('pc2',"busIdSpec")
configuration["ipWR2"] = config.get('pc2',"ipWR")
configuration["interfaceName2"] = config.get('pc2',"interfaceName")
main(configuration)
except configparser.Error as e:
print("Error load configuration:",e.message)
#!/usr/bin/env python3
# @file testWR.py
#
# @brief Check the correct operation of White Rabbit
#
# @author Manuel Castilla
# @date 23/04/2019
#
###########################################################################
from vuart import Vuart
import configparser
import time
def whoIsMaster(hostName,userName,password,busId):
"""
This function asks which spec has SFP purple (master)
Args:
hostName: ip remote host
username: user name
password: password
busId: number PCIe bus
Return:
True if the spec has SFP purple, False otherwise
"""
ret = False
try:
vuart = Vuart(hostName,userName,password,busId)
value = vuart.sfpDetect()
if ("AXGE-3454-0531" in value):
ret = True
except:
print("Who is master: error, host:",hostName)
raise
return ret
def putMode(hostName,userName,password,busId,mode):
"""
This function puts in mode master an spec
Args:
hostName: ip remote host
username: user name
password: password
busId: number PCIe bus
mode: mode ptp status("master","slave")
Raises:
error function
"""
try:
vuart = Vuart(hostName,userName,password,busId)
if (mode == "master"):
vuart.modeMaster()
else:
vuart.modeSlave()
vuart.ptpStart()
value = vuart.mode()
if(mode in value):
print("Put mode", mode,": ok, host:",hostName)
else:
raise
except:
print("Put mode: error, host:",hostName)
raise
def checkTrackPhase(hostName,userName,password,busId):
"""
This function check if the state is TRACK_PHASE
Args:
hostName: ip remote host
username: user name
password: password
busId: number PCIe bus
Raises:
error function
"""
try:
vuart = Vuart(hostName,userName,password,busId)
numberTimesTry = 10
i = 0
while not vuart.isTrackPhase():
if (i == numberTimesTry):
raise
i = i + 1
time.sleep(1)
print("Track Phase: ok")
except:
print("Track phase: error, host:",hostName)
raise
#--------------------------------------------------#
#--------------------------------------------------#
# MAIN
#--------------------------------------------------#
#--------------------------------------------------#
def main(configuration):
hostName1 = configuration["hostName1"]
userName1 = configuration["userName1"]
password1 = configuration["password1"]
busIdSpec1 = configuration["busIdSpec1"]
interfaceName1 = configuration["interfaceName1"]
hostName2 = configuration["hostName2"]
userName2 = configuration["userName2"]
password2 = configuration["password2"]
busIdSpec2 = configuration["busIdSpec2"]
interfaceName2 = configuration["interfaceName2"]
try:
print("Executing WR test")
masterSpec1 = whoIsMaster(hostName1,userName1,password1,busIdSpec1)
masterSpec2 = whoIsMaster(hostName2,userName2,password2,busIdSpec2)
if (masterSpec1 == 1 or masterSpec2 == 1):
if (masterSpec1 == 1):
putMode(hostName2,userName2,password2,busIdSpec2,"slave")
putMode(hostName1,userName1,password1,busIdSpec1,"master")
checkTrackPhase(hostName2,userName2,password2,busIdSpec2)
else:
putMode(hostName1,userName1,password1,busIdSpec1,"slave")
putMode(hostName2,userName2,password2,busIdSpec2,"master")
checkTrackPhase(hostName1,userName1,password1,busIdSpec1)
print("Test WR: successful")
else:
print("Error: It is not possible to know who is master")
raise
except:
print("Test WR: error")
if __name__ == '__main__':
try:
configuration = {}
nameConfig = "configuration.cfg"
config = configparser.ConfigParser()
config.readfp(open(nameConfig))
configuration["hostName1"] = config.get('pc1',"hostName")
configuration["userName1"] = config.get('pc1',"userName")
configuration["password1"] = config.get('pc1',"password")
configuration["busIdSpec1"] = config.get('pc1',"busIdSpec")
configuration["interfaceName1"] = config.get('pc1',"interfaceName")
configuration["hostName2"] = config.get('pc2',"hostName")
configuration["userName2"] = config.get('pc2',"userName")
configuration["password2"] = config.get('pc2',"password")
configuration["busIdSpec2"] = config.get('pc2',"busIdSpec")
configuration["interfaceName2"] = config.get('pc2',"interfaceName")
main(configuration)
except configparser.Error as e:
print("Error load configuration:",e.message)
\ No newline at end of file
# @file vuart.py
#
# @brief This class is to execute commands by virtual uart
#
# @author Manuel Castilla
# @date 23/04/2019
#
###########################################################################
from ssh import SshCmd,SshCmdException
## Class Vuart
#
class Vuart ():
def __init__(self,hostName,userName,password,busId):
"""
Construcctor
Args:
hostName: ip or hostname
userName: username
password : password
busId: bus ID PCIe
"""
self.hostName = hostName
self.userName = userName
self.password = password
self.busId = busId
self.command = "sudo spec-vuart -b " + str(busId) + " -c "
self.sshCmd = SshCmd()
def setIp(self,ip):
"""
Set IP
Args:
ip: IP to assing
Return:
Result set ip
Raises:
error function
"""
try:
command = "ip\ set\ " + str(ip)
return self.vUartCommand(command)
except:
print("Error set ip:",ip)
raise
def modeMaster(self):
"""
Configure the board in master mode
Return:
Result mode master
Raises:
error function
"""
try:
command = "mode\ master"
return self.vUartCommand(command)
except:
print("Error mode master")
raise
def modeSlave(self):
"""
Configure the board in slave mode
Return:
Result mode slave
Raises:
error function
"""
try:
command = "mode\ slave"
return self.vUartCommand(command)
except:
print("Error mode slave")
raise
def sfpDetect(self):
"""
Ask SFP value
Return:
SFP value
Raises:
error function
"""
try:
command = "sfp\ detect"
return self.vUartCommand(command)
except:
print("Error sfp detect")
raise
def ptpStart(self):
"""
Start PTP daemon
Return:
Result
Raises:
error function
"""
try:
command = "ptp\ start"
return self.vUartCommand(command)
except:
print("Error ptp start")
raise
def ptpStop(self):
"""
Stop PTP daemon
Return:
Result
Raises:
error function
"""
try:
command = "ptp\ stop"
return self.vUartCommand(command)
except:
print("Error ptp stop")
raise
def mode(self):
"""
Ask board mode
Return:
Board mode ("master" or "slave")
Raises:
error function
"""
try:
command = "mode"
return self.vUartCommand(command)
except:
print("Error mode")
raise
def isTrackPhase(self):
"""
Ask if the slave board is in TRACK_PHASE
Return:
True if TRACK_PHASE, False otherwise
Raises:
error function
"""
try:
ret = False
command = "stat\ 1"
value = self.vUartCommand(command)
if ("sv:1 ss:'TRACK_PHASE'" in value):
ret = True
return ret
except:
print("Error track phase")
raise
def vUartCommand(self,command):
"""
Execute a command
Args:
command: command to execute
Raises:
error function
"""
try:
uartCommand = self.command + command
ret = self.sshCmd.exec(self.hostName,uartCommand,self.userName,self.password)
if ("Unrecognized command" in ret):
print(ret)
raise
return ret
except SshCmdException as e:
self.sshCmd.printSshErrorCommand(e.strError) #print error command
cleanup()
raise
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment