Commit 4e1b1600 authored by Matthieu Cattin's avatar Matthieu Cattin

fmcadc_csr, test32: Extend decimation register to 32 bits. Add a decimation test.

parent 791a0072
......@@ -39,8 +39,7 @@ FMCADC100M_CSR=['FMC ADC 100MS/s core registers',{
'RESERVED':[16, 'Reserved', 0xFFFF]}],
'TRIG_POS':[0x18, 'Trigger address register', {}],
'SR':[0x1C, 'Sample rate', {
'DECI':[0, 'Sample rate decimation factor', 0xFFFF],
'RESERVED':[16, 'Reserved', 0xFFFF]}],
'DECI':[0, 'Sample rate decimation factor', 0xFFFFFFFF]}],
'PRE_SAMPLES':[0x20, 'Pre-trigger samples', {}],
'POST_SAMPLES':[0x24, 'Post-trigger samples', {}],
'SAMPLES_CNT':[0x28, 'Samples counter', {}],
......
#! /usr/bin/env python
# coding: utf8
# Copyright CERN, 2011
# Author: Matthieu Cattin <matthieu.cattin@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
# Last modifications: 30/5/2012
# Import system modules
import sys
import time
import os
# Add common modules and libraries location to path
sys.path.append('../../../')
sys.path.append('../../../gnurabbit/python/')
sys.path.append('../../../common/')
# Import common modules
from ptsexcept import *
import rr
# Import specific modules
from fmc_adc_spec import *
from fmc_adc import *
from numpy import *
from pylab import *
from calibr_box import *
import find_usb_tty
from PAGE.Agilent33250A import *
from PAGE.SineWaveform import *
import scipy.optimize as optimize
import scipy.fftpack as fftpack
"""
test32: Test decimation
Note: Requires test00.py to run first to load the firmware!
"""
LOAD_BITSTREAM = False
NB_CHANNELS = 4
ADC_SAMP_FREQ = 100E6
AWG_SET_SLEEP = 0.3
SSR_SET_SLEEP = 0.05
BOX_SET_SLEEP = 0.01
ACQ_TIMEOUT = 100
PRE_TRIG_SAMPLES = 10
POST_TRIG_SAMPLES = 500
NB_SHOTS = 1
ACQ_LENGTH = 500 # in samples
DECIM_LIST = [1,2,3,4,5,6,7,8,9,10,100,1000,10000,100000,1000000]
def open_all_channels(fmc):
for i in range(1,NB_CHANNELS+1):
fmc.set_input_range(i, 'OPEN')
time.sleep(SSR_SET_SLEEP)
def fmc_adc_init(spec, fmc):
print "Initialise FMC board.\n"
# Reset offset DACs
fmc.dc_offset_reset()
# Make sure all switches are OFF
open_all_channels(fmc)
# Set software trigger
fmc.set_soft_trig()
# Set acquisition
fmc.set_decimation(1)
fmc.set_pre_trig_samples(PRE_TRIG_SAMPLES)
fmc.set_post_trig_samples(POST_TRIG_SAMPLES)
fmc.set_shots(NB_SHOTS)
# Converts two's complement hex to signed
def hex2signed(value):
if(value & 0x8000):
return -((~value & 0xFFFF) + 1)
else:
return value
# Converts digital value to volts
def digital2volt(value, full_scale, nb_bit):
return float(value) * float(full_scale)/2**nb_bit
def acq_channels(fmc, carrier, adc_fs, pause):
# Make sure no acquisition is running
fmc.stop_acq()
time.sleep(pause)
# Start acquisition
fmc.start_acq()
time.sleep(pause)
# Trigger
fmc.sw_trig()
# Wait end of acquisition
timeout = 0
while('IDLE' != fmc.get_acq_fsm_state()):
time.sleep(.1)
timeout += 1
if(ACQ_TIMEOUT < timeout):
print "Acquisition timeout. Missing trigger?."
print "Acq FSm state: %s"%fmc.get_acq_fsm_state()
return 1
# Retrieve data trough DMA
trig_pos = fmc.get_trig_pos()
# Enable "DMA done" iinterrupt
carrier.set_irq_en_mask(0x1)
# Read ACQ_LENGTH samples after the trigger for all channels
channels_data = carrier.get_data((trig_pos<<3), ACQ_LENGTH*8)
# Disable "DMA done" iinterrupt
carrier.set_irq_en_mask(0x0)
channels_data = [hex2signed(item) for item in channels_data]
channels_data = [digital2volt(item,adc_fs,16) for item in channels_data]
return channels_data
def plot_channels(ch_data, ch_mean, ylimit):
sample = arange(len(ch_data[0]))
plot(sample, ch_data[0], 'b', label='Channel 1')
plot(sample, ch_data[1], 'g', label='Channel 2')
plot(sample, ch_data[2], 'm', label='Channel 3')
plot(sample, ch_data[3], 'c', label='Channel 4')
plot(sample, [ch_mean[0]]*len(sample), 'r')
plot(sample, [ch_mean[1]]*len(sample), 'r')
plot(sample, [ch_mean[2]]*len(sample), 'r')
plot(sample, [ch_mean[3]]*len(sample), 'r')
ylim(-ylimit-(ylimit/10.0), ylimit+(ylimit/10.0))
grid(color='k', linestyle='--', linewidth=1)
legend(loc='upper left')
show()
return 0
def my_sine(x, o, a, f, p):
return o + a * sin(f*x + p)
def fit_sine(x, y, f, verbose=False):
# Guessed amplitude is the max value of input array
guess_ampl = max(y)
# Guessed offset is the mean value of input array
guess_offset = mean(y)
# Guessed frequency is the highest peak in the input array fft
yhat = fftpack.rfft(y)
idx = (yhat**2).argmax()
#freqs = fftpack.rfftfreq(len(y), d = (x[1]-x[0])/(2*pi))
freqs = fftpack.rfftfreq(len(y), d = (x[1]-x[0]))
guess_freq = freqs[idx]
# Guessed phase is zero (doesn't really matters), could make a fisrt rising_edge detection
#tmp = diff(sign(y))
#rising_edges = where(select([tmp>0],[tmp]))[0]
guess_phase = 0.0
guess_params = [guess_offset, guess_ampl, guess_freq*(2*pi), guess_phase]
if verbose:
print("Guessed params => Offset: %f, Amplitude: %f, Frequency: %f, Phase: %f"%(guess_offset, guess_ampl, guess_freq, guess_phase))
# Fit f to x,y input data
(fit_offset, fit_ampl, fit_freq, fit_phase), pcov = optimize.curve_fit(f, x, y, guess_params)
if verbose:
print("Fit params => Offset: %f, Amplitude: %f, Frequency: %f, Phase: %f"%(fit_offset, fit_ampl, fit_freq/(2*pi), fit_phase))
print("pcov:")
print pcov
return fit_offset, fit_ampl, fit_freq/(2*pi), fit_phase
def get_main_freq(x, y, verbose=False):
# Main frequency is the highest peak in the input array fft
yhat = fftpack.rfft(y)
idx = (yhat**2).argmax()
freqs = fftpack.rfftfreq(len(y), d = (x[1]-x[0]))
return freqs[idx]
def main (default_directory='.'):
# Constants declaration
TEST_NB = 32
FMC_ADC_BITSTREAM = '../firmwares/spec_fmcadc100m14b4cha.bin'
FMC_ADC_BITSTREAM = os.path.join(default_directory, FMC_ADC_BITSTREAM)
EXPECTED_BITSTREAM_TYPE = 0x1
# Calibration box vendor and product IDs
BOX_USB_VENDOR_ID = 0x10c4 # Cygnal Integrated Products, Inc.
BOX_USB_PRODUCT_ID = 0xea60 # CP210x Composite Device
# Agilent AWG serial access vendor and product IDs
AWG_USB_VENDOR_ID = 0x0403 # Future Technology Devices International, Ltd
AWG_USB_PRODUCT_ID = 0x6001 # FT232 USB-Serial (UART) IC
AWG_BAUD = 57600
start_test_time = time.time()
print "================================================================================"
print "Test%02d start\n" % TEST_NB
# SPEC object declaration
print "Loading hardware access library and opening device.\n"
spec = rr.Gennum()
# Load FMC ADC firmware
if LOAD_BITSTREAM == True:
print "Loading FMC ADC firmware: %s\n" % FMC_ADC_BITSTREAM
spec.load_firmware(FMC_ADC_BITSTREAM)
time.sleep(2)
# Carrier object declaration (SPEC board specific part)
# Used to check that the firmware is loaded.
try:
carrier = CFmcAdc100mSpec(spec, EXPECTED_BITSTREAM_TYPE)
except FmcAdc100mSpecOperationError as e:
raise PtsCritical("Carrier init failed, test stopped: %s" % e)
# Mezzanine object declaration (FmcAdc100m14b4cha board specific part)
try:
fmc = CFmcAdc100m(spec)
except FmcAdc100mOperationError as e:
raise PtsCritical("Mezzanine init failed, test stopped: %s" % e)
try:
# Others objects declaration
usb_tty = find_usb_tty.CttyUSB()
awg_tty = usb_tty.find_usb_tty(AWG_USB_VENDOR_ID, AWG_USB_PRODUCT_ID)
box_tty = usb_tty.find_usb_tty(BOX_USB_VENDOR_ID, BOX_USB_PRODUCT_ID)
gen = Agilent33250A(device=awg_tty[0], bauds=AWG_BAUD)
sine = SineWaveform()
box = CCalibr_box(box_tty[0])
# Initialise fmc adc
fmc_adc_init(spec, fmc)
# Set UTC
current_time = time.time()
utc_seconds = int(current_time)
fmc.set_utc_second_cnt(utc_seconds)
#print "UTC core seconds counter initialised to : %d" % fmc.get_utc_second_cnt()
utc_coarse = int((current_time - utc_seconds)/8E-9)
fmc.set_utc_coarse_cnt(utc_coarse)
#print "UTC core coarse counter initialised to : %d" % fmc.get_utc_coarse_cnt()
# Print configuration
#fmc.print_adc_core_config()
# Print ADC config
#fmc.print_adc_config()
# Acquisition parameters
ACQ_PAUSE = .1 # pause between acq. stop and start, start and trigger
IN_RANGE = '1V'
IN_TERM = 'ON'
ADC_FS = {'10V':10.0, '1V':1.0, '100mV':0.1}
for DECIMATION in DECIM_LIST:
# Set decimation
print("\n------------------------------------------------")
print("Set decimation: %d"%DECIMATION)
fmc.set_decimation(DECIMATION)
# Set sine params
sine.frequency = 1E6 / DECIMATION
sine.amplitude = 0.8 * ADC_FS[IN_RANGE]
sine.dc = 0
print "\nSine frequency:%.3f Hz amplitude:%2.3fVpp offset:%2.3fV" % (sine.frequency, sine.amplitude, sine.dc)
# Set AWG
gen.connect()
gen.play(sine)
gen.output = True
time.sleep(AWG_SET_SLEEP)
print("")
channels_data = [[],[],[],[]]
for ch in range(NB_CHANNELS):
print "Acquiring channel %d"%(ch+1)
# Configure analogue input
fmc.set_input_range(ch+1, IN_RANGE)
fmc.set_input_term(ch+1, IN_TERM)
time.sleep(SSR_SET_SLEEP)
# connect AWG to current channel
box.select_output_ch(ch+1)
time.sleep(BOX_SET_SLEEP)
# Perform an acquisition
acq_data = acq_channels(fmc, carrier, ADC_FS[IN_RANGE], ACQ_PAUSE)
channels_data[ch] = acq_data[ch::4]
# Calculate mean for each channel data
ch_mean = []
for ch in range(NB_CHANNELS):
ch_mean.append(mean(channels_data[ch]))
# Plot all channels
#plot_channels(channels_data, ch_mean, (ADC_FS[IN_RANGE]/2))
# Check input sine frequency
y = channels_data[0]
x = arange(len(y))
x = x * 1/ADC_SAMP_FREQ * DECIMATION
fit_offset, fit_ampl, fit_freq, fit_phase = fit_sine(x, y, my_sine, verbose=False)
main_freq = fit_freq
#main_freq = get_main_freq(x, y)
#print("\nSine fit: offset=%f, ampl=%f, freq=%f, phase=%f"%(fit_offset, fit_ampl, fit_freq, fit_phase))
print("\nMeasured input sine frequency: %f Hz"%(main_freq))
error = 0
tol = 0.001 * sine.frequency
print("\nFrequency error tolerance: %.3f Hz"%tol)
if (main_freq - sine.frequency) < tol:
print("\n ==> Decimation is working fine!")
else:
print("\nDecimation test failed. ###########")
print("Measured frequency is outside tolerance: got=%f expect=%f"%(main_freq-sine.frequency, tol))
error += 1
# Make sure all switches are OFF
open_all_channels(fmc)
# Switch AWG OFF
gen.output = False
gen.close()
except(FmcAdc100mSpecOperationError, FmcAdc100mOperationError, CalibrBoxOperationError) as e:
raise PtsError("Test failed: %s" % e)
print ""
print "==> End of test%02d" % TEST_NB
print "================================================================================"
end_test_time = time.time()
print "Test%02d elapsed time: %.2f seconds\n" % (TEST_NB, end_test_time-start_test_time)
# Check if an error occured during frequency response test
if(error != 0):
raise PtsError('An error occured, check log for details.')
if __name__ == '__main__' :
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment