Commit f59f84b7 authored by Matthieu Cattin's avatar Matthieu Cattin

Change exception handling structure.

Exception from lower level modules (i2c, spi, onewire, etc...) are forwarded
to the upper layer by the device modules (ds18b20, ltc217x, etc...).
Each layer is adding specific informations to the error message (e.g.
wishbone core base address, spi slave number or i2c address, etc...).
This will simplify the exception handling in the higher level modules.
parent 81af9aa4
......@@ -16,10 +16,11 @@ import rr
# Class to access 32-bit wishbone registers on BAR0
class CSRDeviceOperationError(Exception):
def __init__(self, msg):
def __init__(self, addr, msg):
self.msg = msg
self.addr = addr
def __str__(self):
return ("CSR Device produced the error: %s" %(msg))
return ("CSR [Wishbone core base address:0x%08X]: %s" %(self.addr, self.msg))
class CCSR:
......
......@@ -8,13 +8,22 @@
# Website: http://www.sevensols.com
# Last modifications: 1/4/2012
# Import standard modules
import sys
import rr
import time
import onewire
# Import specific modules
import rr
from onewire import *
# Class to access the DS18B20 (temperature sensor & unique ID) chip.
# It uses the onewire class.
# It uses the onewire class and supports only one device on the onewire bus.
class DS18B20OperationError(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return ("DS18B20: %s" %(self.msg))
class CDS18B20:
......@@ -45,13 +54,14 @@ class CDS18B20:
self.port = port
def read_serial_number(self):
try:
#print '[DS18B20] Reading serial number'
self.onewire.reset(self.port)
#print '[DS18B20] Write ROM command %.2X' % self.ROM_READ
self.onewire.write_byte(self.port, self.ROM_READ)
family_code = self.onewire.read_byte(self.port)
if family_code != self.FAMILY_CODE:
raise onewire.OneWireDeviceOperationError("Invalid family code reported:"+hex(family_code)+" (expected:"+hex(self.FAMILY_CODE)+"). Wrong chip mounted?")
raise DS18B20OperationError("Invalid family code reported:0x02X (expected:0x02X). Wrong chip mounted?" % (family_code, self.FAMILY_CODE))
serial_number = 0
for i in range(6):
serial_number |= self.onewire.read_byte(self.port) << (i*8)
......@@ -60,12 +70,15 @@ class CDS18B20:
#print '[DS18B20] Serial number: %.12X' % serial_number
#print '[DS18B20] CRC : %.2X' % crc
return ((crc<<56) | (serial_number<<8) | family_code)
except OneWireDeviceOperationError as e:
raise DS18B20OperationError(e)
def access(self, serial_number):
try:
#print '[DS18B20] Accessing device'
self.onewire.reset(self.port)
#print '[DS18B20] Write ROM command %.2X' % self.ROM_MATCH
err = self.onewire.write_byte(self.port, self.ROM_MATCH)
self.onewire.write_byte(self.port, self.ROM_MATCH)
#print '[DS18B20] Serial number: 0x%16X' % serial_number
block = []
for i in range(8):
......@@ -73,8 +86,11 @@ class CDS18B20:
serial_number >>= 8
#print '[DS18B20] block[%d]:0x%02X' % (i, block[-1])
self.onewire.write_block(self.port, block)
except OneWireDeviceOperationError as e:
raise DS18B20OperationError(e)
def read_temp(self, serial_number):
try:
#print '[DS18B20] Reading temperature'
self.access(serial_number)
#print '[DS18B20] Write function command %.2X' % self.CONVERT_TEMP
......@@ -91,8 +107,10 @@ class CDS18B20:
temp = -0x10000 + temp
temp = temp/16.0
if temp < self.MIN_TEMPER or temp > self.MAX_TEMPER:
raise onewire.OneWireDeviceOperationError("Sensor reported an invalid temperature")
raise DS18B20OperationError("Sensor reported an invalid temperature.")
return temp
except OneWireDeviceOperationError as e:
raise DS18B20OperationError(e)
# Set temperature thresholds
# TODO
......
......@@ -14,11 +14,18 @@ import time
# Import specific modules
import rr
import i2c
from i2c import *
# Class to access the 24AA64 (64K EEPROM) chip.
# It uses the I2C class.
class Eeprom24AA64OperationError(Exception):
def __init__(self, addr, msg):
self.msg = msg
self.addr = addr
def __str__(self):
return ("EEPROM 24AA64 [I2C address:0x%02X]: %s" %(self.addr, self.msg))
class C24AA64:
PAGE_SIZE = 32 # in bytes
......@@ -29,27 +36,34 @@ class C24AA64:
self.i2c_addr = i2c_addr
def wr_byte(self, mem_addr, byte):
try:
self.i2c.start(self.i2c_addr, True)
self.i2c.write((mem_addr >> 8), False)
self.i2c.write((mem_addr & 0xFF), False)
self.i2c.write(byte,True)
except I2CDeviceOperationError as e:
raise Eeprom24AA64OperationError(self.i2c_addr, e)
def rd_byte(self, mem_addr):
try:
self.i2c.start(self.i2c_addr, True)
self.i2c.write((mem_addr >> 8), False)
self.i2c.write((mem_addr & 0xFF), False)
self.i2c.start(self.i2c_addr, False)
return self.i2c.read(True)
except I2CDeviceOperationError as e:
raise Eeprom24AA64OperationError(self.i2c_addr, e)
def wr_page(self, mem_addr, data):
try:
#print '[24AA64] write data lenght=%d' % (len(data))
#print '[24AA64] write addr=%04X' % (mem_addr)
if(len(data) == 0):
raise i2c.I2CDeviceOperationError("Nothing to transmit, data size is 0!")
raise Eeprom24AA64OperationError(self.i2c_addr, "Nothing to transmit, data size is 0!")
if(len(data) > self.PAGE_SIZE):
raise i2c.I2CDeviceOperationError("Maximum write size is " + str(self.PAGE_SIZE)+" byte!")
raise Eeprom24AA64OperationError(self.i2c_addr, "Maximum write size is %d byte!" % self.PAGE_SIZE)
if((mem_addr | self.PAGE_BOUNDARY_MASK) ^ self.PAGE_BOUNDARY_MASK):
raise i2c.I2CDeviceOperationError("Start write address is not aligned to a page boundary!")
raise Eeprom24AA64OperationError(self.i2c_addr, "Start write address is not aligned to a page boundary!")
self.i2c.start(self.i2c_addr, True)
self.i2c.write((mem_addr >> 8), False)
self.i2c.write((mem_addr & 0xFF), False)
......@@ -62,8 +76,11 @@ class C24AA64:
i += 1
#print '[24AA64] write last i=%d' % (i)
self.i2c.write(data[i],True)
except I2CDeviceOperationError as e:
raise Eeprom24AA64OperationError(self.i2c_addr, e)
def rd_seq(self, mem_addr, size):
try:
self.i2c.start(self.i2c_addr, True)
self.i2c.write((mem_addr >> 8), False)
self.i2c.write((mem_addr & 0xFF), False)
......@@ -79,3 +96,5 @@ class C24AA64:
#print '[24AA64] read last i=%d' % (i)
data.append(self.i2c.read(True))
return data
except I2CDeviceOperationError as e:
raise Eeprom24AA64OperationError(self.i2c_addr, e)
......@@ -22,7 +22,7 @@ class GN4124OperationError(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return ("GN4124 produced the error: %s" %(msg))
return ("GN4124: %s" %(self.msg))
class CGN4124:
......@@ -153,8 +153,8 @@ class CGN4124:
# 2 = C3 D4 A1 B2 (swap words)
# 3 = D4 C3 B2 A1 (invert bytes)
def set_dma_swap(self, swap):
if(swap > 3):
raise GN4124OperationError('Invalid swapping configuration : %d' % swap)
if (swap > 3) | (swap < 0):
raise GN4124OperationError('Invalid swapping configuration : requested %d (should be between 0 and 3)' % swap)
else:
self.dma_csr.wr_reg(self.R_DMA_CTL, (swap << self.DMA_CTL_SWAP))
......
......@@ -6,7 +6,7 @@
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
# Website: http://www.sevensols.com
# Last modifications: 26/4/2012
# Last modifications: 4/5/2012
# Import standard modules
import sys
......@@ -19,10 +19,11 @@ import rr
# http://opencores.org/project,i2c
class I2CDeviceOperationError(Exception):
def __init__(self, msg):
def __init__(self, addr, msg):
self.msg = msg
self.addr = addr
def __str__(self):
return ("I2C Device produced the error: %s" %(msg))
return ("I2C [Wishbone core base address:0x%08X]: %s" %(self.addr, self.msg))
class COpenCoresI2C:
# OpenCores I2C registers description
......@@ -71,13 +72,13 @@ class COpenCoresI2C:
self.wr_reg(self.R_CTR, self.CTR_EN)
#print "[I2C] CTR: %.2X" % self.rd_reg(self.R_CTR)
if not(self.rd_reg(self.R_CTR) & self.CTR_EN):
raise I2CDeviceOperationError("I2C core is not enabled")
raise I2CDeviceOperationError(self.base_addr, "Core at base address 0x%08X is not enabled" % self.base_addr)
def wait_busy(self):
init_time=time.time()
while(self.rd_reg(self.R_SR) & self.SR_TIP):
if (time.time()-init_time) > self.WAIT_TIME_OUT:
raise I2CDeviceOperationError("Wait timeout")
raise I2CDeviceOperationError(self.base_addr, "Wait timeout")
def start(self, addr, write_mode):
#print '[I2C] START addr=%.2X'%addr
......@@ -92,7 +93,7 @@ class COpenCoresI2C:
self.wait_busy()
if self.rd_reg(self.R_SR) & self.SR_RXACK:
raise I2CDeviceOperationError("Device ACK not received at the I2C message start")
raise I2CDeviceOperationError(self.base_addr, "ACK not received after start from device at address 0x%02X" % (addr >> 1))
def write(self, data, last):
self.wr_reg(self.R_TXR, data)
......@@ -102,7 +103,7 @@ class COpenCoresI2C:
self.wr_reg(self.R_CR, cmd)
self.wait_busy()
if(self.rd_reg(self.R_SR) & self.SR_RXACK):
raise I2CDeviceOperationError("No ACK upon write")
raise I2CDeviceOperationError(self.base_addr, "No ACK upon write")
def read(self, last):
cmd = self.CR_RD
......
......@@ -13,11 +13,18 @@ import time
# Import specific modules
import rr
import spi
from spi import *
# Class to access the LTC217x (ADC) chip.
# It uses the SPI class.
class LTC217xOperationError(Exception):
def __init__(self, addr, msg):
self.msg = msg
self.addr = addr # addr corresponds to the SPI slave number
def __str__(self):
return ("LTC217x [SPI slave number:%d]: %s" %(self.addr, self.msg))
class CLTC217x:
# LTC217x registers
......@@ -55,15 +62,21 @@ class CLTC217x:
# addr = ltc217x register address (1 byte)
# value = value to write to the register (1 byte)
def wr_reg(self, addr, value):
try:
tx = [value, addr]
self.spi.transaction(self.slave, tx)
except SPIDeviceOperationError as e:
raise LTC217xOperationError(self.slave, e)
# addr = ltc217x register address (1 byte)
# return = value of the register (1 byte)
def rd_reg(self, addr):
try:
tx = [0xFF, (addr | 0x80)]
rx = self.spi.transaction(self.slave, tx)
return (rx[0] & 0xFF)
except SPIDeviceOperationError as e:
raise LTC217xOperationError(self.slave, e)
def __init__(self, spi, slave):
self.spi = spi
......@@ -147,7 +160,7 @@ class CLTC217x:
#print "[LTC217x] Output mode reg: 0x%02X" %(reg)
self.wr_reg(self.R_OUTMODE, reg) # Write new value to register
else:
raise spi.SPIDeviceOperationError("Requiered LVDS current setting doesn't exist for LPC217x!")
raise LTC217xOperationError(self.slave, "Requiered LVDS current setting (%s) doesn't exist for LPC217x!" % current)
def set_outmode(self, mode):
if mode in self.OUTMODE:
......@@ -160,7 +173,7 @@ class CLTC217x:
#print "[LTC217x] Output mode reg: 0x%02X" %(reg)
self.wr_reg(self.R_OUTMODE, reg) # Write new value to register
else:
raise spi.SPIDeviceOperationError("Requiered output mode setting doesn't exist for LPC217x!")
raise LTC217xOperationError(self.slave, "Requiered output mode setting (%s) doesn't exist for LPC217x!" % mode)
def set_lvds_termination(self, en):
reg = self.rd_reg(self.R_OUTMODE)
......
......@@ -13,11 +13,18 @@ import time
# Import specific modules
import rr
import spi
from spi import *
# Class to access the MAX5442 (DAC) chip.
# It uses the SPI class.
class MAX5442OperationError(Exception):
def __init__(self, addr, msg):
self.msg = msg
self.addr = addr # addr corresponds to the SPI slave number
def __str__(self):
return ("MAX5442 [SPI slave number:%d]: %s" %(self.addr, self.msg))
class CMAX5442:
def __init__(self, spi, slave):
......@@ -26,9 +33,12 @@ class CMAX5442:
# value = DAC digital value (2 bytes)
def set_value(self, value):
try:
tx = [(value & 0xFF), ((value & 0xFF00)>>8)]
#print '[max5442] Value to set: %.4X' % value
#for i in range(len(tx)):
# print '[max5442] tx[%d]: %.2X' %(i, tx[i])
self.spi.transaction(self.slave, tx)
except SPIDeviceOperationError as e:
raise MAX5442OperationError(self.slave, e)
......@@ -19,10 +19,11 @@ import rr
# http://opencores.org/project,sockit_owm
class OneWireDeviceOperationError(Exception):
def __init__(self, msg):
def __init__(self, addr, msg):
self.msg = msg
self.addr = addr
def __str__(self):
return ("OneWire Device produced the error: %s" %(msg))
return ("OneWire [Wishbone core base address:0x%08X]: %s" %(self.addr, self.msg))
class COpenCoresOneWire:
......@@ -73,10 +74,10 @@ class COpenCoresOneWire:
init_time=time.time()
while(self.rd_reg(self.R_CSR) & self.CSR_CYC_MSK):
if (time.time()-init_time) > self.WAIT_TIME_OUT:
raise OneWireDeviceOperationError("Wait timeout")
raise OneWireDeviceOperationError(self.base_addr, "Wait timeout")
reg = self.rd_reg(self.R_CSR)
if not (~reg & self.CSR_DAT_MSK):
raise OneWireDeviceOperationError("No presence pulse detected")
raise OneWireDeviceOperationError(self.base_addr, "No presence pulse detected")
def slot(self, port, bit):
data = ((port<<self.CSR_SEL_OFS) & self.CSR_SEL_MSK) | self.CSR_CYC_MSK | (bit & self.CSR_DAT_MSK)
......@@ -84,7 +85,7 @@ class COpenCoresOneWire:
init_time=time.time()
while(self.rd_reg(self.R_CSR) & self.CSR_CYC_MSK):
if (time.time()-init_time) > self.WAIT_TIME_OUT:
raise OneWireDeviceOperationError("Wait timeout")
raise OneWireDeviceOperationError(self.base_addr, "Wait timeout")
reg = self.rd_reg(self.R_CSR)
return reg & self.CSR_DAT_MSK
......@@ -107,11 +108,11 @@ class COpenCoresOneWire:
data |= self.write_bit(port, (byte & 0x1)) << i
byte >>= 1
if(byte_old != data):
raise OneWireDeviceOperationError("Error while checking written byte")
raise OneWireDeviceOperationError(self.base_addr, "Error while checking written byte")
def write_block(self, port, block):
if(self.MAX_BLOCK_LEN < len(block)):
raise OneWireDeviceOperationError("Block too long")
raise OneWireDeviceOperationError(self.base_addr, "Block too long")
data = []
for i in range(len(block)):
data.append(self.write_byte(port, block[i]))
......@@ -119,7 +120,7 @@ class COpenCoresOneWire:
def read_block(self, port, length):
if(self.MAX_BLOCK_LEN < length):
raise OneWireDeviceOperationError("Block too long")
raise OneWireDeviceOperationError(self.base_addr, "Block too long")
data = []
for i in range(length):
data.append(self.read_byte(port))
......
#!/usr/bin/python
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin (CERN)
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
# Last modifications: 27/4/2012
# Import standard modules
import sys
import rr
import time
import i2c
# Import specific modules
import rr
from i2c import *
# Class to access the Si57x (VCXO) chip.
# It uses the I2C class.
class Si57xOperationError(Exception):
def __init__(self, addr, msg):
self.msg = msg
self.addr = addr
def __str__(self):
return ("Si57x [I2C address:0x%02X]: %s" %(self.addr, self.msg))
class CSi57x:
......@@ -37,17 +56,23 @@ class CSi57x:
self.addr = addr
def rd_reg(self, addr):
try:
self.i2c.start(self.addr, True)
self.i2c.write(addr, False)
self.i2c.start(self.addr, False)
reg = self.i2c.read(True)
#print("raw data from Si570: %.2X")%reg
return reg
except I2CDeviceOperationError as e:
raise Si57xOperationError(self.addr, e)
def wr_reg(self, addr, data):
try:
self.i2c.start(self.addr, True)
self.i2c.write(addr, False)
self.i2c.write(data, True)
except I2CDeviceOperationError as e:
raise Si57xOperationError(self.addr, e)
def get_rfreq(self):
rfreq = self.rd_reg(self.R_RFREQ0)
......
......@@ -18,10 +18,11 @@ import rr
# http://opencores.org/project,spi
class SPIDeviceOperationError(Exception):
def __init__(self, msg):
def __init__(self, addr, msg):
self.msg = msg
self.addr = addr
def __str__(self):
return ("SPI Device produced the error: %s" %(msg))
return ("SPI [Wishbone core base address:0x%08X]: %s" %(self.addr, self.msg))
class COpenCoresSPI:
# OpenCores SPI registers description
......@@ -68,7 +69,7 @@ class COpenCoresSPI:
init_time=time.time()
while(self.rd_reg(self.R_CTRL) & self.CTRL_BSY):
if (time.time()-init_time) > self.WAIT_TIME_OUT:
raise I2CDeviceOperationError("Wait timeout")
raise I2CDeviceOperationError(self.base_addr, "Wait timeout")
def config(self, ass, rx_neg, tx_neg, lsb, ie):
self.conf = 0
......@@ -94,6 +95,8 @@ class COpenCoresSPI:
for i in range(0, len(txrx)):
self.wr_reg(self.R_TX[i], txrx[i])
#print '[SPI] data length: 0x%X' % len(data)
if slave > len(self.SS_SEL):
raise I2CDeviceOperationError(self.base_addr, "Maximum number of slaves is %d" % len(self.SS_SEL))
self.wr_reg(self.R_SS, self.SS_SEL[slave])
self.wr_reg(self.R_CTRL, (self.LGH_MASK & (len(data)<<3)) | self.CTRL_GO | self.conf)
self.wait_busy()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment