#! /usr/bin/python
# coding: utf8

# Copyright CERN, 2014
# Author: Julian Lewis
#         Theodor Stana
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org

import sys
import time
from ctypes import *
import os, errno, re, sys, struct
import os.path
from ptsexcept import *
import socket
from socket import SHUT_RDWR
import binascii
from ptsdefine import *


class BusException(Exception):
    """Raised when a bus access or an underlying library call fails."""


class BusWarning(Exception):
    """Raised when a bus object is created with a lun of the wrong type."""


def _c_buffer(buf, size, writable, caller):
    """
    Return a ctypes argument that points at the data bytes of buf.

    buf may be a ctypes instance, a bytes string (only when writable is
    false) or any object exposing a writable buffer (e.g. a bytearray).
    size is the number of bytes the C library will access; it must fit
    inside buf. caller is the method name used in error messages.

    Raises BusException if buf is unsuitable or too small.
    """
    try:
        addressof(buf)
    except TypeError:
        is_ctypes = False
    else:
        is_ctypes = True

    if is_ctypes:
        if not 0 <= size <= sizeof(buf):
            raise BusException("Failed %s: size:%d does not fit in a buffer of %d bytes"
                               % (caller, size, sizeof(buf)))
        return byref(buf)

    if isinstance(buf, bytes) and not writable:
        if not 0 <= size <= len(buf):
            raise BusException("Failed %s: size:%d does not fit in a buffer of %d bytes"
                               % (caller, size, len(buf)))
        # c_char_p keeps a reference to buf and points at its character data
        return c_char_p(buf)

    try:
        # from_buffer shares memory with buf and rejects read-only or
        # too-small buffers
        return (c_char * size).from_buffer(buf)
    except (TypeError, ValueError):
        raise BusException("Failed %s: buf must be a ctypes object or a writable "
                           "buffer of at least %d bytes" % (caller, size))


class VME(object):
    """Access to a VME board through the libvv_pts.so library."""

    def __init__(self, lun):
        """
        The vmeio driver lun (logical unit).
        At driver install time, insmod maps lun on to
        VME (csr, application window, interrupts).
        Lun is set when creating a VME object

        Raises BusWarning if lun is not an int; self.lun is set to 0
        before the exception is raised.
        """
        self.lib = CDLL('./libvv_pts.so')
        default_lun = 0
        # Exact type match on purpose: same acceptance rule as before
        if type(lun) is type(default_lun):
            self.lun = lun
        else:
            self.lun = default_lun
            raise BusWarning("Warning: VME __init__: Bad lun, default to 0")

    def vv_init(self):
        """
        Initialize the library
        loads the libvv_pts.so dynamic library
        opens vmeio driver for lun 0
        prepares the svec bootloader

        Returns the library handle (a c_void_p), also stored in self.handle.
        Raises BusException if the library returns a null handle.
        """
        init = self.lib.vv_init
        # NOTE(review): assumes the C vv_init returns a pointer-sized handle,
        # as the original cast to c_void_p implies -- confirm against the
        # libvv_pts header. Without a restype ctypes truncates it to c_int.
        init.restype = c_void_p
        self.handle = c_void_p(init(self.lun))
        # Compare the wrapped value: a c_void_p instance never equals 0
        if not self.handle.value:
            raise BusException("Failed vv_init: Can't initialize VME library")
        return self.handle

    def vv_close(self):
        """
        Close the driver and free up resources

        self.handle is reset to 0 whatever the outcome.
        Raises BusException if the library reports a non-zero status.
        """
        cc = self.lib.vv_close(self.handle)
        self.handle = 0
        if cc != 0:
            raise BusException("Failed vv_close: Can't close VME library")
        return cc

    def vv_load(self, bit_stream, id):
        """
        Load the FPGA
        reads the FPGA bitstream image file
        initializes the svec bootloader
        loads the FPGA image
        initializes the loaded VME core for lun 0

        Returns the library status; raises BusException when it is -1.
        """
        cc = self.lib.vv_load(self.handle, bit_stream, id)
        if cc == -1:
            raise BusException("Failed vv_load: Can't load bit_stream: %s" % (bit_stream))
        return cc

    def vv_write(self, byte_offset, value):
        """
        Write to the application FPGA wishbone bus
        The byte offset will be aligned to D32
        The value is a 32 bit integer

        Returns the library status (0); raises BusException otherwise.
        """
        word = c_int(value)
        cc = self.lib.vv_write(self.handle, byte_offset, byref(word), 4)
        if cc != 0:
            raise BusException("Failed vv_write: offset:0x%X value:%d" % (byte_offset, value))
        return cc

    def vv_write_array(self, byte_offset, buf, size):
        """
        Write an array of data from the string array buf
        size is the number of bytes to write aligned D32

        buf may be a bytes string, a bytearray (or other writable buffer)
        or a ctypes object holding at least size bytes.
        Returns the library status (0); raises BusException otherwise.
        """
        # Pass a pointer to the data itself; id(buf) is the address of the
        # Python object header, not of the bytes to transfer.
        data = _c_buffer(buf, size, False, "vv_write_array")
        cc = self.lib.vv_write(self.handle, byte_offset, data, size)
        if cc != 0:
            raise BusException("Failed vv_write_array: offset:0x%X size:%d" % (byte_offset, size))
        return cc

    def vv_read(self, byte_offset):
        """
        Read from the application FPGA wishbone bus
        The byte offset will be aligned to D32
        The value will contain the 32 bit integer read

        Returns the value as an unsigned 32 bit integer.
        Raises BusException if the library reports a non-zero status.
        """
        word = c_int(0)
        cc = self.lib.vv_read(self.handle, byte_offset, byref(word), 4)
        if cc != 0:
            raise BusException("Failed vv_read: offset:0x%X" % (byte_offset))
        # c_int is signed; mask so the caller always sees 0..0xFFFFFFFF
        return word.value & 0xFFFFFFFF

    def vv_read_array(self, byte_offset, buf, size):
        """
        Read size bytes into the string array buf
        The byte_offset and size will be D32 aligned

        buf must be writable: a bytearray (or other writable buffer) or a
        ctypes object such as create_string_buffer(size). Immutable strings
        are rejected because the library writes into the buffer.
        Returns the library status (0); raises BusException otherwise.
        """
        # Pass a pointer to the data itself; id(buf) is the address of the
        # Python object header, which the library would overwrite.
        data = _c_buffer(buf, size, True, "vv_read_array")
        cc = self.lib.vv_read(self.handle, byte_offset, data, size)
        if cc != 0:
            raise BusException("Failed vv_read_array: offset:0x%X size:%d" % (byte_offset, size))
        return cc

    def vv_irqwait(self):
        """
        Wait for an interrupt
        An ISR is installed and reads the interrupt source register from the FPGA application
        If no interrupt occurs after one second the return is -1 and errno is ETIME

        Returns the interrupt source value filled in by the library.
        Raises BusException if the library reports a non-zero status.
        """
        source = c_int(0)
        cc = self.lib.vv_irqwait(self.handle, byref(source))
        if cc != 0:
            raise BusException("Failed vv_irqwait: No interrupt")
        return source.value


class PCI(object):
    """Placeholder for PCI access through the libvv_pts.so library."""

    def __init__(self, lun):
        """
        The pciio driver lun (logical unit).

        Raises BusWarning if lun is not an int; self.lun is set to 0
        before the exception is raised.
        """
        self.lib = CDLL('./libvv_pts.so')
        default_lun = 0
        # Exact type match on purpose: same acceptance rule as before
        if type(lun) is type(default_lun):
            self.lun = lun
        else:
            self.lun = default_lun
            raise BusWarning("Warning: PCI __init__: Bad lun, default to 0")


class SKT(object):
    """Register access through the ELMA crate telnet interface."""

    def __init__(self, lun):
        """
        Telnet access over a socket to ELMA I2C bus

        lun is the crate slot and must have the same type as ELMASLOT.
        Opens the connection, logs in and queries the crate firmware
        version, which is stored in self.ver.

        Raises BusWarning if lun has the wrong type (self.lun is set to
        ELMASLOT first). Exits the process with status 2 if the crate does
        not answer the "version" command as expected.
        """
        default_slot = ELMASLOT
        if type(lun) is type(default_slot):
            self.lun = lun
        else:
            self.lun = default_slot
            raise BusWarning("Warning: SKT __init__: Bad lun=(slot), default to %s" % default_slot)

        self.base = 0

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((ELMAIP, 23))
            s.recv(256)
            # sendall: plain send() may transmit only part of the command
            s.sendall("admin\r\n")
            s.recv(256)
            s.sendall(ELMAPWD + "\r\n")
            s.recv(256)
            self.handle = s

            # get crate firmware version, to apply proper address in readreg/writereg
            self.handle.sendall("version\r\n")
            ver = self.handle.recv(256)
            pos = ver.find("Software version")
            if pos != -1:
                ver = float(ver[pos+17:pos+21])
        except Exception:
            # Do not leak the connection when login or parsing fails
            s.close()
            raise

        if pos == -1:
            print("Unexpected response from \"version\" command, exiting...")
            self.vv_close()
            sys.exit(2)
        self.ver = ver

    def vv_write(self, byte_offset, value):
        """
        Write to the application FPGA via ELMA telnet
        The byte offset will be aligned to D32
        The value is a 32 bit integer

        Returns the position of "Done" in the crate's reply, or -1 when
        the reply does not contain it.
        Raises BusException if the command cannot be built or sent.
        """
        try:
            cm = "writereg %d %x %x\r\n" % (self.lun, byte_offset, value)
            if self.ver < 2.27:
                # Older firmware addresses registers by 1-based number
                rn = byte_offset // 4 + 1
                cm = "writereg %d %d %x\r\n" % (self.lun, rn, value)
            self.handle.sendall(cm)
        except Exception:
            msg = "vv_write: No reply from register at address 0x%03x " % (byte_offset)
            raise BusException(msg)
        return self.handle.recv(256).find("Done")

    def vv_read(self, byte_offset):
        """
        Read from the application FPGA via ELMA telnet
        The byte offset will be aligned to D32
        The value will contain the 32 bit integer read

        Returns the integer parsed from the crate's reply.
        Raises BusException if the command fails or the reply cannot be parsed.
        """
        try:
            cm = "readreg %d %x\r\n" % (self.lun, byte_offset)
            if self.ver < 2.27:
                # Older firmware addresses registers by 1-based number
                rn = byte_offset // 4 + 1
                cm = "readreg %d %d\r\n" % (self.lun, rn)
            self.handle.sendall(cm)
            reply = self.handle.recv(256)
            # The value is the fourth space-separated field, up to end of line
            field = reply.split(" ")[3]
            field = field.split("\n")[0]
            rp = int(field, 16)
        except Exception:
            msg = "vv_read: No reply from register at address 0x%03x " % (byte_offset)
            raise BusException(msg)
        return rp

    def vv_load(self):
        """
        Load the FPGA, its sort of a NO-OP in SKT class

        Reads the board ID register and checks it against BIDR_ARR.
        Returns 0 when the ID is known; raises BusException otherwise.
        """
        bid = self.vv_read(BIDR)
        digits = "{0:x}".format(bid)
        if len(digits) % 2:
            # unhexlify needs an even number of digits; without padding an
            # unexpected ID would raise from binascii instead of BusException
            digits = "0" + digits
        bid = binascii.unhexlify(digits)
        if bid in BIDR_ARR:
            return 0
        raise BusException("Failed vv_load: FPGA: Bad board ID: %s" % bid)

    def vv_init(self):
        """
        Init the library, its a NO-OP in SKT class

        Returns the socket stored in self.handle.
        """
        return self.handle

    def vv_close(self):
        """
        Close the socket

        The socket is closed and self.handle reset to 0 even when the
        shutdown call raises; that exception still propagates.
        Returns 0.
        """
        try:
            self.handle.shutdown(SHUT_RDWR)
        finally:
            self.handle.close()
            self.handle = 0
        return 0