Commit eda7837c authored by Matthieu Cattin's avatar Matthieu Cattin

Apply changes made by Richard to the onewire and ds18b20 modules.

Changes made by Richard Carrillo (7solutions) for the fmcdio5chttla.
* Proper exception raising and handling.
* Avoiding infinite loop by adding timeouts.

Some cleanup in the comments and commented debug messages.
parent 2f5f41b7
#!/usr/bin/python
# Copyright CERN, 2011
# Author: Matthieu Cattin (CERN)
# Author (modifications): Richard Carrillo <rcarrillo(AT)sevensols.com>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
# Website: http://www.sevensols.com
# Last modifications: 1/4/2012
import sys
import rr
import time
import onewire
# Class to access the DS18B20 (temperature sensor & unique ID) chip.
# It uses the onewire class.
class CDS18B20:
......@@ -15,7 +25,7 @@ class CDS18B20:
ROM_SKIP = 0xCC
ROM_ALARM_SEARCH = 0xEC
# DS18B20 fonctions commands
# DS18B20 functions commands
CONVERT_TEMP = 0x44
WRITE_SCRATCHPAD = 0x4E
READ_SCRATCHPAD = 0xBE
......@@ -23,6 +33,10 @@ class CDS18B20:
RECALL_EEPROM = 0xB8
READ_POWER_SUPPLY = 0xB4
FAMILY_CODE = 0x28
MIN_TEMPER = -55
MAX_TEMPER = 125
# Thermometer resolution configuration
RES = {'9-bit':0x0, '10-bit':0x1, '11-bit':0x2, '12-bit':0x3}
......@@ -31,61 +45,57 @@ class CDS18B20:
self.port = port
def read_serial_number(self):
#print('[DS18B20] Reading serial number')
if(1 != self.onewire.reset(self.port)):
print('[DS18B20] No presence pulse detected')
return -1
else:
#print('[DS18B20] Write ROM command %.2X') % self.ROM_READ
err = self.onewire.write_byte(self.port, self.ROM_READ)
if(err != 0):
print('[DS18B20] Write error')
return -1
family_code = self.onewire.read_byte(self.port)
serial_number = 0
for i in range(6):
serial_number |= self.onewire.read_byte(self.port) << (i*8)
crc = self.onewire.read_byte(self.port)
#print('[DS18B20] Family code : %.2X') % family_code
#print('[DS18B20] Serial number: %.12X') % serial_number
#print('[DS18B20] CRC : %.2X') % crc
#print '[DS18B20] Reading serial number'
self.onewire.reset(self.port)
#print '[DS18B20] Write ROM command %.2X' % self.ROM_READ
self.onewire.write_byte(self.port, self.ROM_READ)
family_code = self.onewire.read_byte(self.port)
if family_code != self.FAMILY_CODE:
raise onewire.OneWireDeviceOperationError("Invalid family code reported:"+hex(family_code)+" (expected:"+hex(self.FAMILY_CODE)+"). Wrong chip mounted?")
serial_number = 0
for i in range(6):
serial_number |= self.onewire.read_byte(self.port) << (i*8)
crc = self.onewire.read_byte(self.port)
#print '[DS18B20] Family code : %.2X' % family_code
#print '[DS18B20] Serial number: %.12X' % serial_number
#print '[DS18B20] CRC : %.2X' % crc
return ((crc<<56) | (serial_number<<8) | family_code)
def access(self, serial_number):
#print('[DS18B20] Accessing device')
if(1 != self.onewire.reset(self.port)):
print('[DS18B20] No presence pulse detected')
return -1
else:
#print('[DS18B20] Write ROM command %.2X') % self.ROM_MATCH
err = self.onewire.write_byte(self.port, self.ROM_MATCH)
#print serial_number
block = []
for i in range(8):
block.append(serial_number & 0xFF)
serial_number >>= 8
#print block
self.onewire.write_block(self.port, block)
return 0
#print '[DS18B20] Accessing device'
self.onewire.reset(self.port)
#print '[DS18B20] Write ROM command %.2X' % self.ROM_MATCH
err = self.onewire.write_byte(self.port, self.ROM_MATCH)
#print '[DS18B20] Serial number: 0x%16X' % serial_number
block = []
for i in range(8):
block.append(serial_number & 0xFF)
serial_number >>= 8
#print '[DS18B20] block[%d]:0x%02X' % (i, block[-1])
self.onewire.write_block(self.port, block)
def read_temp(self, serial_number):
#print('[DS18B20] Reading temperature')
err = self.access(serial_number)
#print('[DS18B20] Write function command %.2X') % self.CONVERT_TEMP
err = self.onewire.write_byte(self.port, self.CONVERT_TEMP)
time.sleep(0.8)
err = self.access(serial_number)
#print('[DS18B20] Write function command %.2X') % self.READ_SCRATCHPAD
err = self.onewire.write_byte(self.port, self.READ_SCRATCHPAD)
#print '[DS18B20] Reading temperature'
self.access(serial_number)
#print '[DS18B20] Write function command %.2X' % self.CONVERT_TEMP
self.onewire.write_byte(self.port, self.CONVERT_TEMP)
time.sleep(0.9)
self.access(serial_number)
#print '[DS18B20] Write function command %.2X' % self.READ_SCRATCHPAD
self.onewire.write_byte(self.port, self.READ_SCRATCHPAD)
data = self.onewire.read_block(self.port, 9)
#for i in range(9):
# print('Scratchpad data[%1d]: %.2X') % (i, data[i])
# print '[DS18B20] Scratchpad data[%1d]: %.2X' % (i, data[i])
temp = (data[1] << 8) | (data[0])
if(temp & 0x1000):
if temp & 0x1000:
temp = -0x10000 + temp
temp = temp/16.0
if temp < self.MIN_TEMPER or temp > self.MAX_TEMPER:
raise onewire.OneWireDeviceOperationError("Sensor reported an invalid temperature")
return temp
# Set temperature thresholds
# TODO
# Configure thermometer resolution
# TODO
#!/usr/bin/python
# Copyright CERN, 2011
# Author: Matthieu Cattin (CERN)
# Author (modifications): Richard Carrillo <rcarrillo(AT)sevensols.com>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
# Website: http://www.sevensols.com
# Last modifications: 26/4/2012
import sys
import rr
import time
# Class to access the wishbone to onewire master module from OpenCores
# http://opencores.org/project,sockit_owm
class OneWireDeviceOperationError(Exception):
def __init__(self, msg):
self.msg = msg
def __str__(self):
return ("OneWire Device produced the error: %s" %(msg))
class COpenCoresOneWire:
......@@ -27,13 +43,16 @@ class COpenCoresOneWire:
CDR_OVD_OFS = 16
CDR_OVD_MSK = (0XFFFF<<16)
WAIT_TIME_OUT = 2
MAX_BLOCK_LEN = 160
def wr_reg(self, addr, val):
self.bus.iwrite(0, self.base_addr + addr, 4, val)
def rd_reg(self,addr):
return self.bus.iread(0, self.base_addr + addr, 4)
# Function called during object creation
# Initialise the onewire master module
# bus = host bus (PCIe, VME, etc...)
# base_addr = 1-wire core base address
# clk_div_nor = clock divider normal operation, clk_div_nor = Fclk * 5E-6 - 1
......@@ -41,31 +60,27 @@ class COpenCoresOneWire:
def __init__(self, bus, base_addr, clk_div_nor, clk_div_ovd):
self.bus = bus
self.base_addr = base_addr
#print('\n### Onewire class init ###')
#print("Clock divider (normal operation): %.4X") % clk_div_nor
#print("Clock divider (overdrive operation): %.4X") % clk_div_ovd
data = ((clk_div_nor & self.CDR_NOR_MSK) | ((clk_div_ovd<<self.CDR_OVD_OFS) & self.CDR_OVD_MSK))
#print('CRD register wr: %.8X') % data
self.wr_reg(self.R_CDR, data)
#print('CRD register rd: %.8X') % self.rd_reg(self.R_CDR)
# return: 1 -> presence pulse detected
# 0 -> no presence pulse detected
def reset(self, port):
data = ((port<<self.CSR_SEL_OFS) & self.CSR_SEL_MSK) | self.CSR_CYC_MSK | self.CSR_RST_MSK
#print('[onewire] Sending reset command, CSR: %.8X') % data
self.wr_reg(self.R_CSR, data)
init_time=time.time()
while(self.rd_reg(self.R_CSR) & self.CSR_CYC_MSK):
pass
if (time.time()-init_time) > self.WAIT_TIME_OUT:
raise OneWireDeviceOperationError("Wait timeout")
reg = self.rd_reg(self.R_CSR)
#print('[onewire] Reading CSR: %.8X') % reg
return ~reg & self.CSR_DAT_MSK
if not (~reg & self.CSR_DAT_MSK):
raise OneWireDeviceOperationError("No presence pulse detected")
def slot(self, port, bit):
data = ((port<<self.CSR_SEL_OFS) & self.CSR_SEL_MSK) | self.CSR_CYC_MSK | (bit & self.CSR_DAT_MSK)
self.wr_reg(self.R_CSR, data)
init_time=time.time()
while(self.rd_reg(self.R_CSR) & self.CSR_CYC_MSK):
pass
if (time.time()-init_time) > self.WAIT_TIME_OUT:
raise OneWireDeviceOperationError("Wait timeout")
reg = self.rd_reg(self.R_CSR)
return reg & self.CSR_DAT_MSK
......@@ -87,24 +102,21 @@ class COpenCoresOneWire:
for i in range(8):
data |= self.write_bit(port, (byte & 0x1)) << i
byte >>= 1
if(byte_old == data):
return 0
else:
return -1
if(byte_old != data):
raise OneWireDeviceOperationError("Error while checking written byte")
def write_block(self, port, block):
if(160 < len(block)):
return -1
if(self.MAX_BLOCK_LEN < len(block)):
raise OneWireDeviceOperationError("Block too long")
data = []
for i in range(len(block)):
data.append(self.write_byte(port, block[i]))
return data
def read_block(self, port, length):
if(160 < length):
return -1
if(self.MAX_BLOCK_LEN < length):
raise OneWireDeviceOperationError("Block too long")
data = []
for i in range(length):
data.append(self.read_byte(port))
return data
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment