vv_pts.py 7.69 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
#!   /usr/bin/python
#    coding: utf8

# Copyright CERN, 2014
# Author: Julian Lewis <julian.lewis@cern.ch>
#         Theodor Stana <t.stana@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org

import sys
import time
from ctypes import *
import os, errno, re, sys, struct
import os.path
from ptsexcept import *
import socket
from socket import SHUT_RDWR
import binascii

from ptsdefine import *

class BusException(Exception):
    pass

class BusWarning(Exception):
    pass

class VME(object):

    def __init__(self,lun):
        """ The vmeio driver lun (logical unit).
            At driver install time, insmod maps lun on to
            VME (csr, application window, interrupts).
            Lun is set when creating a VME object
        """
        self.lib = CDLL('./libvv_pts.so')
        int = 0
        if type(lun) == type(int):
            self.lun = lun
        else:
            self.lun = int
            raise BusWarning("Warning: VME __init__: Bad lun, default to 0")

    def vv_init(self):
        """ Initialize the library
            loads the libvv_pts.so dynamic library
            opens vmeio driver for lun 0
            prepares the svec bootloader
        """
        self.handle = cast(self.lib.vv_init(self.lun), c_void_p)
        if self.handle == 0:
            raise BusException("Failed vv_init: Can't initialize VME library")
        return self.handle

    def vv_close(self):
        """ Close the driver and free up resources
        """
        cc = self.lib.vv_close(self.handle)
        self.handle = 0
        if cc != 0:
            raise BusException("Failed vv_close: Can't close VME library")
        return cc

    def vv_load(self, bit_stream, id):
        """ Load the FPGA
            reads the FPGA bitstream image file
            initializes the svec bootloader
            loads the FPGA image
            initializes the loaded VME core for lun 0
        """
        cc = self.lib.vv_load(self.handle, bit_stream, id)
        if cc == -1:
            raise BusException("Failed vv_load: Can't load bit_stream: %s" % (bit_stream))
        return cc

    def vv_write(self, byte_offset, value):
        """ Write to the application FPGA wishbone bus
            The byte offset will be aligned to D32
            The value is a 32 bit integer
        """
        x = c_int(value)
        cc = self.lib.vv_write(self.handle, byte_offset, byref(x), 4)
        if cc != 0:
            raise BusException("Failed vv_write: offset:0x%X value:%d" % (byte_offset, value))
        return cc

    def vv_write_array(self, byte_offset, buf, size):
        """ Write an array of data from the string array buf
            size is the number of bytes to write aligned D32
        """
        cc = self.lib.vv_write(self.handle, byte_offset, id(buf), size)
        if cc != 0:
            raise BusException("Failed vv_write_array: offset:0x%X size:%d" % (byte_offset, size))
        return cc

    def vv_read(self, byte_offset):
        """ Read from the application FPGA wishbone bus
            The byte offset will be aligned to D32
            The value will contain the 32 bit integer read
        """
        x = c_int(0)
        cc = self.lib.vv_read(self.handle, byte_offset, byref(x), 4)
        value = x.value
        if cc != 0:
            raise BusException("Failed vv_read: offset:0x%X" % (byte_offset))
        return value & 0xFFFFFFFF

    def vv_read_array(self, byte_offset, buf, size):
        """ Read size bytes into the string array buf
            The byte_offset and size will be D32 aligned
        """
        cc = self.lib.vv_read(self.handle, byte_offset, id(buf), size)
        if cc != 0:
            raise BusException("Failed vv_read_array: offset:0x%X size:%d" % (byte_offset, size))
        return cc

    def vv_irqwait(self):
        """ Wait for an interrupt
            An ISR is installed and reads the interrupt source register from the FPGA application
            If no interrupt occurs after one second the return is -1 and errno is ETIME
        """
        x = c_int(0)
        cc = self.lib.vv_irqwait(self.handle, byref(x))
        irq_src = x.value
        if cc != 0:
            raise BusException("Failed vv_irqwait: No interrupt")
        return irq_src

class PCI(object):

    def __init__(self,lun):
        """ The pciio driver lun (logical unit).
        """
        self.lib = CDLL('./libvv_pts.so')
        int = 0
        if type(lun) == type(int):
            self.lun = lun
        else:
            self.lun = int
            raise BusWarning("Warning: PCI __init__: Bad lun, default to 0")

class SKT(object):

    def __init__(self,lun):
        """ Telnet access over a socket to ELMA I2C bus
        """
        int = ELMASLOT
        if type(lun) == type(int):
            self.lun = lun
        else:
            self.lun = int
            raise BusWarning("Warning: SKT __init__: Bad lun=(slot), default to %s" % int)

        self.base = 0;
        
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ELMAIP, 23))
        s.recv(256)
        s.send("admin\r\n")
        s.recv(256)
        s.send(ELMAPWD + "\r\n")
        s.recv(256)
        self.handle = s

        # get crate firmware version, to apply proper address in readreg/writereg
        self.handle.send("version\r\n")
        ver = self.handle.recv(256)
        pos = ver.find("Software version")
        if (pos == -1):
            print("Unexpected response from \"version\" command, exiting...")
            self.close()
            sys.exit(2)
        ver = float(ver[pos+17:pos+21])
        self.ver = ver

    def vv_write(self, byte_offset, value):
        """ Write to the application FPGA via ELMA telnet
            The byte offset will be aligned to D32
            The value is a 32 bit integer
        """
        try:
            cm = "writereg %d %x %x\r\n" % (self.lun, byte_offset, value)
            if (self.ver < 2.27):
                rn = byte_offset/4 + 1
                cm = "writereg %d %d %x\r\n" % (self.lun,rn,value)
            #print "vv_write:Debug:cm:%s\n" % (cm)
            self.handle.send(cm)
        except Exception as e:
            msg = "vv_write: No reply from register at address 0x%03x " % (byte_offset)
            raise BusException(msg)

        return self.handle.recv(256).find("Done")

    def vv_read(self, byte_offset):
        """ Read from the application FPGA via ELMA telnet
            The byte offset will be aligned to D32
            The value will contain the 32 bit integer read
        """
        
        try:
            cm = "readreg %d %x\r\n" % (self.lun, byte_offset)
            if (self.ver < 2.27):
                rn = byte_offset/4 + 1
                cm = "readreg %d %d\r\n" % (self.lun,rn)
            #print "vv_read:Debug:cm:%s\n" % (cm)
            self.handle.send(cm)
            orig = self.handle.recv(256)
            rp = orig
            rp = rp.split(" ")[3]
            rp = rp.split("\n")[0]
            rp = int(rp,16)
        
        except Exception as e:
            msg = "vv_read: No reply from register at address 0x%03x " % (byte_offset)
            raise BusException(msg)
        
        return rp

    def vv_load(self):
        """ Load the FPGA, its sort of a NO-OP in SKT class
        """
        bid = self.vv_read(BIDR)
        bid = binascii.unhexlify("%s" % "{0:x}".format(bid))
        if bid in BIDR_ARR:
            return 0
        else:
            raise BusException("Failed vv_load: FPGA: Bad board ID: %s" % bid)
            return 2

    def vv_init(self):
        """ Init the library, its a NO-OP in SKT class
        """
        return self.handle

    def vv_close(self):
        """ Close the socket
        """
        self.handle.shutdown(SHUT_RDWR)
        self.handle.close()
        self.handle = 0
        return 0