Commit 857520ef authored by Federico Vaga's avatar Federico Vaga

tst: improve test_output_counter

This patch merges two different tests into one to speed up the entire
testing process. Then it improves the merged code.
Signed-off-by: Federico Vaga's avatarFederico Vaga <federico.vaga@cern.ch>
parent 083259ec
...@@ -72,69 +72,40 @@ class TestFmcfdLoop(object): ...@@ -72,69 +72,40 @@ class TestFmcfdLoop(object):
fmcfd_tdc.flush() fmcfd_tdc.flush()
assert len(fmcfd_tdc.poll(1000)) == 0 assert len(fmcfd_tdc.poll(1000)) == 0
@pytest.mark.parametrize("count, period_ps", [(i + 1, 1000000) for i in range(15)] +
@pytest.mark.parametrize("count", [1, MAX_COUNT] + [(MAX_COUNT, 4500000000)])
[random.randrange(MIN_COUNT + 1, def test_output_counter(self, fmcfd, fmcfd_chan, fmcfd_tdc, count, period_ps):
int(MAX_COUNT / 10)) for x in range(8)]) """In pulse mode, the Fine-Delay generates the exact number of
def test_output_counter(self, fmcfd, fmcfd_chan, fmcfd_tdc, count):
"""
In pulse mode, the Fine-Delay generates the exact number of
required pulses and we are able to read them all from the input required pulses and we are able to read them all from the input
channel. channel with an assigned sequential number.
"""
ts = []
period_ps = 4500000000
start_s = 2
start = fmcfd.time + FmcFineDelayTime(start_s, 0, 0)
fmcfd_chan.pulse_generate(start, int(period_ps / 2), period_ps, count)
timeout = timeout_compute(start_s, period_ps, count)
while len(ts) < count and time.time() < timeout:
if len(fmcfd_tdc.poll()) == 0:
continue
try:
t = fmcfd_tdc.read(100, os.O_NONBLOCK)
except BlockingIOError:
t = fmcfd_tdc.read(100, os.O_NONBLOCK)
assert len(t) > 0 Internal TDC used a FIFO, therefore it can't handle a too fast and long
ts = ts + t burst. Maximumg 15 pulses. On VME it is slower due to the bus: a period
assert len(ts) == count of 4.5ms is the best compromise for this test.
assert len(fmcfd_tdc.poll(int(period_ps / 1000000000.0))) == 0
del ts
@pytest.mark.parametrize("count", [random.randrange(1000, 10000)]) We test the extremes to make the test a bit faster.
def test_input_sequence_number(self,fmcfd_chan, fmcfd_tdc, count):
"""
The input channel has time-stamps with increasing sequence number
with step 1.
""" """
period_ps = 4500000000
pending = count
ts = [] ts = []
start_s = 2 start = fmcfd.time + FmcFineDelayTime(0, 1000000, 0)
start = FmcFineDelayTime(start_s, 0, 0) fmcfd_chan.pulse_generate(start, int(period_ps / 2), period_ps, count)
fmcfd_chan.pulse_generate(fmcfd_chan.dev.time + start, fmcfd_tdc.poll(1000)
int(period_ps / 2), period_ps, count) while len(fmcfd_tdc.poll(500)) > 0:
timeout = timeout_compute(start_s, period_ps, count)
while pending > 0 and time.time() < timeout:
if len(fmcfd_tdc.poll()) == 0:
continue
try: try:
t = fmcfd_tdc.read(100, os.O_NONBLOCK) t = fmcfd_tdc.read(100, os.O_NONBLOCK)
assert len(t) > 0
ts = ts + t
except BlockingIOError: except BlockingIOError:
t = fmcfd_tdc.read(100, os.O_NONBLOCK) continue
assert len(t) > 0 assert len(ts) == count
ts.extend(t) assert len(fmcfd_tdc.poll(50)) == 0
pending -= len(t)
assert pending == 0
prev_ts = None prev_ts = None
for i in range(len(ts)): for i in range(len(ts)):
if prev_ts is not None: if prev_ts is not None:
assert ts[i].seq_id == (prev_ts.seq_id + 1) & 0xFFFF,\ assert ts[i].seq_id == (prev_ts.seq_id + 1) & 0xFFFF, f"i:{i}, cur: {ts[i]}, prev: {prev_ts}"
"i:{:d}, cur: {:s}, prev: {:s}".format(i, str(ts[i]), prev_ts = ts[i]
str(prev_ts)) del ts
@pytest.mark.parametrize("start_rel", [FmcFineDelayTime(0, random.randrange(1, 125000000), @pytest.mark.parametrize("start_rel", [FmcFineDelayTime(0, random.randrange(1, 125000000),
0) for i in range(50)] + 0) for i in range(50)] +
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment