Commit 01105d5b authored by Matthieu Cattin's avatar Matthieu Cattin

test39: Add adc saturation test.

parent eeb2ea75
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
# Last modifications: 30/5/2012
# Import system modules
import sys
import time
import os
# Add common modules and libraries location to path
sys.path.append('../../../')
sys.path.append('../../../gnurabbit/python/')
sys.path.append('../../../common/')
# Import common modules
from ptsexcept import *
import rr
# Import specific modules
from fmc_adc_spec import *
from fmc_adc import *
from numpy import *
from pylab import *
from calibr_box import *
import find_usb_tty
from PAGE.Agilent33250A import *
from PAGE.SineWaveform import *
"""
test39: Tests saturation with/without gain/offset correction
"""
NB_CHANNELS = 4
AWG_SET_SLEEP = 0.3
SSR_SET_SLEEP = 0.05
BOX_SET_SLEEP = 0.01
ACQ_TIMEOUT = 10
PRE_TRIG_SAMPLES = 10
POST_TRIG_SAMPLES = 200
NB_SHOTS = 1
BYTES_PER_SAMPLE = 2
TRIG_TIMETAG_BYTES = 16
RANGES = ['10V', '1V', '100mV']
def open_all_channels(fmc):
for i in range(1,NB_CHANNELS+1):
fmc.set_input_range(i, 'OPEN')
time.sleep(SSR_SET_SLEEP)
def fmc_adc_init(spec, fmc):
print "Initialise FMC board.\n"
# Reset offset DACs
fmc.dc_offset_reset()
# Make sure all switches are OFF
open_all_channels(fmc)
# Set software trigger
fmc.set_soft_trig()
# Set acquisition
fmc.set_pre_trig_samples(PRE_TRIG_SAMPLES)
fmc.set_post_trig_samples(POST_TRIG_SAMPLES)
fmc.set_shots(NB_SHOTS)
# Converts two's complement hex to signed
def hex2signed(value):
if(value & 0x8000):
return -((~value & 0xFFFC) + 1)
else:
return (value & 0xFFFC)
# Converts digital value to volts
def digital2volt(value, full_scale, nb_bit):
return float(value) * float(full_scale)/2**nb_bit
# Converts hex gain value to float
def gain2float(value):
dec = (value & 0x8000) >> 15
frac = value & 0x7FFF
return (float) (dec + (frac * 1.0/2**15))
def get_corr_values(fmc):
off_corr = fmc.get_adc_offset_corr(1)
print("Offset corr:0x%04X (%d)"%(off_corr, hex2signed(off_corr)))
gain_corr = fmc.get_adc_gain_corr(1)
print("Gain corr :0x%04X (%1.6f)"%(gain_corr, gain2float(gain_corr)))
def acq_channels(fmc, carrier, adc_fs, pause):
# Make sure no acquisition is running
fmc.stop_acq()
time.sleep(pause)
# Start acquisition
fmc.start_acq()
time.sleep(pause)
# Trigger
fmc.sw_trig()
# Wait end of acquisition
timeout = 0
while('IDLE' != fmc.get_acq_fsm_state()):
time.sleep(.1)
timeout += 1
if(ACQ_TIMEOUT < timeout):
print "Acquisition timeout. Missing trigger?."
print "Acq FSm state: %s"%fmc.get_acq_fsm_state()
return 1
# Retrieve data trough DMA
trig_pos = fmc.get_trig_pos()
# Enable "DMA done" interrupt
carrier.enable_dma_done_irq()
# Read samples for all channels + trigger timetag
data_length = ((PRE_TRIG_SAMPLES + 1 + POST_TRIG_SAMPLES) * NB_CHANNELS * BYTES_PER_SAMPLE) + TRIG_TIMETAG_BYTES
channels_data = carrier.get_data((trig_pos-(PRE_TRIG_SAMPLES*8)), data_length)
trig_timetag = []
data = []
for i in range(8):
data.append(channels_data.pop(-1))
for i in range(0,8,2):
trig_timetag.append(((data[i] << 16) + data[i+1]))
# Disable "DMA done" interrupt
carrier.disable_dma_done_irq()
#print("raw hex data: 0x%08X"%(channels_data[0]))
channels_data = [hex2signed(item) for item in channels_data]
#print("signed data : 0x%08X (%d)"%(channels_data[0], channels_data[0]))
#channels_data = [digital2volt(item,adc_fs,16) for item in channels_data]
return channels_data, trig_timetag
def plot_channel(ncal_ch_data, cal_ch_data, scal_ch_data, ncal_ch_mean, cal_ch_mean, scal_ch_mean, sat, ylimit):
sample = arange(len(ncal_ch_data))
plot(sample, ncal_ch_data, 'go-', label='Non-corrected')
plot(sample, cal_ch_data, 'bo-', label='Corrected')
plot(sample, scal_ch_data, 'mo-', label='Corrected & saturated')
#plot(sample, [ncal_ch_mean]*len(sample), 'g')
plot(sample, [cal_ch_mean]*len(sample), 'b--', label='Corrected middle')
plot(sample, [scal_ch_mean]*len(sample), 'm--', label='Corrected & saturated middle')
plot(sample, [sat]*len(sample), 'm')
plot(sample, [-sat]*len(sample), 'm')
plot(sample, [32764]*len(sample), 'r')
plot(sample, [-32765]*len(sample), 'r')
ylim(-ylimit-(ylimit/10.0), ylimit+(ylimit/10.0))
grid(color='k', linestyle=':', linewidth=1)
legend(loc='upper left')
#draw()
show()
return 0
def main (default_directory='.'):
# Constants declaration
TEST_NB = 39
FMC_ADC_BITSTREAM = '../firmwares/spec_fmcadc100m14b4cha.bin'
FMC_ADC_BITSTREAM = os.path.join(default_directory, FMC_ADC_BITSTREAM)
EXPECTED_BITSTREAM_TYPE = 0x1
# Calibration box vendor and product IDs
BOX_USB_VENDOR_ID = 0x10c4 # Cygnal Integrated Products, Inc.
BOX_USB_PRODUCT_ID = 0xea60 # CP210x Composite Device
# Agilent AWG serial access vendor and product IDs
AWG_USB_VENDOR_ID = 0x0403 # Future Technology Devices International, Ltd
AWG_USB_PRODUCT_ID = 0x6001 # FT232 USB-Serial (UART) IC
AWG_BAUD = 57600
EEPROM_BIN_FILENAME = "eeprom_content.out"
EEPROM_BIN_FILENAME = os.path.join(default_directory, EEPROM_BIN_FILENAME)
EEPROM_SIZE = 8192 # in Bytes
CALIBR_BIN_FILENAME = "calibration_data.bin"
CALIBR_BIN_FILENAME = os.path.join(default_directory, CALIBR_BIN_FILENAME)
start_test_time = time.time()
print "================================================================================"
print "Test%02d start\n" % TEST_NB
# SPEC object declaration
print "Loading hardware access library and opening device.\n"
spec = rr.Gennum()
# Load FMC ADC firmware
print "Loading FMC ADC firmware: %s\n" % FMC_ADC_BITSTREAM
spec.load_firmware(FMC_ADC_BITSTREAM)
time.sleep(2)
# Carrier object declaration (SPEC board specific part)
# Used to check that the firmware is loaded.
try:
carrier = CFmcAdc100mSpec(spec, EXPECTED_BITSTREAM_TYPE)
except FmcAdc100mSpecOperationError as e:
raise PtsCritical("Carrier init failed, test stopped: %s" % e)
# Mezzanine object declaration (FmcAdc100m14b4cha board specific part)
try:
fmc = CFmcAdc100m(spec)
except FmcAdc100mOperationError as e:
raise PtsCritical("Mezzanine init failed, test stopped: %s" % e)
try:
# Others objects declaration
usb_tty = find_usb_tty.CttyUSB()
awg_tty = usb_tty.find_usb_tty(AWG_USB_VENDOR_ID, AWG_USB_PRODUCT_ID)
box_tty = usb_tty.find_usb_tty(BOX_USB_VENDOR_ID, BOX_USB_PRODUCT_ID)
gen = Agilent33250A(device=awg_tty[0], bauds=AWG_BAUD)
sine = SineWaveform()
box = CCalibr_box(box_tty[0])
# Initialise fmc adc
fmc_adc_init(spec, fmc)
# Use data pattern instead of ADC data
#fmc.testpat_en(0x1FFF) # max
#fmc.testpat_en(0x0) # mid
#fmc.testpat_en(0x2000) # min
# Set UTC
current_time = time.time()
utc_seconds = int(current_time)
fmc.set_utc_second_cnt(utc_seconds)
#print "UTC core seconds counter initialised to : %d" % fmc.get_utc_second_cnt()
utc_coarse = int((current_time - utc_seconds)/8E-9)
fmc.set_utc_coarse_cnt(utc_coarse)
#print "UTC core coarse counter initialised to : %d" % fmc.get_utc_coarse_cnt()
# Print configuration
#fmc.print_adc_core_config()
# Print ADC config
#fmc.print_adc_config()
# Acquisition parameters
ACQ_PAUSE = 1 # pause between acq. stop and start, start and trigger
IN_RANGE = '1V'
IN_TERM = 'ON'
ADC_FS = {'10V':10.0, '1V':1.0, '100mV':0.1}
##################################################
# Set awg sine params to make adc saturate
##################################################
sine.frequency = 1E6
sine.amplitude = 1.2 * ADC_FS[IN_RANGE]
sine.dc = 0
print "\nSine frequency:%3.3fMHz amplitude:%2.3fVp offset:%2.3fV" % (sine.frequency/1E6, sine.amplitude, sine.dc)
# Set AWG
gen.connect()
gen.play(sine)
gen.output = True
time.sleep(AWG_SET_SLEEP)
get_corr_values(fmc)
##################################################
# Acquire channel 1 and print
##################################################
print "\nAcquiring channel 1"
# Configure analogue input
fmc.set_input_range(1, IN_RANGE)
fmc.set_input_term(1, IN_TERM)
time.sleep(SSR_SET_SLEEP)
# connect AWG to current channel
box.select_output_ch(1)
time.sleep(BOX_SET_SLEEP)
# Perform an acquisition
acq_data, trig_timetag = acq_channels(fmc, carrier, ADC_FS[IN_RANGE], ACQ_PAUSE)
channel_data = acq_data[0::4]
print("Number of samples: %d"%(len(channel_data)))
# Calculate middle value
ch_max = max(channel_data)
ch_min = min(channel_data)
ch_mid = (abs(ch_max) - abs(ch_min))
ch_mean = mean(channel_data)
print("Channel max=%d, min=%d, mid=%d, mean=%d"%(ch_max, ch_min, ch_mid, ch_mean))
non_cal_ch_mean = ch_mid
non_cal_ch_data = channel_data
# Plot channel
#plot_channel(channel_data, ch_mean, 32768)
##################################################
# Apply gain and offset correction
##################################################
# Get ADC and DAC offset and gain correction parameters
#print "\nRead calibration data from FMC EEPROM:"
adc_corr_data = {'10V':{'offset':[],'gain':[],'temp':0},
'1V':{'offset':[],'gain':[],'temp':0},
'100mV':{'offset':[],'gain':[],'temp':0}}
dac_corr_data = {'10V':{'offset':[],'gain':[],'temp':0},
'1V':{'offset':[],'gain':[],'temp':0},
'100mV':{'offset':[],'gain':[],'temp':0}}
# Read entire EEPROM
#print("Read all eeprom content.")
eeprom_data_read = fmc.sys_i2c_eeprom_read(0, EEPROM_SIZE)
# Write EEPROM data to binary file
#print("Write eeprom content to file (binary): %s"%(EEPROM_BIN_FILENAME))
f_eeprom = open(EEPROM_BIN_FILENAME, "wb")
for byte in eeprom_data_read:
f_eeprom.write(chr(byte))
f_eeprom.close()
# Get calibration data
#print("Extract calibration binary file to: %s"%(CALIBR_BIN_FILENAME))
cmd = 'sdb-read -e 0x200 ' + EEPROM_BIN_FILENAME + ' calib > ' + CALIBR_BIN_FILENAME
#print("Exctract calibration binary file, cmd: %s"%(cmd))
os.system(cmd)
#print "Get calibration data from binary file."
calibr_data = []
f_calibr_data = open(CALIBR_BIN_FILENAME, "rb")
try:
byte = f_calibr_data.read(1)
while byte != "":
calibr_data.append(ord(byte))
byte = f_calibr_data.read(1)
finally:
f_eeprom.close()
# Re-arrange correction data into 16-bit number (from bytes)
eeprom_corr_data = []
for i in range(0,len(calibr_data),2):
eeprom_corr_data.append((calibr_data[i+1] << 8) + (calibr_data[i]))
#print "0x%04X" % eeprom_corr_data[-1]
#print "Calibration data length (16-bit): %d" % len(eeprom_corr_data)
#print "Correction data from eeprom:"
#print "\nGet ADC correction parameters:"
for RANGE in RANGES:
for ch in range(NB_CHANNELS):
adc_corr_data[RANGE]['offset'].append(hex2signed(eeprom_corr_data.pop(0)))
for ch in range(NB_CHANNELS):
adc_corr_data[RANGE]['gain'].append(eeprom_corr_data.pop(0))
adc_corr_data[RANGE]['temp'] = eeprom_corr_data.pop(0)/100.0
"""
for ranges in adc_corr_data.iteritems():
print "%s:" % ranges[0]
for corr in ranges[1].iteritems():
print " - %6s: " % corr[0],
if type(corr[1]) is list:
for val in corr[1]:
print "0x%04X (%6d) " % (val, val),
else:
print "%2.3f " % corr[1],
print ""
print ""
"""
#print "\nGet DAC correction parameters:"
for RANGE in RANGES:
for ch in range(NB_CHANNELS):
dac_corr_data[RANGE]['offset'].append(hex2signed(eeprom_corr_data.pop(0)))
for ch in range(NB_CHANNELS):
dac_corr_data[RANGE]['gain'].append(eeprom_corr_data.pop(0))
dac_corr_data[RANGE]['temp'] = eeprom_corr_data.pop(0)/100.0
"""
for ranges in dac_corr_data.iteritems():
print "%s:" % ranges[0]
for corr in ranges[1].iteritems():
print " - %6s: " % corr[0],
if type(corr[1]) is list:
for val in corr[1]:
print "%6d " % val,
else:
print "%2.3f " % corr[1],
print ""
print ""
"""
# Write DAC gain and offset correction value to fmc class
print "\nApply DAC correction\n"
fmc.set_dac_corr(dac_corr_data)
g = adc_corr_data[IN_RANGE]['gain'][0]
o = adc_corr_data[IN_RANGE]['offset'][0]
print "\nApply ADC offset correction: gain=0x%04X, offset=0x%04X" %(g, o)
fmc.set_adc_gain_offset_corr(1, g, o)
get_corr_values(fmc)
##################################################
# Acquire channel 1 and print
##################################################
print "\nAcquiring channel 1"
# Perform an acquisition
acq_data, trig_timetag = acq_channels(fmc, carrier, ADC_FS[IN_RANGE], ACQ_PAUSE)
channel_data = acq_data[0::4]
print("Number of samples: %d"%(len(channel_data)))
# Calculate middle value
ch_max = max(channel_data)
ch_min = min(channel_data)
ch_mid = (abs(ch_max) - abs(ch_min))
ch_mean = mean(channel_data)
print("Channel max=%d, min=%d, mid=%d, mean=%d"%(ch_max, ch_min, ch_mid, ch_mean))
cal_ch_mean = ch_mid
cal_ch_data = channel_data
##################################################
# Artificially saturate corrected data
##################################################
sat_thres = 28000
sat_cal_ch_data = []
for d in cal_ch_data:
if d < -sat_thres:
sat_cal_ch_data.append(-sat_thres)
elif d > sat_thres:
sat_cal_ch_data.append(sat_thres)
else:
sat_cal_ch_data.append(d)
ch_max = max(sat_cal_ch_data)
ch_min = min(sat_cal_ch_data)
ch_mid = (abs(ch_max) - abs(ch_min))
ch_mean = mean(sat_cal_ch_data)
sat_cal_ch_mean = ch_mid
##################################################
# Plot channel
##################################################
plot_channel(non_cal_ch_data, cal_ch_data, sat_cal_ch_data, non_cal_ch_mean, cal_ch_mean, sat_cal_ch_mean, sat_thres, 32768)
# Make sure all switches are OFF
open_all_channels(fmc)
# Switch AWG OFF
gen.output = False
gen.close()
# Check if an error occured during frequency response test
# if(error != 0):
# raise PtsError('An error occured, check log for details.')
except(FmcAdc100mSpecOperationError, FmcAdc100mOperationError, CalibrBoxOperationError) as e:
raise PtsError("Test failed: %s" % e)
print ""
print "==> End of test%02d" % TEST_NB
print "================================================================================"
end_test_time = time.time()
print "Test%02d elapsed time: %.2f seconds\n" % (TEST_NB, end_test_time-start_test_time)
if __name__ == '__main__' :
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment