Commit 0ba13e1e authored by Matthieu Cattin's avatar Matthieu Cattin

test44: Add external trigger and datapath delay difference test.

parent 239434ba
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
# Last modifications: 30/5/2012
# Import system modules
import sys
import time
import os
# Add common modules and libraries location to path
sys.path.append('../../../')
sys.path.append('../../../gnurabbit/python/')
sys.path.append('../../../common/')
# Import common modules
from ptsexcept import *
import rr
# Import specific modules
from fmc_adc_spec import *
from fmc_adc import *
from numpy import *
from pylab import *
import matplotlib.colors as colors
from calibr_box import *
import find_usb_tty
from PAGE.Agilent33250A import *
from PAGE.SineWaveform import *
"""
test44: Tests external trigger and datapath delay difference.
For this test, connect a pulse (e.g AWG sync output)
on the external trigger input and chain it to channel 4.
"""
NB_CHANNELS = 4
AWG_SET_SLEEP = 0.3
SSR_SET_SLEEP = 0.05
BOX_SET_SLEEP = 0.01
ACQ_TIMEOUT = 10
# Acquisition parameters
NB_ACQ = 20
ACQ_PAUSE = 0.1 # pause between acq. stop and start, start and trigger
IN_RANGE = '10V'
IN_TERM = 'OFF'
ADC_FS = {'10V':10.0, '1V':1.0, '100mV':0.1}
CHANNEL = 4
PRE_TRIG_SAMPLES = 2
POST_TRIG_SAMPLES = 10
NB_SHOTS = 1
BYTES_PER_SAMPLE = 2
TRIG_THRES_VOLT = 0.007
TRIG_THRES_FILT = 20
TRIG_DEL = 0 # in samples
TRIG_TIMETAG_BYTES = 16
RANGES = ['10V', '1V', '100mV']
def open_all_channels(fmc):
for i in range(1,NB_CHANNELS+1):
fmc.set_input_range(i, 'OPEN')
time.sleep(SSR_SET_SLEEP)
def fmc_adc_init(spec, fmc):
print "Initialise FMC board.\n"
# Reset offset DACs
fmc.dc_offset_reset()
# Make sure all switches are OFF
open_all_channels(fmc)
# Set acquisition
fmc.set_pre_trig_samples(PRE_TRIG_SAMPLES)
fmc.set_post_trig_samples(POST_TRIG_SAMPLES)
fmc.set_shots(NB_SHOTS)
# Converts two's complement hex to signed
def hex2signed(value):
if(value & 0x8000):
return -((~value & 0xFFFF) + 1)
else:
return (value & 0xFFFF)
# Converts digital value to volts
def digital2volt(value, full_scale, nb_bit):
return float(value) * float(full_scale)/2**nb_bit
# Converts volts to digital value
def volt2digital_without_offset(value, full_scale, nb_bit):
if(value > (2**nb_bit)/2 - 1):
value = (2**nb_bit)/2 - 1
if(value < -((2**nb_bit)/2)):
value = -((2**nb_bit)/2)
digital = (value) * 2**nb_bit/full_scale
#print('volt2digital: %2.9f > %2.9f')%(value,digital)
return int(digital)
# Converts hex gain value to float
def gain2float(value):
dec = (value & 0x8000) >> 15
frac = value & 0x7FFF
return (float) (dec + (frac * 1.0/2**15))
def get_corr_values(fmc, ch):
off_corr = fmc.get_adc_offset_corr(ch)
print("\nOffset corr:0x%04X (%d)"%(off_corr, hex2signed(off_corr)))
gain_corr = fmc.get_adc_gain_corr(ch)
print("Gain corr :0x%04X (%1.6f)"%(gain_corr, gain2float(gain_corr)))
def acq_channels(fmc, carrier, adc_fs, pause):
# Make sure no acquisition is running
fmc.stop_acq()
time.sleep(pause)
# Start acquisition
fmc.start_acq()
time.sleep(pause)
# Trigger
#fmc.sw_trig()
# Wait end of acquisition
timeout = 0
while('IDLE' != fmc.get_acq_fsm_state()):
time.sleep(.1)
timeout += 1
if(ACQ_TIMEOUT < timeout):
print "Acquisition timeout. Missing trigger?."
print "Acq FSm state: %s"%fmc.get_acq_fsm_state()
sys.exit()
# Retrieve data trough DMA
trig_pos = fmc.get_trig_pos()
# Enable "DMA done" interrupt
carrier.enable_dma_done_irq()
# Read samples for all channels + trigger timetag
data_length = ((PRE_TRIG_SAMPLES + 1 + POST_TRIG_SAMPLES) * NB_CHANNELS * BYTES_PER_SAMPLE) + TRIG_TIMETAG_BYTES
channels_data = carrier.get_data((trig_pos-(PRE_TRIG_SAMPLES*8)), data_length)
trig_timetag = []
data = []
for i in range(8):
data.append(channels_data.pop(-1))
for i in range(0,8,2):
trig_timetag.append(((data[i] << 16) + data[i+1]))
# Disable "DMA done" interrupt
carrier.disable_dma_done_irq()
#print("raw hex data: 0x%08X"%(channels_data[0]))
channels_data = [hex2signed(item) for item in channels_data]
#print("signed data : 0x%08X (%d)"%(channels_data[0], channels_data[0]))
channels_data = [digital2volt(item,adc_fs,16) for item in channels_data]
return channels_data, trig_timetag
def plot_channel(ch_data, trig_pos, col, trig_del):
#cmap = cm.get_cmap('YlOrRd')
cmap = cm.get_cmap('autumn')
cnorm = colors.Normalize(vmin=0, vmax=NB_ACQ)
mapcol = cm.ScalarMappable(norm=cnorm, cmap=cmap)
sample = range(-trig_pos, len(ch_data)-trig_pos)
plot(sample, ch_data, linestyle='-', color=mapcol.to_rgba(col))
ylim_min = -0.5
ylim_max = 2.5
ylim(ylim_min, ylim_max)
grid(color='k', linestyle=':', linewidth=1)
vlines(0, ylim_min, ylim_max, color='#AA0000', linestyles='solid')
xlabel('Samples')
ylabel('Voltage [V]')
title('External hardware trigger, delay=%d'%trig_del)
#legend(loc='upper left')
#draw()
#show()
return 0
def main (default_directory='.'):
# Constants declaration
TEST_NB = 44
FMC_ADC_BITSTREAM = '../firmwares/spec_fmcadc100m14b4cha.bin'
FMC_ADC_BITSTREAM = os.path.join(default_directory, FMC_ADC_BITSTREAM)
EXPECTED_BITSTREAM_TYPE = 0x1
# Calibration box vendor and product IDs
BOX_USB_VENDOR_ID = 0x10c4 # Cygnal Integrated Products, Inc.
BOX_USB_PRODUCT_ID = 0xea60 # CP210x Composite Device
# Agilent AWG serial access vendor and product IDs
AWG_USB_VENDOR_ID = 0x0403 # Future Technology Devices International, Ltd
AWG_USB_PRODUCT_ID = 0x6001 # FT232 USB-Serial (UART) IC
AWG_BAUD = 57600
EEPROM_BIN_FILENAME = "eeprom_content.out"
EEPROM_BIN_FILENAME = os.path.join(default_directory, EEPROM_BIN_FILENAME)
EEPROM_SIZE = 8192 # in Bytes
CALIBR_BIN_FILENAME = "calibration_data.bin"
CALIBR_BIN_FILENAME = os.path.join(default_directory, CALIBR_BIN_FILENAME)
start_test_time = time.time()
print "================================================================================"
print "Test%02d start\n" % TEST_NB
# SPEC object declaration
print "Loading hardware access library and opening device.\n"
spec = rr.Gennum()
# Load FMC ADC firmware
print "Loading FMC ADC firmware: %s\n" % FMC_ADC_BITSTREAM
if(os.path.isfile(FMC_ADC_BITSTREAM)):
spec.load_firmware(FMC_ADC_BITSTREAM)
time.sleep(2)
else:
raise PtsCritical("Firmware file \"%s\" is missing, test stopped." % FMC_ADC_BITSTREAM)
# Carrier object declaration (SPEC board specific part)
# Used to check that the firmware is loaded.
try:
carrier = CFmcAdc100mSpec(spec, EXPECTED_BITSTREAM_TYPE)
except FmcAdc100mSpecOperationError as e:
raise PtsCritical("Carrier init failed, test stopped: %s" % e)
# Mezzanine object declaration (FmcAdc100m14b4cha board specific part)
try:
fmc = CFmcAdc100m(spec)
except FmcAdc100mOperationError as e:
raise PtsCritical("Mezzanine init failed, test stopped: %s" % e)
try:
"""
# Others objects declaration
usb_tty = find_usb_tty.CttyUSB()
awg_tty = usb_tty.find_usb_tty(AWG_USB_VENDOR_ID, AWG_USB_PRODUCT_ID)
box_tty = usb_tty.find_usb_tty(BOX_USB_VENDOR_ID, BOX_USB_PRODUCT_ID)
gen = Agilent33250A(device=awg_tty[0], bauds=AWG_BAUD)
sine = SineWaveform()
box = CCalibr_box(box_tty[0])
"""
# Initialise fmc adc
fmc_adc_init(spec, fmc)
# Use data pattern instead of ADC data
#fmc.testpat_en(0x1FFF) # max
#fmc.testpat_en(0x0) # mid
#fmc.testpat_en(0x2000) # min
# Set UTC
current_time = time.time()
utc_seconds = int(current_time)
fmc.set_utc_second_cnt(utc_seconds)
#print "UTC core seconds counter initialised to : %d" % fmc.get_utc_second_cnt()
utc_coarse = int((current_time - utc_seconds)/8E-9)
fmc.set_utc_coarse_cnt(utc_coarse)
#print "UTC core coarse counter initialised to : %d" % fmc.get_utc_coarse_cnt()
# Print ADC config
#fmc.print_adc_config()
##################################################
# Configure sampling clock
##################################################
"""
fmc.print_si570_config()
time.sleep(3)
fs_clk = fmc.get_samp_freq()
print("Sampling frequency: %d Hz"%fs_clk)
"""
print("\Reduce sampling frequency")
fmc.si570.wr_reg(0x89,(1<<4)) # freeze DCO
fmc.si570.set_hs_div(3)
#fmc.si570.set_n1_div(19)
fmc.si570.rd_reg(0x89)
fmc.si570.wr_reg(0x89,0) # unfreeze DCO
fmc.si570.wr_reg(0x87,(1<<6)) # NewFreq
fmc.print_si570_config()
time.sleep(3)
fs_clk = fmc.get_samp_freq()
print("Sampling frequency: %d Hz"%fs_clk)
print("\nReset sampling frequency to 100MHz")
fmc.si570.recall_nvm()
fmc.print_si570_config()
time.sleep(3)
fs_clk = fmc.get_samp_freq()
print("Sampling frequency: %d Hz"%fs_clk)
#sys.exit()
##################################################
# Configure trigger
##################################################
trig_hw_sel = 1 # 1=external
trig_hw_pol = 0 # 0=rising
trig_hw_en = 1
trig_sw_en = 0
trig_channel = CHANNEL
trig_thres = volt2digital_without_offset(TRIG_THRES_VOLT, ADC_FS[IN_RANGE], 16)
#fmc.set_trig_config(trig_hw_sel, trig_hw_pol, trig_hw_en, trig_sw_en, trig_channel, trig_thres, TRIG_DEL, TRIG_THRES_FILT)
fmc.set_trig_config(trig_hw_sel, trig_hw_pol, trig_hw_en, trig_sw_en, trig_channel, trig_thres, TRIG_DEL)
print("==================================================")
print("Channel: %d\nInput range: %s\nInput term: %s"%(CHANNEL, IN_RANGE, IN_TERM))
fmc.print_trig_config()
print("==================================================")
# Print configuration
#time.sleep(3)
#fmc.print_adc_core_config()
##################################################
# Set awg sine params
##################################################
#sine.frequency = 20E3
#sine.amplitude = 0.4 * ADC_FS[IN_RANGE]
#sine.dc = 0
#print "\nSine frequency:%3.3fMHz amplitude:%2.3fVp offset:%2.3fV" % (sine.frequency/1E6, sine.amplitude, sine.dc)
# Set AWG
#gen.connect()
#gen.play(sine)
#gen.output = True
#time.sleep(AWG_SET_SLEEP)
##################################################
# Configure analogue input
##################################################
fmc.set_input_range(CHANNEL, IN_RANGE)
fmc.set_input_term(CHANNEL, IN_TERM)
time.sleep(SSR_SET_SLEEP)
# connect AWG to current channel
#box.select_output_ch(CHANNEL)
#time.sleep(BOX_SET_SLEEP)
##################################################
# Apply gain and offset correction
##################################################
# Get ADC and DAC offset and gain correction parameters
#print "\nRead calibration data from FMC EEPROM:"
adc_corr_data = {'10V':{'offset':[],'gain':[],'temp':0},
'1V':{'offset':[],'gain':[],'temp':0},
'100mV':{'offset':[],'gain':[],'temp':0}}
dac_corr_data = {'10V':{'offset':[],'gain':[],'temp':0},
'1V':{'offset':[],'gain':[],'temp':0},
'100mV':{'offset':[],'gain':[],'temp':0}}
# Read entire EEPROM
#print("Read all eeprom content.")
eeprom_data_read = fmc.sys_i2c_eeprom_read(0, EEPROM_SIZE)
# Write EEPROM data to binary file
#print("Write eeprom content to file (binary): %s"%(EEPROM_BIN_FILENAME))
f_eeprom = open(EEPROM_BIN_FILENAME, "wb")
for byte in eeprom_data_read:
f_eeprom.write(chr(byte))
f_eeprom.close()
# Get calibration data
#print("Extract calibration binary file to: %s"%(CALIBR_BIN_FILENAME))
cmd = 'sdb-read -e 0x200 ' + EEPROM_BIN_FILENAME + ' calib > ' + CALIBR_BIN_FILENAME
#print("Exctract calibration binary file, cmd: %s"%(cmd))
os.system(cmd)
#print "Get calibration data from binary file."
calibr_data = []
f_calibr_data = open(CALIBR_BIN_FILENAME, "rb")
try:
byte = f_calibr_data.read(1)
while byte != "":
calibr_data.append(ord(byte))
byte = f_calibr_data.read(1)
finally:
f_eeprom.close()
# Re-arrange correction data into 16-bit number (from bytes)
eeprom_corr_data = []
for i in range(0,len(calibr_data),2):
eeprom_corr_data.append((calibr_data[i+1] << 8) + (calibr_data[i]))
#print "0x%04X" % eeprom_corr_data[-1]
#print "Calibration data length (16-bit): %d" % len(eeprom_corr_data)
#print "Correction data from eeprom:"
#print "\nGet ADC correction parameters:"
for RANGE in RANGES:
for ch in range(NB_CHANNELS):
adc_corr_data[RANGE]['offset'].append(hex2signed(eeprom_corr_data.pop(0)))
for ch in range(NB_CHANNELS):
adc_corr_data[RANGE]['gain'].append(eeprom_corr_data.pop(0))
adc_corr_data[RANGE]['temp'] = eeprom_corr_data.pop(0)/100.0
"""
for ranges in adc_corr_data.iteritems():
print "%s:" % ranges[0]
for corr in ranges[1].iteritems():
print " - %6s: " % corr[0],
if type(corr[1]) is list:
for val in corr[1]:
print "0x%04X (%6d) " % (val, val),
else:
print "%2.3f " % corr[1],
print ""
print ""
"""
#print "\nGet DAC correction parameters:"
for RANGE in RANGES:
for ch in range(NB_CHANNELS):
dac_corr_data[RANGE]['offset'].append(hex2signed(eeprom_corr_data.pop(0)))
for ch in range(NB_CHANNELS):
dac_corr_data[RANGE]['gain'].append(eeprom_corr_data.pop(0))
dac_corr_data[RANGE]['temp'] = eeprom_corr_data.pop(0)/100.0
"""
for ranges in dac_corr_data.iteritems():
print "%s:" % ranges[0]
for corr in ranges[1].iteritems():
print " - %6s: " % corr[0],
if type(corr[1]) is list:
for val in corr[1]:
print "%6d " % val,
else:
print "%2.3f " % corr[1],
print ""
print ""
"""
# Write DAC gain and offset correction value to fmc class
#print "\nApply DAC correction\n"
fmc.set_dac_corr(dac_corr_data)
g = adc_corr_data[IN_RANGE]['gain'][CHANNEL-1]
o = adc_corr_data[IN_RANGE]['offset'][CHANNEL-1]
#g = 0x8000
#o = 0
#print "\nApply ADC offset correction: gain=0x%04X, offset=0x%04X" %(g, o)
fmc.set_adc_gain_offset_corr(CHANNEL, g, o)
# print correction values from fpga
get_corr_values(fmc, CHANNEL)
##################################################
# Acquire channel and print
##################################################
print "\nAcquiring channel %d" % CHANNEL
acq_nb = NB_ACQ
for nb in range(acq_nb):
acq_data, trig_timetag = acq_channels(fmc, carrier, ADC_FS[IN_RANGE], ACQ_PAUSE)
ch_data = acq_data[CHANNEL-1::4]
#print("Number of samples: %d"%(len(ch_data)))
##################################################
# Plot channel
##################################################
trig_pos = PRE_TRIG_SAMPLES
#col = float(nb)/float(acq_nb-1)
col = nb
#print("Acqisition: %3d, color: %1.4f"%(nb, col))
plot_channel(ch_data, trig_pos, col, TRIG_DEL)
show()
# Make sure all switches are OFF
open_all_channels(fmc)
# Switch AWG OFF
#gen.output = False
#gen.close()
# Check if an error occured during frequency response test
# if(error != 0):
# raise PtsError('An error occured, check log for details.')
except(FmcAdc100mSpecOperationError, FmcAdc100mOperationError, CalibrBoxOperationError) as e:
raise PtsError("Test failed: %s" % e)
print ""
print "==> End of test%02d" % TEST_NB
print "================================================================================"
end_test_time = time.time()
print "Test%02d elapsed time: %.2f seconds\n" % (TEST_NB, end_test_time-start_test_time)
if __name__ == '__main__' :
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment