Commit 56933687 authored by Federico Vaga's avatar Federico Vaga

Merge branch '18-fix-pytest' into 'master'

Resolve "fix pytest"

Closes #18

See merge request be-cem-edl/fec/hardware-modules/fmc-delay-1ns-8cha!9
parents d16139eb 2723db59
...@@ -8,6 +8,8 @@ import select ...@@ -8,6 +8,8 @@ import select
import time import time
import os import os
from PyFmcFineDelay import FmcFineDelay, FmcFineDelayTime from PyFmcFineDelay import FmcFineDelay, FmcFineDelayTime
import random
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def fmcfd(): def fmcfd():
...@@ -17,11 +19,17 @@ def fmcfd(): ...@@ -17,11 +19,17 @@ def fmcfd():
yield fd yield fd
for ch in fd.chan: for ch in fd.chan:
ch.disable() ch.disable()
@pytest.fixture(scope="function", params=pytest.channels) @pytest.fixture(scope="function", params=pytest.channels)
def fmcfd_chan(request, fmcfd): def fmcfd_chan(request, fmcfd):
yield fmcfd.chan[request.param] yield fmcfd.chan[request.param]
fmcfd.chan[request.param].disable()
MIN_COUNT = 1
MAX_COUNT = 0xFFFF
pulse_train = [(200000, 10000000000)]
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def fmcfd_tdc(request, fmcfd): def fmcfd_tdc(request, fmcfd):
...@@ -36,10 +44,12 @@ def fmcfd_tdc(request, fmcfd): ...@@ -36,10 +44,12 @@ def fmcfd_tdc(request, fmcfd):
fmcfd.tdc.enable_input = False fmcfd.tdc.enable_input = False
fmcfd.tdc.enable_tstamp = False fmcfd.tdc.enable_tstamp = False
def timeout_compute(start, period_ps, count): def timeout_compute(start, period_ps, count):
proportional = ((count * period_ps)/1000000000000.0) proportional = ((count * period_ps) / 1000000000000.0)
return time.time() + proportional + start + 5 return time.time() + proportional + start + 5
class TestFmcfdLoop(object): class TestFmcfdLoop(object):
""" """
The test needs a lemo cable (1ns) that connects all outputs The test needs a lemo cable (1ns) that connects all outputs
...@@ -54,61 +64,66 @@ class TestFmcfdLoop(object): ...@@ -54,61 +64,66 @@ class TestFmcfdLoop(object):
chan 4 o-------` chan 4 o-------`
""" """
@pytest.mark.parametrize("count", [10]) def test_output_flush(self, fmcfd, fmcfd_chan, fmcfd_tdc):
@pytest.mark.parametrize("width,period", [(200000, 400000)]) period_ps = 400000
def test_output_flush(self, fmcfd, fmcfd_chan, fmcfd_tdc,
width, period, count):
fmcfd_chan.pulse_generate(fmcfd.time + FmcFineDelayTime(2, 0, 0), fmcfd_chan.pulse_generate(fmcfd.time + FmcFineDelayTime(2, 0, 0),
width, period, count) int(period_ps / 2), period_ps, 1)
assert len(fmcfd_tdc.poll(4000)) > 0 assert len(fmcfd_tdc.poll(4000)) > 0
fmcfd_tdc.flush() fmcfd_tdc.flush()
assert len(fmcfd_tdc.poll(4000)) ==0 assert len(fmcfd_tdc.poll(4000)) == 0
@pytest.mark.parametrize("count", [1, 2, 3, 5, 7, 10, @pytest.mark.parametrize("count", [1, MAX_COUNT] +
100, 1000, 10000, 65535]) [random.randrange(MIN_COUNT + 1,
@pytest.mark.parametrize("width,period", pulse_train) int(MAX_COUNT / 10)) for x in range(8)])
def test_output_counter(self, fmcfd, fmcfd_chan, fmcfd_tdc, def test_output_counter(self, fmcfd, fmcfd_chan, fmcfd_tdc, count):
width, period, count):
""" """
In pulse mode, the Fine-Delay generates the exact number of In pulse mode, the Fine-Delay generates the exact number of
required pulses and we are able to read them all from the input required pulses and we are able to read them all from the input
channel. channel.
""" """
ts = [] ts = []
period_ps = 4500000000
start_s = 2 start_s = 2
start = fmcfd.time + FmcFineDelayTime(start_s, 0, 0) start = fmcfd.time + FmcFineDelayTime(start_s, 0, 0)
fmcfd_chan.pulse_generate(start, width, period, count) fmcfd_chan.pulse_generate(start, int(period_ps / 2), period_ps, count)
timeout = timeout_compute(start_s, period, count) timeout = timeout_compute(start_s, period_ps, count)
while len(ts) < count and time.time() < timeout: while len(ts) < count and time.time() < timeout:
if len(fmcfd_tdc.poll()) == 0: if len(fmcfd_tdc.poll()) == 0:
continue continue
t = fmcfd_tdc.read(100, os.O_NONBLOCK) try:
t = fmcfd_tdc.read(100, os.O_NONBLOCK)
except BlockingIOError:
t = fmcfd_tdc.read(100, os.O_NONBLOCK)
assert len(t) > 0 assert len(t) > 0
ts = ts + t ts = ts + t
assert len(ts) == count assert len(ts) == count
assert len(fmcfd_tdc.poll(int(period / 1000000000.0))) == 0 assert len(fmcfd_tdc.poll(int(period_ps / 1000000000.0))) == 0
del ts del ts
@pytest.mark.parametrize("count", [10000]) @pytest.mark.parametrize("count", [random.randrange(1000, 10000)])
@pytest.mark.parametrize("width,period", pulse_train) def test_input_sequence_number(self,fmcfd_chan, fmcfd_tdc, count):
def test_input_sequence_number(self, capsys, fmcfd_chan, fmcfd_tdc,
width, period, count):
""" """
The input channel has time-stamps with increasing sequence number The input channel has time-stamps with increasing sequence number
with step 1. with step 1.
""" """
period_ps = 4500000000
pending = count pending = count
ts = [] ts = []
start_s = 2 start_s = 2
start = FmcFineDelayTime(start_s, 0, 0) start = FmcFineDelayTime(start_s, 0, 0)
fmcfd_chan.pulse_generate(fmcfd_chan.dev.time + start, fmcfd_chan.pulse_generate(fmcfd_chan.dev.time + start,
width, period, count) int(period_ps / 2), period_ps, count)
timeout = timeout_compute(start_s, period, count) timeout = timeout_compute(start_s, period_ps, count)
while pending > 0 and time.time() < timeout: while pending > 0 and time.time() < timeout:
if len(fmcfd_tdc.poll()) == 0: if len(fmcfd_tdc.poll()) == 0:
continue continue
t = fmcfd_tdc.read(pending, os.O_NONBLOCK) try:
t = fmcfd_tdc.read(100, os.O_NONBLOCK)
except BlockingIOError:
t = fmcfd_tdc.read(100, os.O_NONBLOCK)
assert len(t) > 0 assert len(t) > 0
ts.extend(t) ts.extend(t)
pending -= len(t) pending -= len(t)
...@@ -121,34 +136,24 @@ class TestFmcfdLoop(object): ...@@ -121,34 +136,24 @@ class TestFmcfdLoop(object):
"i:{:d}, cur: {:s}, prev: {:s}".format(i, str(ts[i]), "i:{:d}, cur: {:s}, prev: {:s}".format(i, str(ts[i]),
str(prev_ts)) str(prev_ts))
@pytest.mark.parametrize("start_rel", [FmcFineDelayTime(0, 78125000, 0), # + 0.0625s @pytest.mark.parametrize("start_rel", [FmcFineDelayTime(random.randrange(0, 60),
FmcFineDelayTime(0, 15625000, 0), # + 0.125s random.randrange(0, 125000000),
FmcFineDelayTime(0, 31250000, 0), # + 0.25s 0) for i in range(10)])
FmcFineDelayTime(0, 62500000, 0), # + 0.5s
FmcFineDelayTime(1, 0, 0), # + 1s
FmcFineDelayTime(1, 78125000, 0), # + 1.0625s
FmcFineDelayTime(1, 15625000, 0), # + 1.125s
FmcFineDelayTime(1, 31250000, 0), # + 1.25s
FmcFineDelayTime(1, 62500000, 0), # + 1.5s
FmcFineDelayTime(2, 0, 0), # + 2s
FmcFineDelayTime(60, 0, 0), # + 60s
FmcFineDelayTime(120, 0, 0), # + 120s
])
@pytest.mark.parametrize("wr", [False, True]) @pytest.mark.parametrize("wr", [False, True])
@pytest.mark.parametrize("count", [1]) def test_output_input_start(self, fmcfd_chan, fmcfd_tdc, wr, start_rel):
@pytest.mark.parametrize("width,period", pulse_train)
def test_output_input_start(self, fmcfd_chan, fmcfd_tdc,
wr, start_rel, width, period, count):
""" """
The output channel generates a pulse at a given time and the input The output channel generates a pulse at a given time and the input
channel timestamps it. The two times must be almost the same excluding channel timestamps it. The two times must be almost the same excluding
the propagation time (cable length). the propagation time (cable length).
""" """
period_ps = 1000000
count = 1
fmcfd_chan.dev.whiterabbit_mode = wr fmcfd_chan.dev.whiterabbit_mode = wr
ts = [] ts = []
start = fmcfd_chan.dev.time + start_rel start = fmcfd_chan.dev.time + start_rel
fmcfd_chan.pulse_generate(start, width, period, count) fmcfd_chan.pulse_generate(start, int(period_ps / 2), period_ps, count)
assert len(fmcfd_tdc.poll(int(float(start_rel) * 1000) + 2000)) > 0 assert len(fmcfd_tdc.poll(int(float(start_rel) * 1000) + 2000)) > 0
ts = fmcfd_tdc.read(count, os.O_NONBLOCK) ts = fmcfd_tdc.read(count, os.O_NONBLOCK)
assert len(ts) == count assert len(ts) == count
...@@ -156,17 +161,9 @@ class TestFmcfdLoop(object): ...@@ -156,17 +161,9 @@ class TestFmcfdLoop(object):
assert ts[0].coarse - start.coarse <= 3 # there is < 3ns cable assert ts[0].coarse - start.coarse <= 3 # there is < 3ns cable
@pytest.mark.parametrize("width,period_ps", [(200000, 1000000), @pytest.mark.parametrize("period_ps", [random.randrange(400000, 1000000000000)])
(200000, 10000000),
(200000, 100000000),
(200000, 1000000000),
(200000, 10000000000),
(200000, 100000000000),
(200000, 1000000000000),
])
@pytest.mark.parametrize("count", [10]) @pytest.mark.parametrize("count", [10])
def test_output_period(self, fmcfd_chan, fmcfd_tdc, def test_output_period(self, fmcfd_chan, fmcfd_tdc, period_ps, count):
width, period_ps, count):
""" """
The test produces pulses on the given channels and catch them using The test produces pulses on the given channels and catch them using
the on board TDC. The period between two timestamps must be as close the on board TDC. The period between two timestamps must be as close
...@@ -175,7 +172,7 @@ class TestFmcfdLoop(object): ...@@ -175,7 +172,7 @@ class TestFmcfdLoop(object):
ts = [] ts = []
start = fmcfd_chan.dev.time + FmcFineDelayTime(2, 0, 0, 0, 0) start = fmcfd_chan.dev.time + FmcFineDelayTime(2, 0, 0, 0, 0)
fmcfd_chan.pulse_generate(start, width, period_ps, count) fmcfd_chan.pulse_generate(start, int(period_ps / 2), period_ps, count)
time.sleep(2 + (count * period_ps) / 1000000000000.0) time.sleep(2 + (count * period_ps) / 1000000000000.0)
assert len(fmcfd_tdc.poll(10000)) > 0 assert len(fmcfd_tdc.poll(10000)) > 0
ts = fmcfd_tdc.read(count, os.O_NONBLOCK) ts = fmcfd_tdc.read(count, os.O_NONBLOCK)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment