Commit f261b396 authored by David Cussans's avatar David Cussans

Taking local copy of PyChips

parent 2f031cf3
K 25
svn:wc:ra_dav:version-url
V 32
/svn/!svn/ver/1570/trunk/PyChips
END
VersionHistory.txt
K 25
svn:wc:ra_dav:version-url
V 51
/svn/!svn/ver/1538/trunk/PyChips/VersionHistory.txt
END
K 10
svn:ignore
V 24
.pydevproject
.project
END
10
dir
1570
https://cactus.hepforge.org/svn/trunk/PyChips
https://cactus.hepforge.org/svn
2013-05-09T15:13:09.517614Z
1570
harderk
has-props
18a17b70-165d-4f71-a96d-00e1b61b3a60
test
dir
addressTables
dir
VersionHistory.txt
file
2013-07-29T17:57:11.000000Z
30972e3a69a3a9e90afbb557b906e79e
2013-01-25T13:19:49.053038Z
1538
frazier
has-props
1869
scripts
dir
src
dir
Version 1.5.X
-------------
Released XX December 2011.
Only supports the IPbus Protocol Version 2.0 and above!
* No longer supports IPbus Protocol v1.2/1.3. Please use the latest version
in the Version 1.4.X series if you are still using the older protocol.
New Features:
* Added support for IPbus v2.0 headers
Version 1.4.2
-------------
Released 19th December 2011.
Supports the IPbus Protocol Version 1.2/1.3
Minor changes / bug-fixes:
* Dummy hardware improvements
- A unified dummy hardware startup script that allows you to select the
various options (UDP/TCP mode, the port number, verbosity) via command-
line arguments.
- The TCP Dummy Hardware no-longer shuts down after the first TCP connection
closes - it now listens for a new connection whenever it's lacking an
active connection.
Version 1.4.1
-------------
Released 19th July 2011.
Supports the IPbus Protocol Version 1.2/1.3
Bug-fixes:
* Dummy hardware now correctly handles the RMWsum and getReservedAddrInfo
transactions.
Version 1.4.0
-------------
Released 23rd June 2011.
Supports the IPbus Protocol Version 1.2/1.3
New Features:
* Significant speed improvements
- Three times greater bandwidth on large block reads/writes
- Approx 30% faster on general/mixed small packet transactions
* Dummy Hardware improvements
- Automatic byte-reordering
- Handles malformed packets more elegantly than before
Bug fixes:
* In certain circumstances, a byte-order header was not automatically
being added to the beginning of the IPbus packet.
Version 1.3.0
-------------
Released 31st May 2011.
Supports the IPbus Protocol Version 1.2/1.3
Pre-version 1.3.0
-----------------
Older version history is not available...
\ No newline at end of file
K 25
svn:wc:ra_dav:version-url
V 45
/svn/!svn/ver/933/trunk/PyChips/addressTables
END
simpleTestAddrTable.txt
K 25
svn:wc:ra_dav:version-url
V 69
/svn/!svn/ver/231/trunk/PyChips/addressTables/simpleTestAddrTable.txt
END
soakTestAddrTable.txt
K 25
svn:wc:ra_dav:version-url
V 67
/svn/!svn/ver/933/trunk/PyChips/addressTables/soakTestAddrTable.txt
END
10
dir
1570
https://cactus.hepforge.org/svn/trunk/PyChips/addressTables
https://cactus.hepforge.org/svn
2011-12-22T16:31:27.899680Z
933
frazier
18a17b70-165d-4f71-a96d-00e1b61b3a60
simpleTestAddrTable.txt
file
2013-07-29T17:57:09.000000Z
67ed7fad27ea918cbff2b3c09f31989f
2011-03-10T19:07:38.397796Z
231
frazier
has-props
2124
soakTestAddrTable.txt
file
2013-07-29T17:57:09.000000Z
50540f679a643bcbc33c4447c9f31a14
2011-12-22T16:31:27.899680Z
933
frazier
has-props
564
*RegName RegAddr RegMask R W
*----------------------------------------------------------------------------------
SysId 0x00001000 0xffffffff 1 0
BoardId 0x00001004 0xffffffff 1 0
SlaveId 0x00001008 0xffffffff 1 0
FirmwareId 0x0000100C 0xffffffff 1 0
Test 0x00001010 0xffffffff 1 1
TestRam 0x08000000 0xffffffff 1 1 Should be at least 4K deep.
i2c_reset 0x00004000 0xffffffff 0 1
i2c_data 0x00004004 0xffffffff 0 1
i2c_trigger 0x00004008 0xffffffff 0 1
i2c_status 0x0000400c 0xffffffff 1 0
*----------------------------------------------------------------------------------
* The registers are for the purposes of unit-testing the code, and do not exist
* on the mini-T itself
UnitTest1 0xffff0000 0xffffffff 1 1
UnitTest1_mask1 0xffff0000 0xff000000 1 1
UnitTest1_mask2 0xffff0000 0x00ff0000 1 1
UnitTest1_mask3 0xffff0000 0x0000fffe 1 1
UnitTest1_mask4 0xffff0000 0x00000001 1 1
UnitTest2 0xffff0001 0xffffffff 1 1
UnitTest2_mask1 0xffff0001 0xff000000 1 1
UnitTest2_mask2 0xffff0001 0x00ff0000 1 1
UnitTest2_mask3 0xffff0001 0x0000fffe 1 1
UnitTest2_mask4 0xffff0001 0x00000001 1 1
UnitTest3 0xffff0002 0xffffffff 1 1
UnitTest4 0xffff0003 0xffffffff 1 1
UnitTest5 0xffff0004 0xffffffff 1 1
UnitTest6 0xffff0005 0xffffffff 1 1
UnitTest7 0xffff0006 0xffffffff 1 1
UnitTest8 0xffff0007 0xffffffff 1 1
UnitTest_ResetDummyHW 0xffffffff 0xffffffff 1 1 Reading from this address resets the dummy hardware.
*----------------------------------------------------------------------------------
\ No newline at end of file
*RegName RegAddr RegMask R W
*----------------------------------------------------------------------------------
Test_readOnly 0x00000000 0xffffffff 1 1 # Put write-flag to 1, in order to test read-only-ness!
Test 0x00000001 0xffffffff 1 1
TestRam 0x00000010 0xffffffff 1 1
TestRam_end 0x0000001f 0xffffffff 1 1
BigTestRam 0x00001000 0xffffffff 1 1
BigTestRam_end 0x00001fff 0xffffffff 1 1
K 25
svn:wc:ra_dav:version-url
V 39
/svn/!svn/ver/934/trunk/PyChips/scripts
END
example1.py
K 25
svn:wc:ra_dav:version-url
V 51
/svn/!svn/ver/499/trunk/PyChips/scripts/example1.py
END
soakTest.py
K 25
svn:wc:ra_dav:version-url
V 51
/svn/!svn/ver/934/trunk/PyChips/scripts/soakTest.py
END
bandwidthRxTest.py
K 25
svn:wc:ra_dav:version-url
V 58
/svn/!svn/ver/499/trunk/PyChips/scripts/bandwidthRxTest.py
END
configureJumboMac.py
K 25
svn:wc:ra_dav:version-url
V 60
/svn/!svn/ver/863/trunk/PyChips/scripts/configureJumboMac.py
END
10
dir
1570
https://cactus.hepforge.org/svn/trunk/PyChips/scripts
https://cactus.hepforge.org/svn
2011-12-22T16:42:29.517135Z
934
frazier
18a17b70-165d-4f71-a96d-00e1b61b3a60
example1.py
file
2013-07-29T17:57:10.000000Z
90ebe4b8a9f33d5d7e11431d91cdf4f1
2011-06-21T22:29:49.331383Z
499
frazier
has-props
1878
soakTest.py
file
2013-07-29T17:57:10.000000Z
10931df3fa75c627980d8aed235af372
2011-12-22T16:42:29.517135Z
934
frazier
has-props
6865
bandwidthRxTest.py
file
2013-07-29T17:57:10.000000Z
2155affb31538e0fffd2ca21850e2da9
2011-06-21T22:29:49.331383Z
499
frazier
has-props
1542
configureJumboMac.py
file
2013-07-29T17:57:10.000000Z
e4476a2f4ee79e1e131e74da9e4ed9d8
2011-11-01T16:40:33.255597Z
863
frazier
has-props
1223
from PyChipsUser import *
from datetime import datetime
from os import environ
import math
#chipsLog.setLevel(logging.DEBUG)
#######################
### TEST PARAMETERS ###
hostIP = "localhost"
hostPort = 50001
hostRamName = "BigTestRam"
testDepth = 350
testIterations = 10000
#######################
addrTable = AddressTable("../addressTables/davesFirmwareSoakTestAddrTable.txt")
testBoard = ChipsBusUdp(addrTable, hostIP, hostPort)
# Fill up the host's RAM with something, just so we can see its working correctly
# when in debug mode
writeBuf = []
for iVal in range(testDepth):
writeBuf.append(iVal)
testBoard.blockWrite(hostRamName, writeBuf)
print "Read-bandwidth test is running..."
# Start the clock
startTime = datetime.now()
# Run the test
for iRead in range(testIterations):
testBoard.blockRead(hostRamName, testDepth)
# Stop the clock
stopTime = datetime.now()
# Calculate the total IPbus payload actually transferred in kilobytes, excluding all headers, etc
totalPayloadKB = testIterations * testDepth / 256.
# Calculate the total transfer time in seconds
totalTime = stopTime-startTime
totalSeconds = (totalTime.days*86400) + totalTime.seconds + (totalTime.microseconds/1000000.)
dataRateKB_s = totalPayloadKB/totalSeconds
print "\nRead Bandwidth Results:"
print "-----------------------\n"
print "Total IPbus payload transferred = %.2f KB" % totalPayloadKB
print "Total time taken = %.2f s" % totalSeconds
print "Average read bandwidth = %.2f KB/s" % dataRateKB_s
# Configures the Ethernet MAC to use jumbo frames, when using Dave Newbold's default firmware.
#
# Robert Frazier, Oct 2011
from PyChipsUser import *
#chipsLog.setLevel(logging.DEBUG) # Uncomment for debug output!
# ******************************************************
# **** Set IP and port number of board in question ****
boardIpAddr = "192.168.200.32" # TODO: make this a command line parameter...
boardPortNum = 50001
# ******************************************************
if __name__ == '__main__':
addrTable = AddressTable("../addressTables/davesFirmwareSoakTestAddrTable.txt")
board = ChipsBusUdp(addrTable, boardIpAddr, boardPortNum)
board.write("MacHostBusPtr", 0x240)
result = board.read("MacHostBusReg")
print "Receiver register is set to:", hex(result)
print "Writing back previous value with bit 30 set high..."
board.write("MacHostBusReg", (result | 0x40000000))
print "...done."
board.write("MacHostBusPtr", 0x280)
result = board.read("MacHostBusReg")
print "Transmitter register is set to:", hex(result)
print "Writing back previous value with bit 30 set high..."
board.write("MacHostBusReg", (result | 0x40000000))
print "...done."
# Import the PyChips code - PYTHONPATH must be set to the PyChips installation src folder!
from PyChipsUser import *
##################################################################################################
### Uncomment one of the following two lines to turn on verbose or very-verbose debug modes. ###
### These debug modes allow you to see the packets being sent and received. ###
##################################################################################################
#chipsLog.setLevel(logging.DEBUG) # Verbose logging (see packets being sent and received)
# Read in an address table by creating an AddressTable object (Note the forward slashes, not backslashes!)
addrTable = AddressTable("../addressTables/davesFirmwareSoakTestAddrTable.txt")
# Create a ChipsBus bus to talk to your board.
# These require an address table object, an IP address and a port number
myBoard = ChipsBusUdp(addrTable, "localhost", 50001) # Change "localhost" to an IP address like "192.168.10.1", etc
# Perform some basic single-register reads and writes.
# Note that single-register reads and writes can be done on registers with a mask.
# See: help(ChipsBusUdp.read) and help(ChipsBusUdp.write) for more info on the below.
myBoard.write("Test", 0xdeadbeef)
print "'Test' register value is:", hex(myBoard.read("Test"))
myBoard.write("Test", 0xcafebabe)
print "'Test' register value is now:", hex(myBoard.read("Test"))
# Perform some block reads/writes:
# See: help(ChipsBusUdp.blockWrite) and help(ChipsBusUdp.blockRead) for more info
myBoard.blockWrite("BigTestRam", [0xdeadbeef, 0xcafebabe, 0x0ddba115, 0xbeefcafe])
blockReadResult = myBoard.blockRead("BigTestRam", 4) # 4 is the read depth.
print "\nBlock read result is:", uInt32HexListStr(blockReadResult)
# For further details on the basic API, please see: help(ChipsBusUdp)
#! /usr/bin/env python
#--------------------------------------------------------------
# Soak-tester for Dave Newbold's default/example IPbus firmware
#
# Robert Frazier, March 2011
#--------------------------------------------------------------
# Python imports
import sys
from optparse import OptionParser
from random import random
from time import asctime
import os
def getRandU32Value():
"""Returns a random unsigned 32-bit value"""
return int(random()*0xffffffff)
def getRandU32List(listSize):
"""Returns a list of length listSize of random U32 values"""
result = []
for i in range(listSize):
result.append(getRandU32Value())
return result
def screenAndLog(logFile, msg):
"""Prints a message to both the screen and to a logFile handle"""
print msg
logFile.write(msg + "\n")
logFile.flush()
os.fsync(logFile)
def findAddrTable():
"""A hacky function that attempts to find and return the soakTestAddrTable.txt file-path.
Returns False if it can't find it.
"""
for item in sys.path:
if item.find("PyChips") >= 0 or item.find("pychips") >= 0:
if item.find("src") >= 0:
file = item + "/../addressTables/soakTestAddrTable.txt"
if os.path.exists(file) and os.path.isfile(file):
return file
return False
if __name__ == '__main__':
# Option parser stuff
parser = OptionParser(usage = "%prog target_ip_address [options]\n\n"
"IPbus Soak Tester\n"
"-----------------")
parser.add_option("-i", "--iterations", action="store", type="int", dest="maxIterations", default=10000,
help="Number of test iterations (default = %default) to perform; zero or less indicates the test should run ~indefinitely.")
parser.add_option("-p", "--port", action="store", type="int", dest="port", default=50001,
help="Port number of the target device you wish to soak-test (default = %default)")
parser.add_option("-f", "--addrTableFile", action="store", type="string", dest="addrTableFile",
help="Manually specify the complete filepath to soakTestAddrTable.txt if it can't automatically be found")
parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False,
help="turn on verbose mode: prints outgoing/incoming packets to the console")
(options, args) = parser.parse_args()
# Find out if they have or haven't supplied an IP address
if len(args) != 1:
parser.error("\nYou must supply a target IP address when running this script!\n" \
"For example:\n\t./soakTest.py 192.168.200.16\n\t./soakTest.py localhost\n" \
"For more help and options, do:\n\t./soakTest.py -h")
ipAddr = args[0]
# Open up a log file.
logName = '/tmp/soakTestLog_' + ipAddr + '.txt'
log = open(logName, 'w')
screenAndLog(log, "\nSoak-Test Session Started at " + asctime() + \
"\n-----------------------------------------------------\n")
# Either get the address table filename from the command line option,
# or try to automatically find it
if options.addrTableFile != None:
addrTableFile = options.addrTableFile
else:
addrTableFile = findAddrTable()
if addrTableFile:
screenAndLog(log, "Using the following automatically located address-table file:\n" + addrTableFile + "\n")
else:
screenAndLog(log, "Failed to locate 'soakTestAddrTable.txt' within your PyChips installation.\n" \
"Please specify the complete path to this file using the -f option next\n" \
"time you run this soak test script.")
sys.exit()
# Now import PyChips
from PyChipsUser import *
# Set the logger up according to verbosity flag
if options.verbose: chipsLog.setLevel(logging.DEBUG)
else: chipsLog.setLevel(logging.INFO)
# Create the UDP IPbus to the target device being soak-tested
board = ChipsBusUdp(AddressTable(addrTableFile), ipAddr, options.port)
# If the user specified zero or less iterations, this means run ~indefinitely
if options.maxIterations < 1: options.maxIterations = 1000000000000
# Counters
testIterationCount = 0
writeReadErrors = 0
blockWriteReadErrors = 0
exceptionCount = 0
loop = True
print "\nLog output also at:", logName
print "Press ctrl-c to terminate test."
print "Running...\n"
while testIterationCount < options.maxIterations:
try:
testIterationCount += 1
if(testIterationCount % 1000 == 0):
screenAndLog(log, "Iteration " + repr(testIterationCount))
testVal = getRandU32Value()
board.write("Test", testVal)
readResult = board.read("Test")
if readResult != testVal:
screenAndLog(log, "Write/Read error occurred! Expected " + hex(testVal) + " but got " + hex(readResult))
writeReadErrors += 1
blockTestVal = getRandU32List(16)
board.blockWrite("BigTestRam", blockTestVal)
blockReadResult = board.blockRead("BigTestRam", 16)
if blockTestVal != blockReadResult:
screenAndLog(log, "Block Write/Read error occurred!" \
"\n\tExpected Received" \
"\n\t-------- --------" \
+ uInt32HexDualListStr(blockTestVal, blockReadResult))
blockWriteReadErrors += 1
if writeReadErrors >= 30 or blockWriteReadErrors >= 30:
screenAndLog(log, "\nToo many errors have occurred - ending test program!")
break
except KeyboardInterrupt:
screenAndLog(log, "\nKeyboard interrupt (ctrl-c) received - ending test program!")
break
except ChipsException, what:
screenAndLog(log, "An exception occurred at " + asctime() + ":\n\t" + str(what))
exceptionCount += 1
if exceptionCount >= 30:
screenAndLog(log, "\nToo many exceptions have occurred - ending test program!")
break
screenAndLog(log, "\n\n-----------------\nTest Summary:\n")
screenAndLog(log, "\tTotal Test Iterations = " + repr(testIterationCount))
screenAndLog(log, "\tTotal Write/Read Errors = " + repr(writeReadErrors))
screenAndLog(log, "\tTotal Block Write/Read Errors = " + repr(blockWriteReadErrors))
screenAndLog(log, "\tTotal ChipsBus Exceptions during test = " + repr(exceptionCount))
K 25
svn:wc:ra_dav:version-url
V 36
/svn/!svn/ver/1570/trunk/PyChips/src
END
ChipsLog.py
K 25
svn:wc:ra_dav:version-url
V 47
/svn/!svn/ver/472/trunk/PyChips/src/ChipsLog.py
END
CommonTools.py
K 25
svn:wc:ra_dav:version-url
V 51
/svn/!svn/ver/1546/trunk/PyChips/src/CommonTools.py
END
DummyHardware.py
K 25
svn:wc:ra_dav:version-url
V 53
/svn/!svn/ver/1537/trunk/PyChips/src/DummyHardware.py
END
IPbusHeader.py
K 25
svn:wc:ra_dav:version-url
V 51
/svn/!svn/ver/1537/trunk/PyChips/src/IPbusHeader.py
END
AddressTableItem.py
K 25
svn:wc:ra_dav:version-url
V 55
/svn/!svn/ver/472/trunk/PyChips/src/AddressTableItem.py
END
AddressTable.py
K 25
svn:wc:ra_dav:version-url
V 51
/svn/!svn/ver/506/trunk/PyChips/src/AddressTable.py
END
PyChipsUser.py
K 25
svn:wc:ra_dav:version-url
V 50
/svn/!svn/ver/924/trunk/PyChips/src/PyChipsUser.py
END
TransactionElement.py
K 25
svn:wc:ra_dav:version-url
V 58
/svn/!svn/ver/1251/trunk/PyChips/src/TransactionElement.py
END
ChipsBus.py
K 25
svn:wc:ra_dav:version-url
V 48
/svn/!svn/ver/1547/trunk/PyChips/src/ChipsBus.py
END
ChipsException.py
K 25
svn:wc:ra_dav:version-url
V 53
/svn/!svn/ver/472/trunk/PyChips/src/ChipsException.py
END
PyChipsUserDebug.py
K 25
svn:wc:ra_dav:version-url
V 55
/svn/!svn/ver/924/trunk/PyChips/src/PyChipsUserDebug.py
END
SerDes.py
K 25
svn:wc:ra_dav:version-url
V 46
/svn/!svn/ver/1570/trunk/PyChips/src/SerDes.py
END
Transaction.py
K 25
svn:wc:ra_dav:version-url
V 50
/svn/!svn/ver/956/trunk/PyChips/src/Transaction.py
END
10
dir
1570
https://cactus.hepforge.org/svn/trunk/PyChips/src
https://cactus.hepforge.org/svn
2013-05-09T15:13:09.517614Z
1570
harderk
18a17b70-165d-4f71-a96d-00e1b61b3a60
ChipsLog.py
file
2013-07-29T17:57:10.000000Z
ede91262d18eff2163e0aa38ee67090e
2011-06-01T08:34:46.344756Z
472
frazier
has-props
604
CommonTools.py
file
2013-07-29T17:57:10.000000Z
058956a3079ba9c4934fea5ff8c67fe1
2013-01-25T13:28:04.724988Z
1546
frazier
has-props
3277
DummyHardware.py
file
2013-07-29T17:57:10.000000Z
4056699a267b577487ae0052719e3403
2013-01-25T13:18:23.396423Z
1537
frazier
has-props
16649
IPbusHeader.py
file
2013-07-29T17:57:10.000000Z
8701b1435a9bd15d76ba2dac99efd49f
2013-01-25T13:18:23.396423Z
1537
frazier
has-props
7591
AddressTableItem.py
file
2013-07-29T17:57:11.000000Z
4bc1de12e50b911aebbc19ad77b35a0b
2011-06-01T08:34:46.344756Z
472
frazier
has-props
2325
AddressTable.py
file
2013-07-29T17:57:11.000000Z
7a27d2f02b0d2e7dc76041791f7279e7
2011-06-22T23:34:24.913862Z
506
frazier
has-props
5223
PyChipsUser.py
file
2013-07-29T17:57:11.000000Z
e567655aadba09930a7b82a6686b0bd9
2011-12-15T17:28:13.689110Z
924
frazier
has-props
339
TransactionElement.py
file
2013-07-29T17:57:11.000000Z
7ff8e1764f8c4126d2fe1f3961648d06
2012-10-08T13:03:50.396807Z
1251
frazier
has-props
2967
ChipsBus.py
file
2013-07-29T17:57:11.000000Z
92bc1ec27da979b5cb9383bee756bdc5
2013-01-25T13:33:45.505934Z
1547
frazier
has-props
29500
ChipsException.py
file
2013-07-29T17:57:11.000000Z
db7c943377f223a828ed9c8ee6913af9
2011-06-01T08:34:46.344756Z
472
frazier
has-props
274
PyChipsUserDebug.py
file
2013-07-29T17:57:11.000000Z
f7e0b28c31952de7f267c3304f5ff1dd
2011-12-15T17:28:13.689110Z
924
frazier
has-props
347
SerDes.py
file
2013-07-29T17:57:11.000000Z
b8f9681933e99d69f463405a1841bfe3
2013-05-09T15:13:09.517614Z
1570
harderk
has-props
4648
Transaction.py
file
2013-07-29T17:57:11.000000Z
4ec291995a370a9d31da3c263172dce0
2012-02-29T10:53:12.622662Z
956
frazier
has-props
6597
'''
@author: Robert Frazier
May 2010
'''
# Project imports
from AddressTableItem import AddressTableItem
from CommonTools import uInt32Compatible
from ChipsException import ChipsException
class AddressTable(object):
'''Address table class to hold address table items and read in address tables from file
The format is very simple. Each line should contain a register name,
a 32-bit (hex) register address, and a 32-bit (hex) register mask, in
that order. Any further words/values on that line are then ignored.
Empty lines or lines starting with a "*" character are also ignored.
Note that the IPbus protocol, and hence PyChips assumes 32-bit word
addressing, allowing up to 2^34 bytes to be addressed. I.e. each
address points to a 32-bit word.
An example is as follows:
* RegName RegAddr RegMask R W Description
*---------------------------------------------------------------------------------------------
someRegister 00000001 ffffffff 1 0 Any text you like here
anotherRegister_low 0000000f 0000ffff 1 1 Bottom 16 bits for one purpose
anotherRegister_high 0000000f ffff0000 1 1 Top 16 bits for another purpose.
*---------------------------------------------------------------------------------------------
'''
def __init__(self, addressTableFile):
object.__init__(self)
self.items = {}
self.fileName = addressTableFile
self._readAddrTable(addressTableFile)
def getItem(self, registerName):
"""Returns the AddressTableItem object that corresponds to a particular register name."""
if self.checkItem(registerName):
return self.items[registerName]
else:
raise ChipsException("Register '" + registerName + "' does not exist in the address table file '" + self.fileName + "'!")
def checkItem(self, registerName):
"""Returns True if registerName is present in the address table; returns False if it's not."""
if registerName in self.items:
return True
return False
def _readAddrTable(self, addressTableFile):
'''Read in an address table from the specified file'''
file = open(addressTableFile, 'r')
line = file.readline() # Get the first line
lineNum = 1
while len(line) != 0: # i.e. not the end of the file
words = line.split() # Split up the line into words by its whitespace
if len(words) != 0: # A blank line (or a line with just whitespace).
if line[0] != '*': # Not a commented line
if len(words) < 5:
raise ChipsException("Line " + str(lineNum) + " of file '" + addressTableFile +
"' does not conform to file format expectations!")
try:
regName = words[0]
regAddr = int(words[1], 16)
regMask = int(words[2], 16)
regRead = int(words[3])
regWrite= int(words[4])
except Exception, err:
raise ChipsException("Line " + str(lineNum) + " of file '" + addressTableFile +
"' does not conform to file format expectations! (Detail: " + str(err))
if regName in self.items:
raise ChipsException("Register '" + regName + "' is included in the file '"
+ addressTableFile + "' more than once!")
if not(uInt32Compatible(regAddr) and uInt32Compatible(regMask)):
raise ChipsException("Register address or mask on line " + str(lineNum) +
" of file '" + addressTableFile + "' is not " \
"a valid 32-bit unsigned number!")
if regMask == 0: raise ChipsException("Register mask on line " + str(lineNum) +
" of file '" + addressTableFile +
"' is zero! This is not allowed!")
if regRead != 0 and regRead != 1:raise ChipsException("Read flag on line " +
str(lineNum) + " of file '" +
addressTableFile + "' is not one or zero!")
if regWrite != 0 and regWrite != 1: raise ChipsException("Write flag on line " +
str(lineNum) + " of file '" +
addressTableFile + "' is not one or zero!")
item = AddressTableItem(regName, regAddr, regMask, regRead, regWrite)
self.items[regName] = item
line = file.readline() # Get the next line and start again.
lineNum += 1
'''
@author: Robert Frazier
May 2010
'''
from CommonTools import *
from ChipsException import ChipsException
class AddressTableItem(object):
'''Represents an address table item for A32/D32 address tables
Note: Addresses, data or masks beyond 32 bits in size will be truncated
without warning (to the least significant 32-bits of input). It
is up to the user of this class to ensure stupid values are not
entered.
'''
def __init__(self, name, address, mask, read = True, write = True):
object.__init__(self)
self._name = name
self._address = 0xffffffff & address
self._mask = 0xffffffff & mask
self._read = bool(read)
self._write= bool(write)
self._bitShiftForMaskedData = self._maskedDataBitShift()
def getName(self): return self._name
def getAddress(self): return self._address
def getMask(self): return self._mask
def getReadFlag(self): return self._read
def getWriteFlag(self): return self._write
def setName(self, name): self._name = name
def setAddress(self, address): self._address = 0xffffffff & address
def setMask(self, mask): self._mask = (0xffffffff & mask); self._bitShiftForMaskedData = self._maskedDataBitShift()
def setReadFlag(self, read): self._read = bool(read)
def setWriteFlag(self, write): self._write = bool(write)
def shiftDataToMask(self, data):
"""Shifts data upto the start of the register mask
Note: data over 32-bits in size will be truncated without warning.
"""
shiftedData = (data & 0xffffffff) << self._bitShiftForMaskedData
if (~self._mask & shiftedData) != 0:
raise ChipsException("The data (0x" + uInt32HexStr(data) + ") being written to register '" +
self._name + "' exceeds bounds of mask!")
return shiftedData
def shiftDataFromMask(self, data):
"""Shifts data down from the start register mask"""
return (data & self._mask) >> self._bitShiftForMaskedData
def _maskedDataBitShift(self):
shiftingMask = self._mask
bitShiftRequired = 0
while (shiftingMask & 0x1) == 0:
shiftingMask >>= 1
bitShiftRequired += 1
return bitShiftRequired
'''
Created on May 12, 2010
@author: Robert Frazier, Carl Jeske
'''
# System imports
import socket
# Project imports
import IPbusHeader
from TransactionElement import TransactionElement
from Transaction import Transaction
from CommonTools import uInt32HexStr, uInt32Compatible, uInt32BitFlip
from ChipsLog import chipsLog
from ChipsException import ChipsException
class ChipsBusBase(object):
"""Common Hardware Interface Protocol System Bus (CHIPS-Bus) abstract base-class
Allows you to communicate with and control devices running Jeremy Mans's, et al, IP-based
uTCA control system firmware. This base class represents the part of the ChipsBus code
that is protocol-agnostic. Protocol-specific concrete classes, using either UDP or TCP,
derive from this.
The bus assumes 32-bit word addressing, so in a 32-bit address space up to 2^34 bytes in
total can be addressed.
"""
IPBUS_PROTOCOL_VER = 2 # I.e. IPbus Protocol v1.4
SOCKET_BUFFER_SIZE = 32768 # Max UDP/TCP socket buffer size in bytes for receiving packets.
MAX_TRANSACTION_ID = 4095 # The maximum value the transaction ID field can go up to.
# Max depth of a block read or write before bridging the read/write over multiple requests.
# Note that for UDP the max IPBus packet size cannot exceed 368 32-bit words (1472 bytes), or
# it'll fail due to reaching the max Ethernet packet payload size (without using Jumbo Frames).
# If you are Jumbo-Frames capable, then this number should not exceed 2000. Note that the
# jumbo-frames firmware uses a 8192-byte buffer, so we can't make use of the full 9000 byte
# Jumbo Frame anyway.
MAX_BLOCK_TRANSFER_DEPTH = 255 # Temporary hack to get IPbus v2.0 compatible code working
# The max size of the request queue (note: current API excludes ability to queue block transfer requests)
MAX_QUEUED_REQUESTS = 80
def __init__(self, addrTable, hostIp, hostPort, localPort = None):
"""ChipsBus abstract base-class constructor
addrTable: An instance of AddressTable for the device you wish to communicate with.
hostIP: The IP address of the device you want to control, e.g. the string '192.168.1.100'.
hostPort: The network port number of the device you want to control.
localPort: If you wish to bind the socket to a particular local port, then specify the
the local port number here. The default (None) means that the socket will not bind
to any specific local port - an available port be found when it comes to sending any
packets.
"""
object.__init__(self)
self._transactionId = 1
self.addrTable = addrTable
self._hostAddr = (hostIp, hostPort)
self._queuedRequests = [] # Request queue
self._queuedAddrTableItems = [] # The corresponding address table item for each request in the request queue
self._queuedIsARead = [] # This holds a True if the corresponding request in _queuedRequests is a read, or a False if it's a write.
def queueRead(self, name, addrOffset = 0):
"""Create a read transaction element and add it to the transaction queue.
This works in the same way as a normal read(), except that many can be queued
into a packet and dispatched all at once rather than individually. Run the
queued transactions with queueRun().
Only single-register reads/writes can be queued. Block reads/writes, etc, cannot
be queued.
"""
if len(self._queuedRequests) < ChipsBus.MAX_QUEUED_REQUESTS:
chipsLog.debug("Read queued: register '" + name + "' with addrOffset = 0x" + uInt32HexStr(addrOffset))
addrTableItem = self.addrTable.getItem(name) # Get the details of the relevant item from the addr table.
if not addrTableItem.getReadFlag():
raise ChipsException("Read transaction creation error: read is not allowed on register '" + addrTableItem.getName() + "'.")
self._queuedRequests.append(self._createReadTransactionElement(addrTableItem, 1, addrOffset))
self._queuedAddrTableItems.append(addrTableItem)
self._queuedIsARead.append(True)
else:
chipsLog.warning("Warning: transaction not added to queue as transaction queue has reached its maximum length!\n" +
"\tPlease either run or clear the transaction queue before continuing.\n")
def queueWrite(self, name, dataU32, addrOffset = 0):
"""Create a register write (RMW-bits) transaction element and add it to the transaction queue.
This works in the same way as a normal write(), except that many can be queued
into a packet and dispatched all at once rather than individually. Run the
queued transactions with queueRun().
Only single-register reads/writes can be queued. Block reads/writes, etc, cannot
be queued.
"""
if len(self._queuedRequests) < ChipsBus.MAX_QUEUED_REQUESTS:
dataU32 = dataU32 & 0xffffffff # Ignore oversize input.
chipsLog.debug("Write queued: dataU32 = 0x" + uInt32HexStr(dataU32) + " to register '"
+ name + "' with addrOffset = 0x" + uInt32HexStr(addrOffset))
addrTableItem = self.addrTable.getItem(name) # Get the details of the relevant item from the addr table.
if not addrTableItem.getWriteFlag():
raise ChipsException("Write transaction creation error: write is not allowed on register '" + addrTableItem.getName() + "'.")
self._queuedRequests.append(self._createRMWBitsTransactionElement(addrTableItem, dataU32, addrOffset))
self._queuedAddrTableItems.append(addrTableItem)
self._queuedIsARead.append(False)
else:
chipsLog.warning("Warning: transaction not added to queue as transaction queue has reached its maximum length!\n" +
"\tPlease either run or clear the transaction queue before continuing.\n")
def queueRun(self):
"""Runs the current queue of single register read or write transactions and returns two lists. The
first contains the values read and the second contains the values written.
Note: Only single-register reads/writes can be queued. Block reads/writes, etc, cannot
be queued.
"""
chipsLog.debug("Running all queued transactions")
requestQueueLength = len(self._queuedRequests)
readResponse = []
writeResponse = []
try:
transaction = self._makeAndRunTransaction(self._queuedRequests)
except ChipsException, err:
self.queueClear()
raise ChipsException("Error while running queued transactions:\n\t" + str(err))
for i in range(requestQueueLength):
addrTableItem = self._queuedAddrTableItems[i]
transactionResponse = transaction.responses[i - requestQueueLength].getBody()[0] & 0xffffffff
transactionResponse = addrTableItem.shiftDataFromMask(transactionResponse)
if self._queuedIsARead[i]:
readResponse.append(transactionResponse)
chipsLog.debug("Read success! Register '" + addrTableItem.getName() + "' returned: 0x" + uInt32HexStr(transactionResponse))
else:
writeResponse.append(transactionResponse)
chipsLog.debug("Write success! Register '" + addrTableItem.getName() + "' assigned: 0x" + uInt32HexStr(transactionResponse))
self.queueClear()
response = [readResponse, writeResponse]
return response
def queueClear(self):
"""Clears the current queue of transactions"""
chipsLog.debug("Clearing transaction queue")
self._queuedRequests = []
self._queuedAddrTableItems = []
self._queuedIsARead =[]
def read(self, name, addrOffset=0):
"""Read from a single masked/unmasked 32-bit register. The result is returned from the function.
This read transaction runs straight away - i.e it's not queued at all.
Warning: using this method clears any previously queued transactions
that have not yet been run!
name: the register name of the register you want to read from.
addrOffset: optional - provide a 32-bit word offset if you wish.
Notes: Use the addrOffset at your own risk! No checking is done to
see if offsets are remotely sensible!
"""
if len(self._queuedRequests):
chipsLog.warning("Warning: Individual read requested, clearing previously queued transactions!\n")
self.queueClear()
self.queueRead(name, addrOffset)
result = self.queueRun()
return result[0][0]
def write(self, name, dataU32, addrOffset=0):
"""Write to a single register (masked, or otherwise).
This write transaction runs straight away - i.e it's not queued at all.
Warning: using this method clears any previously queued transactions
that have not yet been run!
name: the register name of the register you want to read from.
dataU32: the 32-bit value you want writing
addrOffset: optional - provide a 32-bit word offset if you wish.
Notes:
Use the addrOffset at your own risk! No checking is done to
see if offsets are remotely sensible!
Under the hood, this is implemented as an RMW-bits transaction.
"""
if len(self._queuedRequests):
chipsLog.warning("Warning: Individual write requested, clearing previously queued transactions!\n")
self.queueClear()
dataU32 = dataU32 & 0xffffffff # Ignore oversize input.
self.queueWrite(name, dataU32, addrOffset)
self.queueRun()
def blockRead(self, name, depth=1, addrOffset=0):
"""Block read (not for masked registers!). Returns a list of the read results (32-bit numbers).
The blockRead() transaction runs straight away - it cannot be queued.
name: the register name of the register you want to read from.
depth: the number of 32-bit reads deep you want to go from the start address.
(i.e. depth=3 will return a list with three 32-bit values).
addrOffset: optional - provide a 32-bit word offset if you wish.
Notes: Use the depth and addrOffset at your own risk! No checking is done to
see if these values are remotely sensible!
"""
chipsLog.debug("Block read requested: register '" + name + "' with addrOffset = 0x"
+ uInt32HexStr(addrOffset) + " and depth = " + str(depth))
return self._blockOrFifoRead(name, depth, addrOffset, False)
def fifoRead(self, name, depth=1, addrOffset=0):
"""Non-incrementing block read (not for masked registers!). Returns list of the read results.
Reads from the same address the number of times specified by depth
The fifoRead() transaction runs straight away - it cannot be queued.
name: the register name of the register you want to read from.
depth: the number of 32-bit reads you want to perform on the FIFO
(i.e. depth=3 will return a list with three 32-bit values).
addrOffset: optional - provide a 32-bit word offset if you wish.
Notes: Use the depth and addrOffset at your own risk! No checking is done to
see if these values are remotely sensible!
"""
chipsLog.debug("FIFO read (non-incrementing block read) requested: register '" + name + "' with addrOffset = 0x"
+ uInt32HexStr(addrOffset) + " and depth = " + str(depth))
return self._blockOrFifoRead(name, depth, addrOffset, True)
def blockWrite(self, name, dataList, addrOffset=0):
"""Block write (not for masked registers!).
The blockWrite() transaction runs straight away - it cannot be queued.
name: the register name of the register you want to read from.
dataList: the list of 32-bit values you want writing. The size of the list
determines how deep the block write goes.
addrOffset: optional - provide a 32-bit word offset if you wish.
Notes: Use this at your own risk! No checking is currently done to see if
you will be stomping on any other registers if the dataList or addrOffset
is inappropriate in size!
"""
chipsLog.debug("Block write requested: register '" + name + "' with addrOffset = 0x"
+ uInt32HexStr(addrOffset) + " and depth = " + str(len(dataList)))
return self._blockOrFifoWrite(name, dataList, addrOffset, False)
def fifoWrite(self, name, dataList, addrOffset=0):
"""Non-incrementing block write (not for masked registers!).
Writes all the values held in the dataList to the same register.
The fifoWrite() transaction runs straight away - it cannot be queued.
name: the register name of the register you want to read from.
dataList: the list of 32-bit values you want writing. The size of the list
determines how many writes will be performed on the FIFO.
addrOffset: optional - provide a 32-bit word offset if you wish.
Notes: Use this at your own risk! No checking is currently done to see if
you will be stomping on any other registers if the dataList or addrOffset
is inappropriate in size!
"""
chipsLog.debug("FIFO write (non-incrementing block write) requested: register '" + name + "' with addrOffset = 0x"
+ uInt32HexStr(addrOffset) + " and depth = " + str(len(dataList)))
return self._blockOrFifoWrite(name, dataList, addrOffset, True)
def _getTransactionId(self):
"""Returns the current value of the transaction ID counter and increments.
Note: Transaction ID = 0 will be reserved for byte-order transactions, which
are common and rather uninteresting. For any other kind of transaction, this
can be used to get access to an incrementing counter, that will go from 1->2047
before looping back around to 1.
"""
currentValue = self._transactionId
if self._transactionId < ChipsBus.MAX_TRANSACTION_ID:
self._transactionId += 1
else:
self._transactionId = 1
return currentValue
def _createRMWBitsTransactionElement(self, addrTableItem, dataU32, addrOffset = 0):
"""Returns a Read/Modify/Write Bits Request transaction element (i.e. masked write)
addrTableItem: The relevant address table item you want to perform the RMWBits transaction on.
dataU32: The data (32 bits max, or equal in width to the bit-mask).
addrOffset: The offset on the address specified within the address table item, default is 0.
"""
if not uInt32Compatible(dataU32):
raise ChipsException("Read-Modify-Write Bits transaction creation error: cannot create a RMW-bits " \
"transaction with data values (" + hex(dataU32) +") that are not valid 32-bit " \
"unsigned integers!")
rmwHeader = IPbusHeader.makeHeader(ChipsBus.IPBUS_PROTOCOL_VER, self._getTransactionId(), 1, IPbusHeader.TYPE_ID_RMW_BITS, IPbusHeader.INFO_CODE_REQUEST)
rmwBody = [addrTableItem.getAddress() + addrOffset, \
uInt32BitFlip(addrTableItem.getMask()), \
addrTableItem.shiftDataToMask(dataU32)]
return TransactionElement.makeFromHeaderAndBody(rmwHeader, rmwBody)
def _createWriteTransactionElement(self, addrTableItem, dataList, addrOffset = 0, isFifo = False):
"""Returns a Write Request transaction element (i.e. unmasked/block write)
addrTableItem: The relevant address table item you want to perform the write transaction on.
dataList: The list of 32-bit numbers you want to write (the list size defines the write depth)
addrOffset: The offset on the address specified within the address table item, default is 0.
isFifo: False gives a normal write transaction; True gives a non-incrementing write transaction (i.e. same addr many times).
"""
for value in dataList:
if not uInt32Compatible(value):
raise ChipsException("Write transaction creation error: cannot create a write transaction with data " \
"values (" + hex(value) +") that are not valid 32-bit unsigned integers!")
typeId = IPbusHeader.TYPE_ID_WRITE
if isFifo: typeId = IPbusHeader.TYPE_ID_NON_INCR_WRITE
writeHeader = IPbusHeader.makeHeader(ChipsBusBase.IPBUS_PROTOCOL_VER, self._getTransactionId(), len(dataList), typeId, IPbusHeader.INFO_CODE_REQUEST)
writeBody = [addrTableItem.getAddress() + addrOffset] + dataList
return TransactionElement.makeFromHeaderAndBody(writeHeader, writeBody)
def _createReadTransactionElement(self, addrTableItem, readDepth = 1, addrOffset = 0, isFifo = False):
"""Returns a Read Request transaction element
addrTableItem: The relevant address table item you want to perform the write transaction on.
readDepth: The depth of the read; default is 1, which would be a single 32-bit register read.
addrOffset: The offset on the address specified within the address table item, default is 0.
isFifo: False gives a normal read transaction; True gives a non-incrementing read transaction (i.e. same addr many times).
"""
typeId = IPbusHeader.TYPE_ID_READ
if isFifo: typeId = IPbusHeader.TYPE_ID_NON_INCR_READ
readHeader = IPbusHeader.makeHeader(ChipsBusBase.IPBUS_PROTOCOL_VER, self._getTransactionId(), readDepth, typeId, IPbusHeader.INFO_CODE_REQUEST)
readBody = [addrTableItem.getAddress() + addrOffset]
return TransactionElement.makeFromHeaderAndBody(readHeader, readBody)
def _makeAndRunTransaction(self, requestsList):
"""Constructs, runs and then returns a completed transaction from the given requestsList
requestsList: a list of TransactionElements (i.e. requests from client to the hardware).
Notes: _makeAndRunTransaction will automatically prepend one byte-order transaction.
"""
# Construct the transaction and serialise it - we prepend four byte-order transactions in
# order to ensure we meet minimum Ethernet payload requirements, else funny stuff happens.
transaction = Transaction.constructClientTransaction(requestsList, self._hostAddr)
transaction.serialiseRequests()
chipsLog.debug("Sending packet now.");
try:
# Send the transaction
self._socketSend(transaction)
except socket.error, socketError:
raise ChipsException("A socket error occurred whilst sending the IPbus transaction request packet:\n\t" + str(socketError))
try:
# Get response
transaction.serialResponses = self._socket.recv(ChipsBus.SOCKET_BUFFER_SIZE)
except socket.error, socketError:
raise ChipsException("A socket error occurred whilst getting the IPbus transaction response packet:\n\t" + str(socketError))
chipsLog.debug("Received response packet.");
transaction.deserialiseResponses()
transaction.doTransactionChecks() # Generic transaction checks
self._transactionId = 1 # TEMPORARY IPBUS V2.x HACK! Reset the transaction ID to 1 for each packet.
return transaction
def _initSocketCommon(self, localPort):
"""Performs common socket initialisation (i.e. common to UDP + TCP)"""
if localPort != None:
localAddr = ("", localPort)
self._socket.bind(localAddr)
self._socket.settimeout(1)
def _blockOrFifoRead(self, name, depth, addrOffset, isFifo = False):
"""Common code for either a block read or a FIFO read."""
if depth <= 0:
chipsLog.warn("Ignoring read with depth = 0 from register '" + name + "'!")
return
if depth > ChipsBus.MAX_BLOCK_TRANSFER_DEPTH:
return self._oversizeBlockOrFifoRead(name, depth, addrOffset, isFifo)
addrTableItem = self.addrTable.getItem(name) # Get the details of the relevant item from the addr table.
if addrTableItem.getMask() != 0xffffffff:
raise ChipsException("Block/FIFO read error: cannot perform block or FIFO read on a masked register address!")
try:
if not addrTableItem.getReadFlag(): raise ChipsException("Read transaction creation error: read is not allowed on register '" + addrTableItem.getName() + "'.")
# create and run the transaction and get the response
transaction = self._makeAndRunTransaction( [self._createReadTransactionElement(addrTableItem, depth, addrOffset, isFifo)] )
except ChipsException, err:
raise ChipsException("Block/FIFO read error on register '" + name + "':\n\t" + str(err))
blockReadResponse = transaction.responses[-1] # Block read response will be last in list
chipsLog.debug("Block/FIFO read success! Register '" + name + "' (addrOffset=0x"
+ uInt32HexStr(addrOffset) + ") was read successfully." )
return blockReadResponse.getBody().tolist()
def _oversizeBlockOrFifoRead(self, name, depth, addrOffset, isFifo):
"""Handles a block or FIFO read that's too big to be handled by a single UDP packet"""
chipsLog.debug("Read depth too large for single packet... will automatically split read over many packets")
remainingTransactions = depth
result =[]
offsetMultiplier = 1
if isFifo: offsetMultiplier = 0
while remainingTransactions > ChipsBus.MAX_BLOCK_TRANSFER_DEPTH:
result.extend(self._blockOrFifoRead(name, ChipsBus.MAX_BLOCK_TRANSFER_DEPTH, addrOffset + ((depth - remainingTransactions) * offsetMultiplier), isFifo))
remainingTransactions -= ChipsBus.MAX_BLOCK_TRANSFER_DEPTH
result.extend(self._blockOrFifoRead(name, remainingTransactions, addrOffset + ((depth - remainingTransactions) * offsetMultiplier), isFifo))
return result
def _blockOrFifoWrite(self, name, dataList, addrOffset, isFifo = False):
"""Common code for either a block write or a FIFO write."""
depth = len(dataList)
addrTableItem = self.addrTable.getItem(name) # Get the details of the relevant item from the addr table.
if addrTableItem.getMask() != 0xffffffff:
raise ChipsException("Block/FIFO write error: cannot perform block or FIFO write on a masked register address!")
if depth == 0:
chipsLog.warn("Ignoring block/FIFO write to register '" + name + "': dataList is empty!");
return
elif depth > ChipsBus.MAX_BLOCK_TRANSFER_DEPTH:
return self._oversizeBlockOrFifoWrite(name, dataList, addrOffset, isFifo)
try:
if not addrTableItem.getWriteFlag(): raise ChipsException("Write transaction creation error: write is not allowed on register '" + addrTableItem.getName() + "'.")
# create and run the transaction and get the response
self._makeAndRunTransaction( [self._createWriteTransactionElement(addrTableItem, dataList, addrOffset, isFifo)] )
except ChipsException, err:
raise ChipsException("Block/FIFO write error on register '" + name + "':\n\t" + str(err))
chipsLog.debug("Block/FIFO write success! " + str(depth) + " 32-bit words were written to '"
+ name + "' (addrOffset=0x" + uInt32HexStr(addrOffset) + ")")
def _oversizeBlockOrFifoWrite(self, name, dataList, addrOffset, isFifo):
"""Handling for a block write which is too big for the hardware to handle in one go"""
chipsLog.debug("Write depth too large for single packet... will automatically split write over many packets")
depth = len(dataList)
remainingTransactions = depth
offsetMultiplier = 1
if isFifo: offsetMultiplier = 0
while remainingTransactions > ChipsBus.MAX_BLOCK_TRANSFER_DEPTH:
self._blockOrFifoWrite(name, dataList[(depth - remainingTransactions):(depth - remainingTransactions) + ChipsBus.MAX_BLOCK_TRANSFER_DEPTH],
addrOffset + ((depth - remainingTransactions) * offsetMultiplier), isFifo)
remainingTransactions -= ChipsBus.MAX_BLOCK_TRANSFER_DEPTH
self._blockOrFifoWrite(name, dataList[(depth - remainingTransactions):], addrOffset + ((depth - remainingTransactions) * offsetMultiplier), isFifo)
def _socketSend(self, transaction):
raise NotImplementedError("ChipsBusBase is an Abstract Base Class!\n" \
"Please use a concrete implementation such as ChipsBusUdp or ChipsBusTcp!")
class ChipsBusUdp(ChipsBusBase):
"""Common Hardware Interface Protocol System Bus (CHIPS-Bus) using UDP packets for bus data.
Allows you to communicate with and control devices running Jeremy Mans's, et al, IP-based
uTCA control system firmware. This concrete class uses UDP packets for sending and
receiving the bus data.
"""
def __init__(self, addrTable, hostIp, hostPort, localPort = None):
"""Constructor for ChipsBus over UDP
addrTable: An instance of AddressTable for the device you wish to communicate with.
hostIP: The IP address of the device you want to control, e.g. the string '192.168.1.100'.
hostPort: The network port number of the device you want to control.
localPort: If you wish to bind the socket to a particular local port, then specify the
the local port number here. The default (None) means that the socket will not bind
to any specific local port - an available port be found when it comes to sending any
packets.
"""
ChipsBusBase.__init__(self, addrTable, hostIp, hostPort, localPort)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
self._initSocketCommon(localPort)
def _socketSend(self, transaction):
"""Send a transaction (via UDP)"""
self._socket.sendto(transaction.serialRequests, transaction.addr) # UDP-specific
class ChipsBusTcp(ChipsBusBase):
"""Common Hardware Interface Protocol System Bus (CHIPS-Bus) using TCP packets for bus data.
Allows you to communicate with and control devices running Jeremy Mans's, et al, IP-based
uTCA control system firmware. This concrete class uses TCP packets for sending and
receiving the bus data.
"""
def __init__(self, addrTable, hostIp, hostPort, localPort = None):
"""ChipsBus over TCP
addrTable: An instance of AddressTable for the device you wish to communicate with.
hostIP: The IP address of the device you want to control, e.g. the string '192.168.1.100'.
hostPort: The network port number of the device you want to control.
localPort: If you wish to bind the socket to a particular local port, then specify the
the local port number here. The default (None) means that the socket will not bind
to any specific local port - an available port be found when it comes to sending any
packets.
"""
ChipsBusBase.__init__(self, addrTable, hostIp, hostPort, localPort)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP
self._initSocketCommon(localPort)
self._socket.connect((hostIp, hostPort)) # TCP-specific
def _socketSend(self, transaction):
"""Send a transaction (via TCP)"""
self._socket.send(transaction.serialRequests) # TCP-specific
class ChipsBus(ChipsBusUdp):
"""Deprecated! Essentially now just an alias for ChipsBusUdp. Please update
your code replacing usage of ChipsBus with ChipsBusUdp."""
def __init__(self, addrTable, hostIp, hostPort, localPort = None):
ChipsBusUdp.__init__(self, addrTable, hostIp, hostPort, localPort)
chipsLog.warning("Please note: this class has been deprecated - use ChipsBusUdp"\
" in the future if you want the same functionality.")
'''
Exceptions specific to the PyChips package.
@author: Robert Frazier
March 2011
'''
class ChipsException(Exception):
"""Base exception for all PyChips exceptions"""
def __init__(self, value):
self.value = value
def __str__(self):
return str(self.value)
\ No newline at end of file
'''
Common Hardware Interface Protocol System (CHIPS) logger
Just a simple logger.
Created on May 12, 2010
@author: Robert Frazier
'''
import logging
import sys
chipsLog = logging.getLogger("CHIPSLog")
logHandler = logging.StreamHandler(sys.stdout) # Use this to log to screen
#logHandler = logging.FileHandler("ChipsBus.log") # Use this to log to file
logFormatter = logging.Formatter("%(message)s")
logHandler.setFormatter(logFormatter)
chipsLog.addHandler(logHandler)
chipsLog.setLevel(logging.WARNING) # Change this log threshold to one of the following: DEBUG INFO WARNING ERROR CRITICAL
'''
Tools common to the entire project
Various tools, including tools to deal with the general annoyance of Python when you really
want to work with a well-defined 32-bit unsigned integer type.
Created on May 12, 2010
@author: Robert Frazier
'''
from ChipsException import ChipsException
def pyChipsVersion():
"""Returns the version number of the PyChips release as a string e.g.: 'PyChips vX.Y.Z'"""
return "PyChips v1.5.0_pre2"
def uInt32Compatible(integer):
"""Returns true if the integer provided is not negative and can be express within 32-bits."""
return integer < 0x100000000 and integer >= 0
def uInt32HexStr(uInt32, checkValidUInt32 = False):
"""Return a string of a 32-bit unsigned integer without any minus signs, or other stupidity.
I can't believe I actually had to to write this function. Python always manages to
fuckup displaying unsigned integers somehow or other.
"""
if checkValidUInt32:
if not uInt32Compatible(uInt32):
raise ChipsException("The value: " + hex(uInt32) + " is not a valid 32-bit unsigned integer!")
bigEnd = (uInt32 >> 16) & 0xffff
littleEnd = uInt32 & 0xffff
result = "%(#1)04x%(#2)04x" % { "#1" : bigEnd, "#2" : littleEnd }
return result
def uInt32BitFlip(uInt32, checkValidUInt32 = False):
"""Flip the bits of a 32-bit unsigned number, avoiding negative numbers or other stupidity.
I can't believe I had to write this function either. Using the ~ operator (which is the
bitwise complement operator) sometimes results in a negative number, which then breaks
the socket.htonl() or socket.ntohl() fuctions
"""
if checkValidUInt32:
if not uInt32Compatible(uInt32):
raise ChipsException("The value: " + hex(uInt32) + " is not a valid 32-bit unsigned integer!")
if uInt32 >= 0: return 0xffffffff ^ (0xffffffff & uInt32)
return 0xffffffff & ~uInt32
def uInt32HexListStr(uInt32List):
"""Converts a list of 32-bit unsigned numbers into a pretty string - basically a column of constant width 32-bit hex."""
outputStr = ""
for value in uInt32List:
outputStr += "\n\t" + uInt32HexStr(value)
outputStr += "\n"
return outputStr
def uInt32HexDualListStr(uInt32List1, uInt32List2):
"""Returns a pretty string of two 32-bit unsigned hex lists in columns side by side, for easy comparison
uInt32List1 and uInt32List2: Lists of equal length containing 32-bit unsigned values.
Any differences are marked with a != sign between the two columns.
"""
minListLength = len(uInt32List1)
if len(uInt32List2) < minListLength: minListLength = len(uInt32List2)
outputStr = ""
for i in range(minListLength):
val1 = uInt32List1[i]
val2 = uInt32List2[i]
middle = " "
if val1 != val2: middle = " != "
outputStr += "\n\t" + uInt32HexStr(val1) + middle + uInt32HexStr(val2)
outputStr += "\n"
return outputStr
def uInt8HexStr(uInt8):
"""Return a string of an 8-bit unsigned integer without any minus signs, or other possible stupidity."""
croppedTo8bit = uInt8 & 0xff # ensure the value is only 8 bit.
result = "%(#1)02x" % { "#1" : croppedTo8bit }
return result
\ No newline at end of file
'''
Created on May 12, 2010
@author: Robert Frazier
'''
# System imports
import socket
from Queue import Queue
from threading import Thread
from time import sleep
# Project imports
from ChipsLog import chipsLog
from Transaction import Transaction
from TransactionElement import TransactionElement
import IPbusHeader
from CommonTools import uInt32HexStr
from ChipsException import ChipsException
class DummyHardwareBase(Thread):
"""Represents a dummy hardware device using the IPbus control system.
This class can receive, act on, and respond to IPbus packets. It will
keep track of registers that are written to, but its initial register
state is zero throughout the entire address space.
It operates with two threads. The main program thread receives incoming
packets and queues them for action and response by a transaction handler
thread.
Note: Requesting a read on register address 0xffffffff will be interpreted
as a request to reset the internal state of the hardware, so all registers
will be set to zero.
"""
SOCKET_BUFFER_SIZE = 32768
REQUEST_QUEUE_SIZE = 1024
def __init__(self):
Thread.__init__(self)
self._registers = {}
self._transaction_queue = Queue(DummyHardware.REQUEST_QUEUE_SIZE)
self._transactionCounter = 0
self.stopServing = False
# This maps request type number to the function that handles the request.
self._requestTypeHandlerMap = {IPbusHeader.TYPE_ID_READ : self._handleReadRequest,
IPbusHeader.TYPE_ID_WRITE : self._handleWriteRequest,
IPbusHeader.TYPE_ID_NON_INCR_READ : self._handleFifoReadRequest,
IPbusHeader.TYPE_ID_NON_INCR_WRITE : self._handleFifoWriteRequest,
IPbusHeader.TYPE_ID_RMW_BITS : self._handleReadModifyWriteBitsRequest,
IPbusHeader.TYPE_ID_RMW_SUM : self._handleReadModifyWriteSumRequest,
IPbusHeader.TYPE_ID_RSVD_ADDR_INFO : self._handleGetReservedAddrInfoRequest}
def serveForever(self):
raise NotImplementedError("DummyHardwareBase is an Abstract Base Class!\n" \
"Please use a concrete implementation such as "\
"DummyHardwareUdp or DummyHardwareTcp!")
def run(self):
"""Start the transaction handler thread"""
chipsLog.info("Transaction handler thread started")
while not self.stopServing:
if self._transaction_queue.empty():
sleep(0.001)
else:
transaction = self._transaction_queue.get()
self._actAndRespond(transaction)
chipsLog.info("Transaction handler thread stopping")
def closeSockets(self):
"""Allows you to manually close any sockets that may have been opened."""
try:
self._socket.close()
chipsLog.debug("Socket closed successfully.")
except:
chipsLog.warn("Error closing socket!")
def _actAndRespond(self, transaction):
"""Performs the required action and returns the response for a given transaction"""
self._transactionCounter += 1
chipsLog.debug("*** Performing transaction #" + str(self._transactionCounter) + " ***")
try:
transaction.deserialiseRequests()
for request in transaction.requests:
transaction.appendResponse(self._requestTypeHandlerMap[IPbusHeader.getTypeId(request.getHeader())](request))
transaction.serialiseResponses()
chipsLog.debug("Sending response packet")
self._socketSend(transaction)
chipsLog.debug("Response packet sent!")
chipsLog.debug("*** Transaction #" + str(self._transactionCounter) + " completed! ***\n")
except ChipsException, err:
chipsLog.error("ERROR! Transaction #" + str(self._transactionCounter) +
" could not be successfully processed:\n\t" + str(err))
def _socketSend(self, transaction):
raise NotImplementedError("DummyHardwareBase is an Abstract Base Class!\n" \
"Please use a concrete implementation such as "\
"DummyHardwareUdp or DummyHardwareTcp!")
def _handleReadRequest(self, request):
requestHeader = request.getHeader()
words = IPbusHeader.getWords(requestHeader)
baseAddr = request.getBody()[0]
chipsLog.debug("Read requested on Addr=0x" + uInt32HexStr(baseAddr))
# Response header is the request header but with the Info Code field changed to INFO_CODE_RESPONSE
responseHeader = IPbusHeader.updateInfoCode(requestHeader, IPbusHeader.INFO_CODE_RESPONSE)
# The (baseAddr & 0xffffffff) forces baseAddr to be in unsigned form (i.e. 0xfffffffc, say, rather than -4)
if (baseAddr & 0xffffffff) == 0xffffffff: # A read on this register is a Dummy Hardware Reset Request.
chipsLog.info("** Dummy Hardware reset request received! Zeroing all registers. **")
self._registers.clear()
responseBody = []
appendToResponseBody = responseBody.append # This is for a speed optimisation
for offset in range(words):
currentReg = baseAddr + offset
# Create these registers if they don't already exist.
if currentReg not in self._registers:
self._registers[currentReg] = 0
appendToResponseBody(self._registers[currentReg])
return TransactionElement.makeFromHeaderAndBody(responseHeader, responseBody)
def _handleWriteRequest(self, request):
requestHeader = request.getHeader()
words = IPbusHeader.getWords(requestHeader)
requestBody = request.getBody()
baseAddr = requestBody[0]
chipsLog.debug("Write requested on Addr=0x" + uInt32HexStr(baseAddr))
# Response header is the request header but with the Info Code field changed to INFO_CODE_RESPONSE
responseHeader = IPbusHeader.updateInfoCode(requestHeader, IPbusHeader.INFO_CODE_RESPONSE)
for offset in range(words):
currentReg = baseAddr + offset
self._registers[currentReg] = requestBody[offset + 1]
return TransactionElement.makeFromHeaderAndBody(responseHeader)
def _handleFifoReadRequest(self, request):
requestHeader = request.getHeader()
words = IPbusHeader.getWords(requestHeader)
fifoAddr = request.getBody()[0]
chipsLog.debug("FIFO read requested on Addr=0x" + uInt32HexStr(fifoAddr))
# Response header is the request header but with the Info Code field changed to INFO_CODE_RESPONSE
responseHeader = IPbusHeader.updateInfoCode(requestHeader, IPbusHeader.INFO_CODE_RESPONSE)
# Create the register if they don't already exist in our memory space
if fifoAddr not in self._registers:
self._registers[fifoAddr] = 0
# Obviously we don't really have a FIFO, so we'll just have to return the value stored
# at the FIFO's address many times...
value = self._registers[fifoAddr]
responseBody = [value for iReads in range(words)]
return TransactionElement.makeFromHeaderAndBody(responseHeader, responseBody)
def _handleFifoWriteRequest(self, request):
requestHeader = request.getHeader()
requestBody = request.getBody()
fifoAddr = requestBody[0]
chipsLog.debug("FIFO write requested on Addr=0x" + uInt32HexStr(fifoAddr))
# Response header is the request header but with the Info Code field changed to INFO_CODE_RESPONSE
responseHeader = IPbusHeader.updateInfoCode(requestHeader, IPbusHeader.INFO_CODE_RESPONSE)
# Obviously we don't really have a FIFO, we just have a single register at the address of the supposed
# FIFO. So, whatever the last value written into the FIFO is, this will be the value this register will
# take. We ignore all the previous "writes" to the FIFO.
self._registers[fifoAddr] = requestBody[-1]
return TransactionElement.makeFromHeaderAndBody(responseHeader)
def _handleReadModifyWriteBitsRequest(self, request):
requestBody = request.getBody()
addr = requestBody[0]
andTerm = requestBody[1] # The and term is the bitwise complement of the register mask (i.e. mask = ~andTerm)
orTerm = requestBody[2]
chipsLog.debug("Read/Modify/Write-bits requested on Addr=0x" + uInt32HexStr(addr))
# Create the register if it doesn't already exist.
if addr not in self._registers: self._registers[addr] = 0
updatedValue = (self._registers[addr] & andTerm) | (orTerm)
self._registers[addr] = updatedValue
# Response header is the request header but with the Info Code field changed to INFO_CODE_RESPONSE
responseHeader = IPbusHeader.updateInfoCode(request.getHeader(), IPbusHeader.INFO_CODE_RESPONSE)
return TransactionElement.makeFromHeaderAndBody(responseHeader, [updatedValue])
def _handleReadModifyWriteSumRequest(self, request):
requestBody = request.getBody()
addr = requestBody[0]
addend = requestBody[1] # The value we add to the existing value
chipsLog.debug("Read/Modify/Write-sum requested on Addr=0x" + uInt32HexStr(addr))
# Create the register if it doesn't already exist.
if addr not in self._registers: self._registers[addr] = 0
updatedValue = (self._registers[addr] + addend) & 0xffffffff
self._registers[addr] = updatedValue
# Response header is the request header but with the Info Code field changed to INFO_CODE_RESPONSE
responseHeader = IPbusHeader.updateInfoCode(request.getHeader(), IPbusHeader.INFO_CODE_RESPONSE)
return TransactionElement.makeFromHeaderAndBody(responseHeader, [updatedValue])
def _handleGetReservedAddrInfoRequest(self, request):
chipsLog.debug("Reserved Address Info transaction requested")
# Response header is the request header but with the Info Code field changed to INFO_CODE_RESPONSE
responseHeader = IPbusHeader.updateInfoCode(request.getHeader(), IPbusHeader.INFO_CODE_RESPONSE)
responseHeader = IPbusHeader.updateWords(responseHeader, 2)
# Returning zeros for the response body, as no real idea what else it should be.
return TransactionElement.makeFromHeaderAndBody(responseHeader, [0,0])
def _stopServingAndJoinThreads(self):
self.stopServing = True
self.join(1) #Let's wait for the transaction handler thread to complete normally.
chipsLog.info("Dummy hardware server stopping.")
class DummyHardwareUdp(DummyHardwareBase):
def __init__(self, port = 50001):
DummyHardwareBase.__init__(self)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
self._socket.bind(('', port))
def serveForever(self):
"""The only function any user of this class needs to run!
Receives, acts on, and responds to UDP control packets as the Mini-T (or similar hardware) would.
Packets are received by the main thread and queued for action and response by a second thread.
"""
chipsLog.info("Dummy Hardware UDP Server starting")
# This starts the packet "act and respond" handler thread
self.start()
while not self.stopServing:
try:
data, addr = self._socket.recvfrom(DummyHardware.SOCKET_BUFFER_SIZE)
except KeyboardInterrupt:
chipsLog.warning("\nKeyboard interrupt (ctrl-c) received whilst waiting for incoming UDP packet")
self._stopServingAndJoinThreads()
return
except:
chipsLog.warning("\nException caught whilst waiting for incoming UDP packet")
self._stopServingAndJoinThreads()
return
if not data:
chipsLog.warning("Socket received an empty packet from " + repr(addr) + \
". Assuming socket now closed.\nTerminating dummy hardware server...")
self._stopServingAndJoinThreads()
return
else:
chipsLog.debug("\nReceived packet from " + repr(addr))
transaction = Transaction.constructHostTransaction(data, addr)
self._transaction_queue.put(transaction)
def _socketSend(self, transaction):
self._socket.sendto(transaction.serialResponses, transaction.addr)
class DummyHardwareTcp(DummyHardwareBase):
def __init__(self, port = 50002):
DummyHardwareBase.__init__(self)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP
self._socket.bind(('', port))
self._socket.listen(1) # TCP-specific: we will only accept a single TCP connection
self._connectedSocket = None # Once we have a TCP connection, this will be used for responding to the client.
def closeSockets(self):
"""Allows you to manually close any sockets that may have been opened."""
if self._connectedSocket != None:
try:
self._connectedSocket.close()
chipsLog.debug("Connected TCP socket closed successfully.")
except:
chipsLog.debug("Error closing connected TCP socket!")
DummyHardwareBase.closeSockets(self)
def serveForever(self):
"""The only function any user of this class needs to run!
Receives, acts on, and responds to TCP control packets as the Mini-T (or similar hardware) would.
Packets are received by the main thread and queued for action and response by a second thread.
"""
chipsLog.info("Dummy Hardware TCP Server starting")
# This starts the packet "act and respond" handler thread
self.start()
try:
chipsLog.debug("Awaiting connection...")
self._connectedSocket, addr = self._socket.accept() # TCP
chipsLog.debug("Client connection accepted.")
except KeyboardInterrupt:
chipsLog.warning("\nKeyboard interrupt (ctrl-c) received whilst waiting for a TCP connection")
self._stopServingAndJoinThreads()
return
while not self.stopServing:
try:
data = self._connectedSocket.recv(DummyHardware.SOCKET_BUFFER_SIZE) # TCP
except KeyboardInterrupt:
chipsLog.warning("\nKeyboard interrupt (ctrl-c) received whilst waiting for incoming TCP packet")
self._stopServingAndJoinThreads()
return
except:
chipsLog.warning("\nException caught whilst waiting for incoming TCP packet")
self._stopServingAndJoinThreads()
return
if not data:
chipsLog.debug("TCP socket received an empty packet from " + repr(addr) + ": assuming connection closed.")
try:
chipsLog.debug("Awaiting new connection...")
self._connectedSocket, addr = self._socket.accept() # TCP
chipsLog.debug("Client connection accepted.")
continue
except KeyboardInterrupt:
chipsLog.warning("\nKeyboard interrupt (ctrl-c) received whilst waiting for a TCP connection")
self._stopServingAndJoinThreads()
return
chipsLog.debug("\nReceived TCP packet from " + repr(addr))
transaction = Transaction.constructHostTransaction(data, addr)
self._transaction_queue.put(transaction)
chipsLog.info("Dummy Hardware TCP Server stopping.")
def _socketSend(self, transaction):
self._connectedSocket.send(transaction.serialResponses) # TCP-specific
class DummyHardware(DummyHardwareUdp):
"""Deprecated! Essentially now just an alias for DummyHardwareUdp"""
def __init__(self, port = 50001):
DummyHardwareUdp.__init__(self, port)
chipsLog.warning("Please note: this class has been deprecated - use DummyHardwareUdp in the future.")
'''
Created on May 14, 2010
@author: Robert Frazier
'''
from ChipsException import ChipsException
from CommonTools import uInt32HexStr
# This is an enumeration of the different "Info Codes", that define packet direction and error status.
INFO_CODE_RESPONSE = 0x0 # Host reports that the request was handled successfully.
INFO_CODE_BAD_HEADER = 0x1 # Host reports that the client request had a bad header.
INFO_CODE_BUS_ERR_ON_RD = 0x2 # Host reports that there was bus error on read.
INFO_CODE_BUS_ERR_ON_WR = 0x3 # Host reports that there was a bus error on write.
INFO_CODE_BUS_TIMEOUT_ON_RD = 0x4 # Host reports that there was a bus timeout on read.
INFO_CODE_BUS_TIMEOUT_ON_WR = 0x5 # Host reports that there was a bus timeout on write.
INFO_CODE_REQUEST = 0xf # A client transaction request
# A mapping of meaningful strings for the various info-codes above.
_infoCodeInterpretation = { INFO_CODE_RESPONSE : "The client request was handled successfully",
INFO_CODE_BAD_HEADER : "The client request had a bad header",
INFO_CODE_BUS_ERR_ON_RD : "The client request resulted in a bus-error-on-read",
INFO_CODE_BUS_ERR_ON_WR : "The client request resulted in a bus-error-on-write",
INFO_CODE_BUS_TIMEOUT_ON_RD : "The client request resulted in a bus-read timeout",
INFO_CODE_BUS_TIMEOUT_ON_WR : "The client request resulted in a bus-write timeout",
INFO_CODE_REQUEST : "Outgoing client request" }
# This is an enumeration of the different codes for the different transaction types
TYPE_ID_READ = 0x0 # Read transaction code
TYPE_ID_WRITE = 0x1 # Write transaction code
TYPE_ID_NON_INCR_READ = 0x2 # Non-Incrementing Read transaction code
TYPE_ID_NON_INCR_WRITE = 0x3 # Non-Incrementing Write transaction code
TYPE_ID_RMW_BITS = 0x4 # Read-Modify-Write Bits transaction code
TYPE_ID_RMW_SUM = 0x5 # Read-Modify-Write Sum transaction code
TYPE_ID_RSVD_ADDR_INFO = 0xE # Reserved Address Info Request transaction code
# Request transaction element (i.e. controller-to-target) minimum body size
_minimumRequestBodySize = { TYPE_ID_READ : 1, # Read
TYPE_ID_WRITE : 1, # Write
TYPE_ID_RMW_BITS : 3, # Read-Modify-Write Bits
TYPE_ID_RMW_SUM : 2, # Read-Modify-Write Sum
TYPE_ID_NON_INCR_READ : 1, # Non-incrementing Read
TYPE_ID_NON_INCR_WRITE : 1, # Non-incrementing Write
TYPE_ID_RSVD_ADDR_INFO : 0 } # Reserved Address Info request
# Response transaction element (i.e. target-to-controller) minimum body size
_minimumResponseBodySize = { TYPE_ID_READ : 0, # Read
TYPE_ID_WRITE : 0, # Write
TYPE_ID_RMW_BITS : 1, # Read-Modify-Write Bits
TYPE_ID_RMW_SUM : 1, # Read-Modify-Write Sum
TYPE_ID_NON_INCR_READ : 0, # Non-incrementing Read
TYPE_ID_NON_INCR_WRITE : 0, # Non-incrementing Write
TYPE_ID_RSVD_ADDR_INFO : 2 } # Reserved Address Info request
def makeHeader(version, transactionId, words, typeId, infoCode):
'''Returns a 32-bit IPbus transaction element header as created from individual header components.'''
rawHeaderU32 = ((version & 0xf) << 28) | \
((transactionId & 0xfff) << 16) | \
((words & 0xff) << 8) | \
((typeId & 0xf) << 4) | \
(infoCode & 0xf)
return rawHeaderU32
def getExpectedBodySize(rawHeaderU32):
'''Calculates the expected body size of an IPbus TransactionElement from the raw 32-bit header word'''
result = 0
typeId = getTypeId(rawHeaderU32)
# Test if it's a known request/response typeId (doesn't matter which bodySize map we use)
if typeId in _minimumResponseBodySize:
# response from target to controller
if getInfoCode(rawHeaderU32) == INFO_CODE_RESPONSE:
if typeId == TYPE_ID_READ or typeId == TYPE_ID_NON_INCR_READ:
result = getWords(rawHeaderU32) + _minimumResponseBodySize[typeId]
else: result = _minimumResponseBodySize[typeId]
# request from controller to target
else:
if typeId == TYPE_ID_WRITE or typeId == TYPE_ID_NON_INCR_WRITE:
result = getWords(rawHeaderU32) + _minimumRequestBodySize[typeId]
else: result = _minimumRequestBodySize[typeId]
# Unknown transaction typeId.
else:
raise ChipsException("Cannot determine the expected body size from the given IPbus header '0x" + uInt32HexStr(rawHeaderU32) +
"', as it is of unknown transaction type '" + hex(typeId) + "'!")
return result
def getVersion(rawHeaderU32):
'''Returns the IPbus Version number contained within a raw IPbus Header'''
return (rawHeaderU32 >> 28) & 0xf
def getWords(rawHeaderU32):
'''Returns the value stored in the Words field of a raw IPbus Header'''
return (rawHeaderU32 >> 8) & 0xff
def updateWords(rawHeaderU32, newWords):
'''Returns the raw IPbus header but with the words field updated to the value specified'''
return (rawHeaderU32 & 0xffff00ff | ((newWords & 0xff) << 8))
def getTransactionId(rawHeaderU32):
'''Returns the Transaction ID number contained within a raw IPbus Header'''
return (rawHeaderU32 >> 16) & 0xfff
def getTypeId(rawHeaderU32):
'''Returns the Type ID Code contained within a raw IPbus Header'''
return (rawHeaderU32 >> 4) & 0xf
def getInfoCode(rawHeaderU32):
'''Returns the Info Code contained within a raw IPbus Header'''
return rawHeaderU32 & 0xf
def updateInfoCode(rawHeaderU32, newInfoCode):
'''Returns the raw IPbus header but with the Info Code updated to the value specified'''
return (rawHeaderU32 & 0xfffffff0) | (newInfoCode & 0xf)
def isRequestHeader(rawHeaderU32):
'''Returns true if the header is a request header (i.e. from client to target device)'''
return (getInfoCode(rawHeaderU32) == INFO_CODE_REQUEST)
def isResponseHeader(rawHeaderU32):
'''Returns true if the header is any kind of response header (i.e. either a success or error response).'''
return not isRequestHeader(rawHeaderU32)
def isResponseSuccessHeader(rawHeaderU32):
'''Returns true if the header is a successful host response header'''
return (getInfoCode(rawHeaderU32) == INFO_CODE_RESPONSE)
def isResponseErrorHeader(rawHeaderU32):
'''Returns true if the header is a host response error header'''
return isResponseHeader(rawHeaderU32) and not isResponseSuccessHeader(rawHeaderU32)
def interpretInfoCode(infoCode):
'''Returns a user-readable sentence string of the meaning of a given Info Code.'''
if infoCode in _infoCodeInterpretation:
return _infoCodeInterpretation[infoCode]
else:
return "The IPbus header Info Code '" + hex(0xf & infoCode) + "' is undefined/reserved!"
'''
Common imports for using PyChips as a standard user (normal amount of logging)
@author: Robert Frazier
May 2010
'''
from ChipsBus import ChipsBus, ChipsBusUdp, ChipsBusTcp
from AddressTable import AddressTable
from CommonTools import *
print pyChipsVersion()
from ChipsLog import chipsLog, logging
chipsLog.setLevel(logging.INFO)
\ No newline at end of file
"""
Common imports for using PyChips as a standard user, but with lots of DEBUG printouts
@author: Robert Frazier
May 2010
"""
from ChipsBus import ChipsBus, ChipsBusUdp, ChipsBusTcp
from AddressTable import AddressTable
from CommonTools import *
print pyChipsVersion()
from ChipsLog import chipsLog, logging
chipsLog.setLevel(logging.DEBUG)
\ No newline at end of file
'''
Created on June 22, 2011
@author: Robert Frazier
'''
from array import array
from IPbusHeader import getExpectedBodySize
from TransactionElement import TransactionElement, reprTransactionElementList
from CommonTools import uInt32HexStr
import logging
from ChipsLog import chipsLog
from ChipsException import ChipsException
class SerDes(object):
'''Class for serialising/deserialising IPbus transaction data
Can serialise a list of TransactionElements into a string, or deserialise
a string into a list of TransactionElements. The deserialisation stage
checks the byte-ordering by looking for a byte-order header, and deals
with the byte-reordering as appropriate. If a byte-reorder was required
on the deserialisation stage, then the serialisation stage will also
perform a reorder in order to respond correctly.
Byte-reordering is off by default, as it's obviously a waste of CPU.
'''
def __init__(self):
'''Constructor - no arguments'''
object.__init__(self)
self._doByteReorder = True;
def serialise(self, transactionElementList):
'''Serialises a list of transaction elements into an ASCII string for transmission'''
if chipsLog.level <= logging.DEBUG:
msg = "\nSerialising the following packet content:\n"
msg += reprTransactionElementList(transactionElementList)
chipsLog.debug(msg)
allTransactionsArray = array('I', [0x200000f0]) # TEMPORARY IPBUS V2.x HACK! Add a packet header of [0x200000f0] to the beginning of each packet.
extendFunc = allTransactionsArray.extend
for element in transactionElementList:
extendFunc(element.getAll())
if self._doByteReorder: allTransactionsArray.byteswap()
return allTransactionsArray.tostring()
def deserialise(self, packetPayloadString):
'''Deserialises a packet payload ASCII string into list of transaction elements'''
# 1) Unpack the string into unsigned integers
try:
rawU32Array = array('I', packetPayloadString) # Unpack string to an unsigned 32-bit array
except Exception, err:
raise ChipsException("Deserialisation error:\n\t" + str(err))
# 2) Debug output of the raw packet
if chipsLog.level <= logging.DEBUG:
msg = "\nRaw received packet content is:\n"
for u32 in rawU32Array: msg += " 0x" + uInt32HexStr(u32) + "\n"
chipsLog.debug(msg)
# 3) Detect if we need to do a byte-reorder
firstWord = rawU32Array[0]
firstWordMasked = firstWord & 0xf00000f0 # This selects only the bits relevant for detecting the byte ordering
if firstWordMasked == 0x200000f0:
chipsLog.debug("Packet header detected: no byte-reorder is required.")
elif firstWordMasked == 0xf0000020:
chipsLog.debug("Packet header detected: a byte-reorder will be performed.")
self._doByteReorder = True
else:
chipsLog.warn("Warning: No packet header (or unknown protocol version)! First word = 0x" + uInt32HexStr(firstWord) + ". Will hope for the best!...")
# 4) Do the byte-reorder if necessary
if self._doByteReorder: rawU32Array.byteswap()
# 5) Now deserialise into a list of TransactionElements
transactionElementList = [] # The list all the deserialised transaction elements will go into
appendToTransactionElementList = transactionElementList.append # This is needed for a speed optimisation
iU32 = 1 # Index for moving through the rawU32Array # TEMPORARY IPBUS V2.x HACK! Skip the first word which is now a packet ID
rawU32ArraySize = len(rawU32Array)
while iU32 < rawU32ArraySize:
expectedBodySize = getExpectedBodySize(rawU32Array[iU32])
if rawU32ArraySize - iU32 - 1 - expectedBodySize < 0:
raise ChipsException("Deserialisation error: packet not correctly formatted " \
"or does not contain the expected amount of data!")
appendToTransactionElementList(TransactionElement(rawU32Array[iU32: iU32 + 1 + expectedBodySize]))
iU32 += (1 + expectedBodySize)
# 6) Debug output of deserialised packet content
if chipsLog.level <= logging.DEBUG:
msg = "\nDeserialised packet content is:\n" \
+ reprTransactionElementList(transactionElementList)
chipsLog.debug(msg)
return transactionElementList
'''
Created on May 12, 2010
@author: Robert Frazier
'''
# System imports
import logging
import socket
# Project imports
from ChipsLog import chipsLog
from ChipsException import ChipsException
from CommonTools import uInt32HexStr
from SerDes import SerDes
from IPbusHeader import getTransactionId, getTypeId, getInfoCode, isRequestHeader, isResponseErrorHeader, interpretInfoCode
class Transaction(object):
'''Used for storing the details of a transaction in terms of serial/non-serial requests and responses'''
def __init__(self):
object.__init__(self)
self.addr = ("", 0) # Tuple of host string and port number, e.g. ("192.168.0.1", 50001)
self.requests = [] # Requests: a list of TransactionElement objects
self.serialRequests = "" # For holding the serialised requests payload string
self.responses = [] # Responses: a list of TransactionElement objects
self.serialResponses = "" # For holding the serialised responses payload string
self._serdes = SerDes() # For serialising/deserialising transactions
@classmethod
def constructHostTransaction(cls, packetPayload, clientAddr):
"""Constructs a typical host Transaction object instance
Usage: the host has just received a serial packetPayload string from a
client with clientAddr. clientAddr is a tuple of ("hostIP", portNumber),
e.g. ("192.168.0.1", 50001). Thus, you want to make a Transaction object
with this data set via constructor.
"""
transaction = cls()
transaction.addr = clientAddr
transaction.serialRequests = packetPayload
return transaction
@classmethod
def constructClientTransaction(cls, transactionElementList, targetAddr):
"""Constructs a typical client Transaction object instance
Usage: you want to send the host (i.e. hardware) a read/write/etc
request, and have already formulated the TransactionElementList that
needs to be serialised for sending to the host. The argument targetAddr
is the host's IP address and port number in the form of a tupe like this:
("hostIP", portNumber), e.g. ("192.168.0.2", 50002).
"""
transaction = cls()
transaction.addr = targetAddr
transaction.requests = transactionElementList
return transaction
def getHostAddr(self):
"""Returns the host IP address"""
return self.addr[0]
def appendRequest(self, transactionElement):
"""Append a request to the transaction list"""
self.requests.append(transactionElement)
def appendResponse(self, transactionElement):
"""Append a response to the transaction list"""
self.responses.append(transactionElement)
def serialiseRequests(self):
"""Serialise current requests list - the result is put into the serialisedRequests string"""
self.serialRequests = self._serdes.serialise(self.requests)
def serialiseResponses(self):
"""Serialise current responses list - the result is put into the serialisedResponses string"""
self.serialResponses = self._serdes.serialise(self.responses)
def deserialiseRequests(self):
"""Deserialise current serialisedRequests string - the result is put into the requests list"""
self.requests = self._serdes.deserialise(self.serialRequests)
def deserialiseResponses(self):
"""Deserialise current serialisedResponses string - the result is put into the responses list"""
self.responses = self._serdes.deserialise(self.serialResponses)
def doTransactionChecks(self):
"""Performs a generic set of client transaction checks on all transaction elements in the request/response lists"""
if len(self.requests) != len(self.responses):
raise ChipsException("Transaction error: incorrect number of responses returned!")
for iElement in range(len(self.requests)):
request = self.requests[iElement]
response = self.responses[iElement]
requestHeader = request.getHeader()
responseHeader = response.getHeader()
if getTypeId(requestHeader) != getTypeId(responseHeader):
raise ChipsException("Transaction error from '" + self.getHostAddr() + " (header=0x" + uInt32HexStr(responseHeader) + "): " \
"request and response transaction types do not match; expected '" + hex(getTypeId(requestHeader)) +
"' but got '" + hex(getTypeId(responseHeader)) + "'!")
if getTransactionId(requestHeader) != getTransactionId(responseHeader):
raise ChipsException("Transaction error from '" + self.getHostAddr() + " (header=0x" + uInt32HexStr(responseHeader) + "): " \
"request and response transactions IDs do not match; expected '" + str(getTransactionId(requestHeader)) +
"' but got '" + str(getTransactionId(responseHeader)) + "'!")
if isRequestHeader(responseHeader):
raise ChipsException("Transaction error from '" + self.getHostAddr() + " (header=0x" + uInt32HexStr(responseHeader) + "): " \
"the received response for transaction ID " + str(getTransactionId(responseHeader)) + " has an INFO CODE " \
"that declares it as a request!")
if isResponseErrorHeader(responseHeader):
raise ChipsException("Transaction error from '" + self.getHostAddr() + " (header=0x" + uInt32HexStr(responseHeader) + "): " \
"the host reports the following error occurred (INFO CODE = " + hex(getInfoCode(responseHeader)) +
") whilst processing transaction ID " + str(getTransactionId(responseHeader)) + ": '" +
interpretInfoCode(getInfoCode(responseHeader)) + "'!")
if not response.validBodySize():
raise ChipsException("Transaction error from '" + self.getHostAddr() + " (header=0x" + uInt32HexStr(responseHeader) + "): " \
"incorrect response body size for transaction number " + str(getTransactionId(responseHeader)) + "!")
'''
Created on May 17, 2010
@author: Robert Frazier
'''
from array import array
import IPbusHeader
from ChipsException import ChipsException
from CommonTools import uInt32HexStr
class TransactionElement(object):
'''Represents a single IPbus request or response (read, write, etc.).'''
def __init__(self, arrayU32):
'''Construct from a unsigned 32-bit array object'''
object.__init__(self)
if isinstance(arrayU32, array) and arrayU32.typecode == 'I' and len(arrayU32) >= 1:
# self._rawData holds header and body, to make serialisation as fast as possible
self._rawData = arrayU32
else:
raise ChipsException("Cannot construct a TransactionElement object from anything other " \
"than an unsigned 32-bit array object of length >= 1!")
@classmethod
def makeFromHeaderAndBody(cls, headerU32, bodyU32List = []):
'''Construct a TransactionElement object from a header and an optional body list.
header: A raw unsigned 32-bit header word.
bodyU32List: A list of unsigned 32-bit words (optional).
'''
arrayU32 = array('I', [headerU32] + bodyU32List)
return cls(arrayU32)
def getAll(self):
'''Returns the all the raw data of the transaction element (i.e. header & body)
Returns an array object with TypeCode = 'I' (i.e. unsigned 32-bit array).
'''
return self._rawData
def getU32TransmitSize(self):
'''Returns the transmit size of the TransactionElement in terms of number of 32-bit words'''
return len(self._rawData)
def getHeader(self):
'''Return the 32-bit header word of the TransactionElement.
Use the helper functions in the IPbusHeader module if you want to decode the returned header.
'''
return self._rawData[0]
def getBody(self):
'''Get the body of the TransactionElement.
Returns an array object with TypeCode = 'L' (i.e. unsigned 32-bit array).
'''
return self._rawData[1:]
def validBodySize(self):
'''Returns true/false if the current body size matches the size inferred by the header.'''
return (len(self._rawData) - 1) == IPbusHeader.getExpectedBodySize(self.getHeader())
def __repr__(self):
'''For representing the object in string form'''
resultString = " 0x" + uInt32HexStr(self.getHeader()) + "\tHeader\n"
for index, u32 in enumerate(self.getBody()):
resultString += " 0x" + uInt32HexStr(u32) + "\tBody " + str(index) + "\n"
return resultString
def reprTransactionElementList(transactionElementList):
"""Convenience function for getting a string representation of a list of transaction elements"""
resultString = ""
for element in transactionElementList:
resultString += repr(element)
return resultString
K 25
svn:wc:ra_dav:version-url
V 37
/svn/!svn/ver/1537/trunk/PyChips/test
END
Test_CommonTools.py
K 25
svn:wc:ra_dav:version-url
V 56
/svn/!svn/ver/473/trunk/PyChips/test/Test_CommonTools.py
END
TestSuite.py
K 25
svn:wc:ra_dav:version-url
V 49
/svn/!svn/ver/506/trunk/PyChips/test/TestSuite.py
END
Test_IPbusHeader.py
K 25
svn:wc:ra_dav:version-url
V 57
/svn/!svn/ver/1537/trunk/PyChips/test/Test_IPbusHeader.py
END
Test_AddressTableItem.py
K 25
svn:wc:ra_dav:version-url
V 61
/svn/!svn/ver/473/trunk/PyChips/test/Test_AddressTableItem.py
END
Test_AddressTable.py
K 25
svn:wc:ra_dav:version-url
V 57
/svn/!svn/ver/473/trunk/PyChips/test/Test_AddressTable.py
END
Test_TransactionElement.py
K 25
svn:wc:ra_dav:version-url
V 64
/svn/!svn/ver/1251/trunk/PyChips/test/Test_TransactionElement.py
END
Test_ChipsBus.py
K 25
svn:wc:ra_dav:version-url
V 53
/svn/!svn/ver/506/trunk/PyChips/test/Test_ChipsBus.py
END
Test_SerDes.py
K 25
svn:wc:ra_dav:version-url
V 52
/svn/!svn/ver/1537/trunk/PyChips/test/Test_SerDes.py
END
startDummyHardware.py
K 25
svn:wc:ra_dav:version-url
V 58
/svn/!svn/ver/922/trunk/PyChips/test/startDummyHardware.py
END
10
dir
1570
https://cactus.hepforge.org/svn/trunk/PyChips/test
https://cactus.hepforge.org/svn
2013-01-25T13:18:23.396423Z
1537
frazier
18a17b70-165d-4f71-a96d-00e1b61b3a60
Test_CommonTools.py
file
2013-07-29T17:57:09.000000Z
8238216e8dfe7fa161f871f6a8181a70
2011-06-01T08:35:26.606026Z
473
frazier
has-props
3525
TestSuite.py
file
2013-07-29T17:57:09.000000Z
243c00f3eef039d7f46afff85dc0ee81
2011-06-22T23:34:24.913862Z
506
frazier
has-props
798
Test_IPbusHeader.py
file
2013-07-29T17:57:09.000000Z
9244f31959b340a63594c7c442abbd68
2013-01-25T13:18:23.396423Z
1537
frazier
has-props
7822
Test_AddressTableItem.py
file
2013-07-29T17:57:09.000000Z
9f7352529e8ba0fb77db7e6637e9b02a
2011-06-01T08:35:26.606026Z
473
frazier
has-props
3716
Test_AddressTable.py
file
2013-07-29T17:57:09.000000Z
bc833bcc30a2938baf335ced9361237a
2011-06-01T08:35:26.606026Z
473
frazier
has-props
7265
Test_TransactionElement.py
file
2013-07-29T17:57:09.000000Z
b056aafeab59230e7d4de34f94dcef83
2012-10-08T13:03:50.396807Z
1251
frazier
has-props
3428
Test_ChipsBus.py
file
2013-07-29T17:57:09.000000Z
6872269c0937e1c0750084e3d6951007
2011-06-22T23:34:24.913862Z
506
frazier
has-props
12066
Test_SerDes.py
file
2013-07-29T17:57:09.000000Z
01f5d56ed8f2ae34fc38cdaea0ed5e9a
2013-01-25T13:18:23.396423Z
1537
frazier
has-props
5737
startDummyHardware.py
file
2013-07-29T17:57:09.000000Z
7c7a3412724fba9b4c9f071ff1461167
2011-12-15T17:20:49.504833Z
922
frazier
has-props
2364
K 14
svn:executable
V 1
*
K 13
svn:mime-type
V 10
text/plain
END
'''
Module for running every single test available of the PyChips source code.
NOTE: The script "startDummyHardwareUdp.py" must be already
running for all the tests to complete properly
Created on May 21, 2010
@author: Robert Frazier
'''
#************** IMPORTANT NOTE ********************
# The script "startDummyHardwareUdp.py" must be already
# running for all the tests to complete properly.
#************** IMPORTANT NOTE ********************
import unittest
from Test_CommonTools import *
from Test_IPbusHeader import *
from Test_TransactionElement import *
from Test_AddressTableItem import *
from Test_AddressTable import *
from Test_SerDes import *
from Test_ChipsBus import *
if __name__ == '__main__':
"""Runs all the tests we have"""
unittest.main()
\ No newline at end of file
'''
Tests for the AddressTable class
@author Robert Frazier
June 2010
'''
import unittest
from AddressTable import AddressTable
from ChipsException import ChipsException
import os
class Test_AddressTable(unittest.TestCase):
# Filename for the good address table.
goodAddressTable_fileName = "Test_AddressTable_goodAddressTable.txt"
# Filenames for malformed address tables
addressNotValidNumber_fileName = "Test_AddressTable_addressNotValidNumber.txt"
addressNot32Bit_fileName = "Test_AddressTable_addressNot32Bit.txt"
addressNegative_fileName = "Test_AddressTable_addressNegative.txt"
maskNotValidNumber_fileName = "Test_AddressTable_maskNotValidNumber.txt"
maskNot32Bit_fileName = "Test_AddressTable_maskNot32Bit.txt"
maskNegative_fileName = "Test_AddressTable_maskNegative.txt"
maskZero_fileName = "Test_AddressTable_maskZero.txt"
readNotValidNumber_filename = "Test_AddressTable_readNotValidNumber.txt"
writeNotValidNumber_filename = "Test_AddressTable_writeNotValidNumber.txt"
lineTooShort_fileName = "Test_AddressTable_lineTooShort.txt"
registerNameUsedTwice_fileName = "Test_AddressTable_registerNameUsedTwice.txt"
def setUp(self):
# Create the good address table
myFile = open(Test_AddressTable.goodAddressTable_fileName, 'w')
myFile.write("* This line is a comment and should be ignored\n" \
"MyRegister1 00000000 ffffffff 1 1 This text is ignored\n" \
"MyRegister2 0x00000004 0x0000ffff 1 1 Test we can prepend hex with a '0x'\n" \
"* Now we will test if a blank line and a blank line with whitespace are ignored:\n" \
"\n" \
" \n" \
"* Finally we test to see if a register is picked up even if there is no newline afterwards:\n" \
"MyRegister3 f0000000 00ffff00 1 1")
myFile.close()
# Create all the malformed address tables
myFile = open(Test_AddressTable.addressNotValidNumber_fileName, 'w')
myFile.write("MyRegister thisIsNotARealNumberButAString! 0xff000000 1 1\n")
myFile.close()
myFile = open(Test_AddressTable.addressNot32Bit_fileName, 'w')
myFile.write("MyRegister 0x100000000 0xff000000 1 1\n")
myFile.close()
myFile = open(Test_AddressTable.addressNegative_fileName, 'w')
myFile.write("MyRegister -0x12345670 0xff000000 1 1\n")
myFile.close()
myFile = open(Test_AddressTable.maskNotValidNumber_fileName, 'w')
myFile.write("MyRegister 0x00000000 thisIsNotARealNumberButAString! 1 1\n")
myFile.close()
myFile = open(Test_AddressTable.maskNot32Bit_fileName, 'w')
myFile.write("MyRegister 0x00000000 0x100000000 1 1\n")
myFile.close()
myFile = open(Test_AddressTable.maskNegative_fileName, 'w')
myFile.write("MyRegister 0xff000000 -0x12345670 1 1\n")
myFile.close()
myFile = open(Test_AddressTable.maskZero_fileName, 'w')
myFile.write("MyRegister 0x00000000 0x00000000 1 1\n")
myFile.close()
myFile = open(Test_AddressTable.readNotValidNumber_filename, 'w')
myFile.write("MyRegister 0x00000000 0xffffffff y 1\n")
myFile.close()
myFile = open(Test_AddressTable.writeNotValidNumber_filename, 'w')
myFile.write("MyRegister 0x00000000 0xffffffff 1 -1\n")
myFile.close()
myFile = open(Test_AddressTable.lineTooShort_fileName, 'w')
myFile.write("MyRegister 0x00000000 \n")
myFile.close()
myFile = open(Test_AddressTable.registerNameUsedTwice_fileName, 'w')
myFile.write("MyRegister 0x00000000 0xffffffff 1 1\n" \
"MyRegister 0x00000010 0xffffffff 1 1\n")
myFile.close()
def tearDown(self):
# Clean up after the tests are over.
os.remove(Test_AddressTable.goodAddressTable_fileName)
os.remove(Test_AddressTable.addressNotValidNumber_fileName)
os.remove(Test_AddressTable.addressNot32Bit_fileName)
os.remove(Test_AddressTable.addressNegative_fileName)
os.remove(Test_AddressTable.maskNotValidNumber_fileName)
os.remove(Test_AddressTable.maskNegative_fileName)
os.remove(Test_AddressTable.maskNot32Bit_fileName)
os.remove(Test_AddressTable.maskZero_fileName)
os.remove(Test_AddressTable.readNotValidNumber_filename)
os.remove(Test_AddressTable.writeNotValidNumber_filename)
os.remove(Test_AddressTable.lineTooShort_fileName)
os.remove(Test_AddressTable.registerNameUsedTwice_fileName)
def test_constructionWithGoodFile(self):
myAddrTable = AddressTable(Test_AddressTable.goodAddressTable_fileName)
self.assertEqual(len(myAddrTable.items), 3) # Should be three items in the address table
self.assertEqual(myAddrTable.fileName, Test_AddressTable.goodAddressTable_fileName)
def test_getItemMethod(self):
myAddrTable = AddressTable(Test_AddressTable.goodAddressTable_fileName)
item = myAddrTable.getItem("MyRegister3")
self.assertEqual(item.getName(), "MyRegister3")
self.assertEqual(item.getAddress(), 0xf0000000)
self.assertEqual(item.getMask(), 0x00ffff00)
self.assertEqual(item.getReadFlag(), 1)
self.assertEqual(item.getWriteFlag(), 1)
def test_throwsWhenAddressNotValidNumber(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.addressNotValidNumber_fileName)
def test_throwsWhenAddressNot32Bit(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.addressNot32Bit_fileName)
def test_throwsWhenAddressNegative(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.addressNegative_fileName)
def test_throwsWhenMaskNotValidNumber(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.maskNotValidNumber_fileName)
def test_throwsWhenMaskNot32Bit(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.maskNot32Bit_fileName)
def test_throwsWhenMaskNegative(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.maskNegative_fileName)
def test_throwsWhenMaskZero(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.maskZero_fileName)
def test_throwsWhenReadNotValidNumber(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.readNotValidNumber_filename)
def test_throwsWhenWriteNotValidNumber(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.writeNotValidNumber_filename)
def test_throwsWhenLineTooShort(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.lineTooShort_fileName)
def test_throwsWhenRegisterNameUsedTwice(self):
self.assertRaises(ChipsException, AddressTable, Test_AddressTable.registerNameUsedTwice_fileName)
'''
Tests for the AddressTableItem class
@author Robert Frazier
June 2010
'''
import unittest
from AddressTableItem import AddressTableItem
from ChipsException import ChipsException
class Test_AddressTableItem(unittest.TestCase):
def setUp(self):
self.testObj = AddressTableItem("myRegisterName", 0x87654320, 0x00ff0000)
def tearDown(self):
pass
def test_getName(self):
self.setUp()
self.assertEqual(self.testObj.getName(), "myRegisterName")
def test_setName(self):
self.setUp()
self.testObj.setName("myOtherRegisterName")
self.assertEqual(self.testObj.getName(), "myOtherRegisterName")
def test_getAddress(self):
self.setUp()
self.assertEqual(self.testObj.getAddress(), 0x87654320)
def test_setAddress(self):
self.setUp()
self.testObj.setAddress(0x0000000c)
self.assertEqual(self.testObj.getAddress(), 0x0000000c)
def test_getMask(self):
self.setUp()
self.assertEqual(self.testObj.getMask(), 0x00ff0000)
def test_shiftDataFromMask_afterInit(self):
self.setUp()
self.assertEqual(self.testObj.shiftDataFromMask(0xffffffff), 0xff)
def test_shiftDataToMask_afterInit(self):
self.setUp()
self.assertEqual(self.testObj.shiftDataToMask(0x81), 0x00810000)
def test_shiftDataToMask_dataBiggerThanMask(self):
self.setUp()
self.assertRaises(ChipsException, self.testObj.shiftDataToMask, 0x100)
def test_shiftDataToMask_dataBiggerThanMask2(self):
# A shiftDataToMask bug was spotted during testing of another
# module, so added this test to look for it explicitly.
localTestObj = AddressTableItem("myRegisterName", 0xffff000c, 0xff000000)
self.assertRaises(ChipsException, localTestObj.shiftDataToMask, 0x87654321)
def test_setMask(self):
self.setUp()
self.testObj.setMask(0xe0000000)
self.assertEqual(self.testObj.getMask(), 0xe0000000)
def test_shiftDataFromMask_afterSetMask(self):
self.setUp()
self.testObj.setMask(0xe0000000)
self.assertEqual(self.testObj.shiftDataFromMask(0xcfffffff), 0x6)
def test_shiftDataToMask_afterSetMask(self):
self.setUp()
self.testObj.setMask(0xe0000000)
self.assertEqual(self.testObj.shiftDataToMask(0x5), 0xa0000000)
def test_setAddress_ignoreOversizeValue(self):
self.setUp()
self.testObj.setAddress(0xf87654320)
self.assertEqual(self.testObj.getAddress(), 0x87654320)
def test_setMask_ignoreOversizeValue(self):
self.setUp()
self.testObj.setMask(0xffffffffff)
self.assertEqual(self.testObj.getMask(), 0xffffffff)
def test_ignoreOversizeArgsOnInit(self):
testObj = AddressTableItem("myRegister", 0xf00000000, 0xf0000ffff)
self.assertEqual(testObj.getAddress(), 0x00000000)
self.assertEqual(testObj.getMask(), 0x0000ffff)
def test_getDefaultRWFlags(self):
self.setUp() #The AddressTableItem defined in setUp() has no RW flags defined so takes the default value of 1 for both
self.assertEqual(self.testObj.getReadFlag(), 1)
self.assertEqual(self.testObj.getWriteFlag(), 1)
def test_getRWFlags(self):
testObj = AddressTableItem("myRegisterName", 0x87654320, 0x00ff0000, 1, 0)
self.assertEqual(testObj.getReadFlag(), 1)
self.assertEqual(testObj.getWriteFlag(), 0)
def test_setWriteFlag(self):
testObj = AddressTableItem("myRegisterName", 0x87654320, 0x00ff0000, 1, 0)
testObj.setWriteFlag(1)
self.assertEqual(testObj.getWriteFlag(), 1)
'''
Tests for the ChipsBusBase/ChipsBusUdp classes (ChipsBus.py)
@author Robert Frazier
June 2010
'''
import unittest
import os, random
from ChipsBus import ChipsBusBase, ChipsBusUdp
from ChipsException import ChipsException
from AddressTable import AddressTable
####### IMPORTANT ######
# NOTE: The script "startDummyHardware.py" must be already
# running for this test to complete properly!
##########################
class Test_ChipsBus(unittest.TestCase):
def setUp(self):
self.testObj = ChipsBusUdp(AddressTable("../addressTables/simpleTestAddrTable.txt"), "localhost", 50001)
myFile = open("Test_ChipsBus_oversizeAddrTable.txt", 'w')
for i in range(400):
myFile.write("Reg" + str(i) + " " + str(hex(0x00000000+i)) + " 0xffffffff 1 1\n")
myFile.write("ResetDummyHW 0xffffffff 0xffffffff 1 1\n")
myFile.close()
self.oversizeTestObj = ChipsBusUdp(AddressTable("Test_ChipsBus_oversizeAddrTable.txt"), "localhost", 50001)
def tearDown(self):
del(self.testObj)
del(self.oversizeTestObj)
os.remove("Test_ChipsBus_oversizeAddrTable.txt")
def test__unmaskedRead(self): # We want this test to come first out of all the tests in this module.
self.assertEqual(self.testObj.read("UnitTest_ResetDummyHW"), 0) # Mechanism to reset dummy hardware.
self.assertEqual(self.testObj.read("UnitTest1"), 0)
self.testObj.write("UnitTest1_mask1", 0x80)
self.testObj.write("UnitTest1_mask2", 0x18)
self.testObj.write("UnitTest1_mask3", 0xff0)
self.testObj.write("UnitTest1_mask4", 0x1)
self.assertEqual(self.testObj.read("UnitTest1"), 0x80181fe1)
def test_maskedRead(self):
self.testObj.read("UnitTest_ResetDummyHW") # Mechanism to reset dummy hardware.
self.testObj.write("UnitTest1_mask3", 0xff0)
self.assertEqual(self.testObj.read("UnitTest1_mask3"), 0xff0)
def test_unmaskedReadWithAddrOffset(self):
self.testObj.read("UnitTest_ResetDummyHW") # Mechanism to reset dummy hardware.
self.testObj.write("UnitTest2", 0xdeadbeef)
self.assertEqual(self.testObj.read("UnitTest1", 1), 0xdeadbeef)
def test_maskedReadWithAddrOffset(self):
self.testObj.read("UnitTest_ResetDummyHW") # Mechanism to reset dummy hardware.
self.testObj.write("UnitTest2", 0xdeadbeef)
self.assertEqual(self.testObj.read("UnitTest1_mask3", 1), 0x5f77)
def test_blockRead(self):
self.testObj.read("UnitTest_ResetDummyHW") # Mechanism to reset dummy hardware.
result = [0xdeadbeef, 0xffffffff, 0x12345678, 0x87654321,
0x89abcdef, 0xfedcba98, 0x81818181, 0x18181818]
self.testObj.write("UnitTest1", result[0])
self.testObj.write("UnitTest2", result[1])
self.testObj.write("UnitTest3", result[2])
self.testObj.write("UnitTest4", result[3])
self.testObj.write("UnitTest5", result[4])
self.testObj.write("UnitTest6", result[5])
self.testObj.write("UnitTest7", result[6])
self.testObj.write("UnitTest8", result[7])
self.assertEqual(self.testObj.blockRead("UnitTest1", 8), result)
def test_blockReadWithAddrOffset(self):
self.testObj.read("UnitTest_ResetDummyHW") # Mechanism to reset dummy hardware.
result = [0xdeadbeef, 0xffffffff, 0x12345678, 0x87654321,
0x89abcdef, 0xfedcba98, 0x81818181, 0x18181818]
self.testObj.write("UnitTest1", result[0])
self.testObj.write("UnitTest2", result[1])
self.testObj.write("UnitTest3", result[2])
self.testObj.write("UnitTest4", result[3])
self.testObj.write("UnitTest5", result[4])
self.testObj.write("UnitTest6", result[5])
self.testObj.write("UnitTest7", result[6])
self.testObj.write("UnitTest8", result[7])
self.assertEqual(self.testObj.blockRead("UnitTest1", 5, 2), result[2:-1])
def test_blockReadZeroDepthIgnored(self):
self.assertEqual(self.testObj.blockRead("UnitTest1", 0), None)
def test_blockReadNegativeDepthIgnored(self):
self.assertEqual(self.testObj.blockRead("UnitTest1", -1), None)
def test_unmaskedWrite(self):
self.testObj.write("UnitTest1", 0xdeadbeef)
self.assertEqual(self.testObj.read("UnitTest1"), 0xdeadbeef)
def test_unmaskedWriteWithAddrOffset(self):
self.testObj.write("UnitTest1", 0xdeadbeef, 4)
self.assertEqual(self.testObj.read("UnitTest2"), 0xdeadbeef)
def test_unmaskedWriteOversizeDataIgnored(self):
self.testObj.write("UnitTest1", 0xffffdeadbeef)
self.assertEqual(self.testObj.read("UnitTest1"), 0xdeadbeef)
def test_maskedWrite(self):
self.testObj.write("UnitTest1_mask2", 0x81)
self.assertEqual(self.testObj.read("UnitTest1_mask2"), 0x81)
def test_maskedWriteWithAddrOffset(self):
self.testObj.read("UnitTest_ResetDummyHW") # Mechanism to reset dummy hardware.
self.testObj.write("UnitTest1_mask2", 0xe7, 1)
self.assertEqual(self.testObj.read("UnitTest2"), 0x00e70000)
def test_oversizeMaskedWriteThrows(self):
# Want to see if requesting a write that is too large
# for the mask will throw an exception
self.assertRaises(ChipsException, self.testObj.write, "UnitTest1_mask3", 0xffff)
def test_blockWrite(self):
data = [0xdeadbeef, 0xffffffff, 0x12345678, 0x87654321,
0x89abcdef, 0xfedcba98, 0x81818181, 0x18181818]
self.testObj.blockWrite("UnitTest1", data)
self.assertEqual(self.testObj.blockRead("UnitTest1", 8), data)
def test_blockWriteWithAddrOffset(self):
data = [0xdeadbeef, 0xffffffff, 0x12345678, 0x87654321,
0x89abcdef, 0xfedcba98]
self.testObj.blockWrite("UnitTest1", data, 2)
self.assertEqual(self.testObj.blockRead("UnitTest3", 6), data)
def test_blockWriteOnMaskedRegisterThrows(self):
data = [0xdeadbeef, 0xffffffff, 0x12345678, 0x87654321,
0x89abcdef, 0xfedcba98]
self.assertRaises(ChipsException, self.testObj.blockWrite, "UnitTest1_mask1", data)
def test_oversizeBlockRead(self):
data = []
for i in range(400):
data.append(int(random.random()*0xffffffff))
self.oversizeTestObj.blockWrite("Reg0", data[:200])
self.oversizeTestObj.blockWrite("Reg200", data[200:])
self.assertEqual(self.oversizeTestObj.blockRead("Reg0",400), data)
def test_oversizeBlockWrite(self):
data = []
for i in range(400):
data.append(int(random.random()*0xffffffff))
self.oversizeTestObj.blockWrite("Reg0", data)
self.assertEqual(self.oversizeTestObj.blockRead("Reg0",200)+self.oversizeTestObj.blockRead("Reg200",200), data)
def test_getTransactionId(self):
previousTransactionId = self.testObj._getTransactionId()
transactionId = self.testObj._getTransactionId()
self.assertTrue(previousTransactionId + 1 == transactionId)
for i in range(ChipsBusBase.MAX_TRANSACTION_ID + 1):
transactionId = self.testObj._getTransactionId()
self.assertTrue(transactionId > 0 and transactionId <= ChipsBusBase.MAX_TRANSACTION_ID)
def test_queueRead(self):
self.testObj.read("UnitTest_ResetDummyHW") # Mechanism to reset dummy hardware.
data = [0xdeadbeef, 0xffffffff, 0x12345678, 0x87654321,
0x89abcdef, 0xfedcba98]
self.testObj.blockWrite("UnitTest1", data)
self.testObj.queueRead("UnitTest1")
self.testObj.queueRead("UnitTest2")
self.testObj.queueRead("UnitTest3")
self.testObj.queueRead("UnitTest4")
self.testObj.queueRead("UnitTest5")
self.testObj.queueRead("UnitTest6")
self.assertEqual(self.testObj.queueRun()[0], data)
def test_queueWrite(self):
self.testObj.read("UnitTest_ResetDummyHW") # Mechanism to reset dummy hardware.
result = [0xdeadbeef, 0xffffffff, 0x12345678, 0x87654321,
0x89abcdef, 0xfedcba98, 0x81818181, 0x18181818]
self.testObj.queueWrite("UnitTest1", result[0])
self.testObj.queueWrite("UnitTest2", result[1])
self.testObj.queueWrite("UnitTest3", result[2])
self.testObj.queueWrite("UnitTest4", result[3])
self.testObj.queueWrite("UnitTest5", result[4])
self.testObj.queueWrite("UnitTest6", result[5])
self.testObj.queueWrite("UnitTest7", result[6])
self.testObj.queueWrite("UnitTest8", result[7])
self.testObj.queueRun()
self.assertEqual(self.testObj.blockRead("UnitTest1", 8), result)
def test_queueReadWrite(self):
self.testObj.read("UnitTest_ResetDummyHW") # Mechanism to reset dummy hardware.
self.testObj.queueWrite("UnitTest1", 0x1234567)
self.testObj.queueRead("UnitTest1")
self.testObj.queueWrite("UnitTest2", 0xdeadbeef)
self.testObj.queueRead("UnitTest2")
self.assertEqual(self.testObj.queueRun()[0], [0x1234567, 0xdeadbeef])
def test_readNotAllowed(self):
self.assertRaises(ChipsException, self.testObj.read, "i2c_reset")
self.assertRaises(ChipsException, self.testObj.blockRead, "i2c_reset", 1)
def test_writeNotAllowed(self):
self.assertRaises(ChipsException, self.testObj.write, "SysId", 0xffffffff)
self.assertRaises(ChipsException, self.testObj.blockWrite, "SysId", [ 0xffffffff ])
def test_fifoRead(self):
self.testObj.write("UnitTest1", 0xf0e1d2c3) # The dummy hardware doesn't have a real fifo... just a register
self.assertEqual(self.testObj.fifoRead("UnitTest1", 4), [0xf0e1d2c3, 0xf0e1d2c3, 0xf0e1d2c3, 0xf0e1d2c3])
def test_fifoReadWithAddrOffset(self):
self.testObj.write("UnitTest1", 0x8a7b6c5d, 2)
self.assertEqual(self.testObj.fifoRead("UnitTest1", 4, 2), [0x8a7b6c5d, 0x8a7b6c5d, 0x8a7b6c5d, 0x8a7b6c5d])
def test_fifoReadOnMaskedRegisterThrows(self):
self.assertRaises(ChipsException, self.testObj.fifoRead, "UnitTest1_mask1", 2)
def test_oversizeFifoRead(self):
self.testObj.write("UnitTest1", 0xc4b2ea61)
expectedResult=[]
for i in range(400):
expectedResult.append(0xc4b2ea61)
self.assertEqual(self.testObj.fifoRead("UnitTest1", 400), expectedResult)
def test_fifoWrite(self):
self.testObj.fifoWrite("UnitTest1", [0x12345678, 0x90abcdef, 0x8a7b6c5d])
# Dummy hardware doesn't have a real FIFO. Last value written will be the value the register holds.
self.assertEqual(self.testObj.read("UnitTest1"), 0x8a7b6c5d)
def test_fifoWriteWithAddrOffset(self):
self.testObj.fifoWrite("UnitTest1", [0x65748392, 0x91827364, 0xf9e8d7c6, 0xa1b2c3d4], 6)
self.assertEqual(self.testObj.read("UnitTest7"), 0xa1b2c3d4)
def test_fifoWriteOnMaskedRegisterThrows(self):
self.assertRaises(ChipsException, self.testObj.fifoWrite, "UnitTest1_mask1", [0x90abcdef, 0x98765432, 0x56473829])
def test_oversizeFifoWrite(self):
# Reset the dummy hardware
self.oversizeTestObj.read("ResetDummyHW")
dataToWrite = []
expectedBlockReadResult = []
for i in range(400):
dataToWrite.append(0xcafebabe)
expectedBlockReadResult.append(0)
expectedBlockReadResult[0] = 0xcafebabe
# Only Reg1 should have a non-zero value, as this is a FIFO write, not a block write.
self.oversizeTestObj.fifoWrite("Reg1", dataToWrite)
# Read back with a blockRead. Reg1 will contain 0xcafebabe, Reg2 to Reg400 will be all zeros.
self.assertEqual(self.oversizeTestObj.blockRead("Reg1", 400), expectedBlockReadResult)
'''
Tests for the functions in the CommonTools.py file
@author Robert Frazier
June 2010
'''
import unittest
from CommonTools import *
from ChipsException import ChipsException
class Test_uInt32Compatible(unittest.TestCase):
def runTest(self):
self.assertTrue(uInt32Compatible(0xffffffff))
self.assertTrue(uInt32Compatible(0x00000000))
self.assertTrue(uInt32Compatible(0x87654321))
self.assertTrue(uInt32Compatible(0x12345678))
self.assertFalse(uInt32Compatible(0x100000000))
self.assertFalse(uInt32Compatible(-0x1))
class Test_uInt32HexStr(unittest.TestCase):
def runTest(self):
self.assertEqual(uInt32HexStr(0xffffffff), "ffffffff")
self.assertEqual(uInt32HexStr(0x0), "00000000")
self.assertEqual(uInt32HexStr(0x0f1e2d3c), "0f1e2d3c")
self.assertEqual(uInt32HexStr(-0x1), "ffffffff")
self.assertEqual(uInt32HexStr(0x1ffffffff), "ffffffff") # Test input over 32-bit.
self.assertEqual(uInt32HexStr(-0xfffffff3124158d), "cedbea73") # Test negative input over 32-bit.
self.assertRaises(ChipsException, uInt32HexStr, 0x1ffffffff, True)
self.assertRaises(ChipsException, uInt32HexStr, -1, True)
class Test_uInt32BitFlip(unittest.TestCase):
def runTest(self):
# Various tests to see if it's flipping bits correctly
self.assertEqual(uInt32BitFlip(0xffffffff), 0)
self.assertEqual(uInt32BitFlip(0x0), 0xffffffff)
self.assertEqual(uInt32BitFlip(0x12345678), 0xedcba987)
self.assertEqual(uInt32BitFlip(-0x1), 0)
self.assertEqual(uInt32BitFlip(-0xdeadbef0), 0xdeadbeef)
self.assertEqual(uInt32BitFlip(0x123456789abcdef), 0x76543210) # Test ignores anything other than bits 0-31
# The below are tests to ensure that the number it returns is a valid uInt32
# (i.e. not negative, and not ever using more than 32 bits.)
# The above tests are testing equality but in a pythonic way,
# whereby (for example) 0xffffffff and -0x1 are equivalent. I.e.
# the above tests don't care if uInt32BitFlip returns a negative
# number, but we want to ensure that uInt32BitFlip does not return
# negative numbers. So, we make use of the uInt32Compatible function.
self.assertTrue(uInt32Compatible(uInt32BitFlip(0)))
self.assertTrue(uInt32Compatible(uInt32BitFlip(0xffffffff)))
self.assertTrue(uInt32Compatible(uInt32BitFlip(0x123456789abcedf))) # Test returns only a 32-bit number.
class Test_uInt32HexListStr(unittest.TestCase):
def runTest(self):
testList = [0xdeadbeef, 0xcafebabe, 0x0, 0x1, 0x80000000, 0xffffffff]
expectedResult = "\n\tdeadbeef\n\tcafebabe\n\t00000000\n\t00000001\n\t80000000\n\tffffffff\n"
self.assertEqual(uInt32HexListStr(testList), expectedResult)
class Test_uInt32HexDualListStr(unittest.TestCase):
def runTest(self):
testList1 = [0xdeadbeef, 0xcafebabe, 0x0, 0x0, 0x80000000, 0xffffffff]
testList2 = [0xfeadbeef, 0xcafebabe, 0x0, 0x1, 0x80000000, 0xffffffff]
expectedResult = "\n\tdeadbeef != feadbeef" \
"\n\tcafebabe cafebabe" \
"\n\t00000000 00000000" \
"\n\t00000000 != 00000001" \
"\n\t80000000 80000000" \
"\n\tffffffff ffffffff\n"
self.assertEqual(uInt32HexDualListStr(testList1, testList2), expectedResult)
'''
Unit test for the TransactionElementHeader class
Created on May 21, 2010
@author: Robert Frazier
'''
import unittest
from IPbusHeader import *
from ChipsException import ChipsException
class Test_IPbusHeader(unittest.TestCase):
def setUp(self):
self.arbitraryHeaderU32 = 0xd8fb7d9e
self.makeHeaderU32 = makeHeader(5, 0x932, 0x9c, TYPE_ID_NON_INCR_READ, INFO_CODE_BUS_ERR_ON_WR)
def tearDown(self):
pass
def testVersionMethod_arbitraryHeader(self):
self.assertEqual(0xd, getVersion(self.arbitraryHeaderU32))
def testVersionMethod_makeHeader(self):
self.assertEqual(0x5, getVersion(self.makeHeaderU32))
def testWordsMethod_arbitraryHeader(self):
self.assertEqual(0x8fb, getTransactionId(self.arbitraryHeaderU32))
def testWordsMethod_makeHeader(self):
self.assertEqual(0x932, getTransactionId(self.makeHeaderU32))
def testTransactionIdMethod_arbitraryHeader(self):
self.assertEqual(0x7d, getWords(self.arbitraryHeaderU32))
def testTransactionIdMethod_makeHeader(self):
self.assertEqual(0x9c, getWords(self.makeHeaderU32))
def testTypeMethod_arbitraryHeader(self):
self.assertEqual(0x9, getTypeId(self.arbitraryHeaderU32))
def testTypeMethod_makeHeader(self):
self.assertEqual(0x2, getTypeId(self.makeHeaderU32))
def testInfoCodeMethod_arbitraryHeader(self):
self.assertEqual(0xe, getInfoCode(self.arbitraryHeaderU32))
def testInfoCodeMethod_makeHeader(self):
self.assertEqual(0x3, getInfoCode(self.makeHeaderU32))
def testIsRequestHeader(self):
self.assertTrue(isRequestHeader(0x0000000f))
self.assertFalse(isRequestHeader(0x00000000))
self.assertFalse(isRequestHeader(0x00000001))
self.assertFalse(isRequestHeader(0x0000000e))
def testIsResponseHeader(self):
self.assertTrue(isResponseHeader(0x9abcdef0))
self.assertTrue(isResponseHeader(0x9abcdef3))
self.assertTrue(isResponseHeader(0x9abcdefe))
self.assertFalse(isResponseHeader(0x0000000f))
def testIsResponseSuccessHeader(self):
self.assertTrue(isResponseSuccessHeader(0x9abcdef0))
self.assertFalse(isResponseSuccessHeader(0x0000000f))
self.assertFalse(isResponseSuccessHeader(0x9abcedf1))
def testIsResponseErrorHeader(self):
self.assertFalse(isResponseErrorHeader(0x9abcdef0))
self.assertTrue(isResponseErrorHeader(0x9abcdef1))
self.assertTrue(isResponseErrorHeader(0x0000000e))
self.assertTrue(isResponseErrorHeader(0x00000003))
self.assertFalse(isResponseErrorHeader(0x0000000f))
def testInterpretInfoCode(self):
self.assertEqual("The client request was handled successfully", interpretInfoCode(INFO_CODE_RESPONSE))
self.assertEqual("The client request resulted in a bus-write timeout", interpretInfoCode(INFO_CODE_BUS_TIMEOUT_ON_WR))
self.assertEqual("Outgoing client request", interpretInfoCode(INFO_CODE_REQUEST))
self.assertEqual("The IPbus header Info Code '0xe' is undefined/reserved!", interpretInfoCode(0xe))
def testExpectedBodySizeThrowsWithBadHeader(self):
self.assertRaises(ChipsException, getExpectedBodySize, self.arbitraryHeaderU32)
def testExpectedBodySize_readRequest(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_READ, INFO_CODE_REQUEST)
self.assertEqual(1, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_writeRequest(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_WRITE, INFO_CODE_REQUEST)
self.assertEqual(6, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_nonIncrementingReadRequest(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_NON_INCR_READ, INFO_CODE_REQUEST)
self.assertEqual(1, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_nonIncrementingWriteRequest(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_NON_INCR_WRITE, INFO_CODE_REQUEST)
self.assertEqual(6, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_readWriteModifyBitsRequest(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_RMW_BITS, INFO_CODE_REQUEST)
self.assertEqual(3, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_readWriteModifySumRequest(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_RMW_SUM, INFO_CODE_REQUEST)
self.assertEqual(2, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_reservedAddressInfoRequest(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_RSVD_ADDR_INFO, INFO_CODE_REQUEST)
self.assertEqual(0, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_readResponse(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_READ, INFO_CODE_RESPONSE)
self.assertEqual(5, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_writeResponse(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_WRITE, INFO_CODE_RESPONSE)
self.assertEqual(0, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_nonIncrementingReadResponse(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_NON_INCR_READ, INFO_CODE_RESPONSE)
self.assertEqual(5, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_nonIncrementingWriteResponse(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_NON_INCR_WRITE, INFO_CODE_RESPONSE)
self.assertEqual(0, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_readWriteModifyBitsResponse(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_RMW_BITS, INFO_CODE_RESPONSE)
self.assertEqual(1, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_readWriteModifySumResponse(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_RMW_SUM, INFO_CODE_RESPONSE)
self.assertEqual(1, getExpectedBodySize(testHeaderU32))
def testExpectedBodySize_reservedAddressInfoResponse(self):
testHeaderU32 = makeHeader(1, 1, 5, TYPE_ID_RSVD_ADDR_INFO, INFO_CODE_RESPONSE)
self.assertEqual(2, getExpectedBodySize(testHeaderU32))
def testUpdateWords(self):
testHeaderU32_orig = makeHeader(1, 1, 0xff, 0x3, 0)
testHeaderU32_update1 = updateWords(testHeaderU32_orig, 129)
self.assertEqual(129, getWords(testHeaderU32_update1))
testHeaderU32_update2 = updateWords(testHeaderU32_orig, 0)
self.assertEqual(0, getWords(testHeaderU32_update2))
testHeaderU32_update3 = updateWords(testHeaderU32_update2, 0xfffffff) # silly input
self.assertEqual(0xff, getWords(testHeaderU32_update3))
self.assertEqual(testHeaderU32_orig, testHeaderU32_update3) # if the silly input hasn't overwritten anything else, then the original should equal update3
def testUpdateInfoCode(self):
testHeaderU32_orig = makeHeader(1, 5, 1, 0x2, INFO_CODE_REQUEST)
testHeaderU32_update1 = updateInfoCode(testHeaderU32_orig, 0)
self.assertEqual(0, getInfoCode(testHeaderU32_update1))
testHeaderU32_update2 = updateInfoCode(testHeaderU32_update1, INFO_CODE_BAD_HEADER)
self.assertEqual(1, getInfoCode(testHeaderU32_update2))
testHeaderU32_update3 = updateInfoCode(testHeaderU32_update2, 0xfffffffffffff) # silly input
self.assertEqual(INFO_CODE_REQUEST, getInfoCode(testHeaderU32_update3))
self.assertEqual(testHeaderU32_orig, testHeaderU32_update3) # if the silly input hasn't overwritten anything else, then update1 should equal update3
def testSillyConstruction_fromComponents(self):
testHeaderU32 = makeHeader(0xffb, 0xfa5a, 0xfe5, 0x777, 0x5a)
self.assertEqual(0xba5ae57a, testHeaderU32)
'''
Tests for the ChipsBusBase/ChipsBusUdp classes (ChipsBus.py)
@author Robert Frazier
June 2010
'''
import unittest
from array import array
from ChipsBus import ChipsBusBase
from ChipsException import ChipsException
from TransactionElement import TransactionElement
from SerDes import SerDes
class Test_SerDes(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_serialisation(self):
# Tests that the serialiser is producing the expected result
serdes = SerDes()
arbitraryTransaction1 = TransactionElement(array('I', [0x0ddba115, 0xdeafbabe, 0xbeefcafe]))
arbitraryTransaction2 = TransactionElement(array('I', [0x19283746, 0x12345678, 0x8181f0f0]))
transactionElementList = [arbitraryTransaction1, arbitraryTransaction2]
serialisedData = serdes.serialise(transactionElementList)
# TEMPORARY IPBUS V2.x HACK! Add a little-endian packet header of [0xf0000020] to the beginning of each packet.
expectedResult = "\xf0\x00\x00\x20" \
"\x15\xa1\xdb\x0d\xbe\xba\xaf\xde\xfe\xca\xef\xbe" \
"\x46\x37\x28\x19\x78\x56\x34\x12\xf0\xf0\x81\x81"
self.assertEqual(expectedResult, serialisedData)
def test_deserilisation_throwsOnBadInputStringLength(self):
# Tests that the deserialiser throws when given an input string that
# cannot be translated into an integer number of 32-bit values.
inputString_5bytes = "\xf0\x00\x00\x20\x11"
inputString_6bytes = "\xf0\x00\x00\x20\x11\x22"
inputString_7bytes = "\xf0\x00\x00\x20\x11\x22\x33"
serdes = SerDes()
self.assertRaises(ChipsException, serdes.deserialise, inputString_5bytes)
self.assertRaises(ChipsException, serdes.deserialise, inputString_6bytes)
self.assertRaises(ChipsException, serdes.deserialise, inputString_7bytes)
def test_deserialisation_throwsOnMalformedIPbusPacket(self):
# A valid packet in terms of integer number of 32-bit words, but it's
# not a valid IPbus packet
inputString = "\xf0\x00\x00\x20\x15\xa1\xdb\x07"
serdes = SerDes()
self.assertRaises(ChipsException, serdes.deserialise, inputString)
def test_deserialisation_bigEndianValidIPbusPacket(self):
# Tests big-endian deserialisation with big-endian byte-order header
# TEMPORARY IPBUS V2.x HACK! Add a packet header of [0x200000f0] to the beginning of each packet.
inputString = "\x20\x00\x00\xf0" \
"\x20\x06\x0c\x0f\xba\x5e\xad\xd4"
serdes = SerDes()
blockReadTransaction_depth6 = TransactionElement(array('I', [0x20060c0f, 0xba5eadd4]))
expectedResult = [blockReadTransaction_depth6]
deserialisedResult = serdes.deserialise(inputString)
# Have to delve into the content of each transaction element, as haven't
# implemented a direct comparison of two transaction element objects.
self.assertEqual(expectedResult[0].getAll(), deserialisedResult[0].getAll())
def test_deserialisation_littleEndianValidIPbusPacket(self):
# Tests little-endian deserialisation with little-endian byte-order header
# TEMPORARY IPBUS V2.x HACK! Add a little-endian packet header of [0xf0000020] to the beginning of each packet.
inputString = "\xf0\x00\x00\x20" \
"\x0f\x0c\x06\x20\xd4\xad\x5e\xba"
serdes = SerDes()
blockReadTransaction_depth6 = TransactionElement(array('I', [0x20060c0f, 0xba5eadd4]))
expectedResult = [blockReadTransaction_depth6]
deserialisedResult = serdes.deserialise(inputString)
# Have to delve into the content of each transaction element, as haven't
# implemented a direct comparison of two transaction element objects.
self.assertEqual(expectedResult[0].getAll(), deserialisedResult[0].getAll())
def test_deserialiseThenSerialise_bigEndian(self):
# Tests we get back what we started with, using a big-endian byte-order header
# Specifically, this tests that the serialiser correctly remembers the
# byte-order "state". If the deserialiser does a byte-swap, then the serialiser
# should also perform a byte-swap. If deserialiser doesn't perform a byte-swap,
# then neither does the serialiser.
# TEMPORARY IPBUS V2.x HACK! Add a packet header of [0x200000f0] to the beginning of each packet.
originalInput = "\x20\x00\x00\xf0\x20\x06\x0c\x0f\xba\x5e\xad\xd4"
serdes = SerDes()
deserialisedData = serdes.deserialise(originalInput)
reserialisedData = serdes.serialise(deserialisedData)
self.assertEqual(originalInput, reserialisedData)
def test_deserialiseThenSerialise_littleEndian(self):
# Tests we get back what we started with, using a little-endian byte-order header
# Specifically, this tests that the serialiser correctly remembers the
# byte-order "state". If the deserialiser does a byte-swap, then the serialiser
# should also perform a byte-swap. If deserialiser doesn't perform a byte-swap,
# then neither does the serialiser.
# TEMPORARY IPBUS V2.x HACK! Add a packet header of [0x200000f0] to the beginning of each packet.
originalInput = "\xf0\x00\x00\x20\x0f\x0c\x06\x20\xd4\xad\x5e\xba"
serdes = SerDes()
deserialisedData = serdes.deserialise(originalInput)
reserialisedData = serdes.serialise(deserialisedData)
self.assertEqual(originalInput, reserialisedData)
'''
Unit test for the TransactionElementHeader class
Created on May 21, 2010
@author: Robert Frazier
'''
import unittest
from array import array
from TransactionElement import TransactionElement
from IPbusHeader import makeHeader, TYPE_ID_WRITE, TYPE_ID_NON_INCR_WRITE, INFO_CODE_REQUEST
from ChipsException import ChipsException
class Test_TransactionElement(unittest.TestCase):
def setUp(self):
# The test object I'm making here from the normal constructor is just totally
# arbitrary - it doesn't conform to the IPbus spec at all.
self.badTransactionElementArray = array('I', [0x82345671, 0xdeadbeef, 0xcafebabe, 0x0ddba115])
self.badTestObjFromInit = TransactionElement(self.badTransactionElementArray)
# The test object I'm making here is done via the makeFromHeaderAndBody() named
# constructor, and is a proper IPbus transaction that conforms to the spec.
goodHeader = makeHeader(2, 0x32, 0x5, TYPE_ID_WRITE, INFO_CODE_REQUEST)
self.goodBodyList = [0x30303030, 0xdeadbeef, 0xcafebabe, 0xbabecafe, 0xbeefbabe, 0xbeefba11]
self.goodTestObjFromHeaderAndBody = TransactionElement.makeFromHeaderAndBody(goodHeader, self.goodBodyList)
def tearDown(self):
pass
def testGetAll_fromInit(self):
self.assertEqual(self.badTransactionElementArray, self.badTestObjFromInit.getAll())
def testGetAll_fromHeaderAndBody(self):
self.assertEqual(array('I', [0x2032051f] + self.goodBodyList), self.goodTestObjFromHeaderAndBody.getAll())
def testGetU32TransmitSize(self):
self.assertEqual(7, self.goodTestObjFromHeaderAndBody.getU32TransmitSize())
def testGetHeader_fromInit(self):
self.assertEqual(0x82345671, self.badTestObjFromInit.getHeader())
def testGetHeader_fromHeaderAndBody(self):
self.assertEqual(0x2032051f, self.goodTestObjFromHeaderAndBody.getHeader())
def testGetBody_fromInit(self):
self.assertEqual(array('I', [0xdeadbeef, 0xcafebabe, 0x0ddba115]), self.badTestObjFromInit.getBody())
def testGetBody_fromHeaderAndBody(self):
self.assertEqual(array('I', self.goodBodyList), self.goodTestObjFromHeaderAndBody.getBody())
def testValidBodySize(self):
validHeader = makeHeader(2, 0x32, 5, TYPE_ID_NON_INCR_WRITE, INFO_CODE_REQUEST)
badBody = [0xab5ba115]
testObj_validHeaderBadBodySize = TransactionElement.makeFromHeaderAndBody(validHeader, badBody)
self.assertEqual(True, self.goodTestObjFromHeaderAndBody.validBodySize())
self.assertEqual(False, testObj_validHeaderBadBodySize.validBodySize())
self.assertRaises(ChipsException, self.badTestObjFromInit.validBodySize)
def testStringRepresentation(self):
expectedResult = " 0x2032051f\tHeader\n"\
" 0x30303030\tBody 0\n"\
" 0xdeadbeef\tBody 1\n"\
" 0xcafebabe\tBody 2\n"\
" 0xbabecafe\tBody 3\n"\
" 0xbeefbabe\tBody 4\n"\
" 0xbeefba11\tBody 5\n"
testStr = str(self.goodTestObjFromHeaderAndBody)
testRepr = repr(self.goodTestObjFromHeaderAndBody)
self.assertEqual(expectedResult, testStr)
self.assertEqual(expectedResult, testRepr)
#! /usr/bin/env python
'''
startDummyHardware.py
---------------------
Starts the IPbus Dummy Hardware. Use ctrl-c to stop it from running.
By default it runs in UDP mode on port 50001 with a "quiet" amount of logging.
To run in different modes, see the command-line options via:
./startDummyHardware.py -h
Created on Dec 15, 2011
@author: Robert Frazier
'''
# Python imports
from optparse import OptionParser
# Project imports
from DummyHardware import DummyHardwareUdp, DummyHardwareTcp
from ChipsLog import chipsLog, logging
if __name__ == "__main__":
# Option parser stuff
parser = OptionParser(usage = "%prog [options]\n\n"
"IPbus Dummy Hardware Server help\n"
"--------------------------------")
parser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False,
help="turn on verbose mode: prints incoming/outgoing packets to the console")
parser.add_option("-p", "--port", action="store", type="int", dest="port", default=50001,
help="port number the dummy hardware will listen to (default = %default)")
parser.add_option("-u", "--udp", action="store_true", dest="udp", default=True,
help="UDP mode (the default): the dummy hardware will listen for UDP packets")
parser.add_option("-t", "--tcp", action="store_false", dest="udp",
help="TCP mode: the dummy hardware will listen for TCP packets")
(options, args) = parser.parse_args()
# Print out what is about to happen
protocol = "UDP"
if not options.udp: # If we aren't using UDP, then it must be TCP
protocol = "TCP"
print "IPbus Dummy Hardware will listen for", protocol, "packets on 'localhost' port " + str(options.port) + "...\n"
# Turn on debug mode if the verbose option was selected
if options.verbose: chipsLog.setLevel(logging.DEBUG)
else: chipsLog.setLevel(logging.INFO)
dummyHardware = None
if options.udp:
dummyHardware = DummyHardwareUdp(port = options.port)
else:
dummyHardware = DummyHardwareTcp(port = options.port)
dummyHardware.serveForever() # Ctrl-c will stop it serving forever.
dummyHardware.closeSockets() # Tidy up any open sockets we no-longer need.
del(dummyHardware)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment