Commit c2337ed4 authored by Federico Vaga's avatar Federico Vaga

Merge branch 'pytest-improvements' into 'master'

improve pytest

See merge request be-cem-edl/fec/hardware-modules/fmc-delay-1ns-8cha!18
parents 78bc0498 a11ec120
# SPDX-FileCopyrightText: 2020 CERN (home.cern)
# SPDX-FileCopyrightText: 2023 CERN (home.cern)
#
# SPDX-License-Identifier: LGPL-2.1-or-later
......@@ -13,6 +13,17 @@ include:
file:
- 'edl-gitlab-ci.yml'
python-style:
variables:
FF_KUBERNETES_HONOR_ENTRYPOINT: 1 # Make Gitlab K8s executors to respect entrypoint in Acc-Py images, https://cern.service-now.com/service-portal?id=ticket&table=incident&n=INC3570437
stage: analyse
image:
name: gitlab-registry.cern.ch/acc-co/devops/python/distribution-images/acc-py_cc7:pro
before_script:
- pip install pycodestyle
script:
- pycodestyle --ignore=E501,W503 pytest/ # W503 because I think is bug in pycodestyle
cppcheck:
stage: analyse
image:
......
"""
SPDX-License-Identifier: LGPL-2.1-or-later
SPDX-FileCopyrightText: 2020 CERN
"""
from pathlib import Path
import pytest
from PyFmcFineDelay import FmcFineDelay
import re
def valid_slot_type(slot):
if re.search(r"(VME|PCI)\.[0-9]+-FMC\.[0-9]+", slot) is None:
raise ValueError()
return slot
def id_from_slot(slot):
carrier, mezzanine = slot.split("-")
carrier_bus, carrier_slot = carrier.split(".")
carrier_slot = int(carrier_slot)
mezzanine_bus, mezzanine_slot = mezzanine.split(".")
mezzanine_slot = int(mezzanine_slot)
if carrier_bus == "PCI":
with open("/run/dynpci") as f:
for line in f.readlines():
dynslot, pciid = line.strip().split(" ")
if int(dynslot) == carrier_slot:
break
pciid = f"0000:{pciid}"
pathfmc = list(Path("/sys/bus/pci/devices").joinpath(pciid)
.glob(f"spec-*/id:*/fmc-fdelay-tdc.*.auto/fmc-slot-*.{mezzanine_slot}"))
elif carrier_bus == "VME":
pathfmc = list(Path("/sys/bus/vme/devices").joinpath(f"slot.{carrier_slot:02d}")
.joinpath(f"vme.{carrier_slot:02d}")
.glob(f"svec-*/svec-*/id:*/fmc-fdelay-tdc.*.auto/fmc-slot-*.{mezzanine_slot}"))
else:
raise ValueError()
assert len(pathfmc) == 1
devname = list(Path(pathfmc[0]).parent.glob("hw-fd-*/fd-*/devname"))
assert len(devname) == 1
with open(devname[0]) as f:
dev_id = int(f.read().strip().split("-")[1], 16)
return dev_id
def pytest_generate_tests(metafunc):
if "dev_id" in metafunc.fixturenames:
metafunc.parametrize("dev_id", pytest.dev_id)
@pytest.fixture(scope="function")
def fmcfd(dev_id):
fd = FmcFineDelay(dev_id)
for ch in fd.chan:
ch.disable()
yield fd
for chan in fd.chan:
chan.disable()
def pytest_addoption(parser):
parser.addoption("--id", type=lambda x: int(x, 16), action='append',
default=[], help="Fmc Fine-Delay Linux Identifier")
parser.addoption("--slot", type=valid_slot_type, action='append',
default=[], help="Fmc Fine-Delay absolute slot (works only for SPEC and SVEC)")
parser.addoption("--channel", type=int, default=[],
action="append", choices=range(FmcFineDelay.CHANNEL_NUMBER),
help="Channel(s) to be used for acquisition tests. Default all channels")
def pytest_configure(config):
pytest.dev_id = config.getoption("--id")
if len(pytest.dev_id) == 0:
pytest.slot = config.getoption("--slot")
if len(pytest.slot) == 0:
print("Missing argument --id or --slot")
raise Exception()
pytest.dev_id = []
for slot in pytest.slot:
pytest.dev_id.append(id_from_slot(slot))
pytest.channels = config.getoption("--channel")
if len(pytest.channels) == 0:
pytest.channels = range(FmcFineDelay.CHANNEL_NUMBER)
# SPDX-FileCopyrightText: 2023 CERN (home.cern)
#
# SPDX-License-Identifier: LGPL-2.1-or-later
pytest
......@@ -8,18 +8,17 @@ import random
from PyFmcFineDelay import FmcFineDelay, FmcFineDelayTime
@pytest.fixture(scope="function", params=range(0, FmcFineDelay.CHANNEL_NUMBER))
def fmcfd_chan(request):
fd = FmcFineDelay(pytest.fd_id)
yield fd.chan[request.param]
def fmcfd_chan(request, fmcfd):
yield fmcfd.chan[request.param]
class TestFmcfdGetterSetter(object):
def test_disable(self, fmcfd):
for chan in fmcfd.chan:
chan.disable()
assert chan.disabled == True
assert chan.disabled is True
@pytest.mark.parametrize("enable", [True, False])
def test_tdc_disable_input(self, fmcfd, enable):
......@@ -52,7 +51,7 @@ class TestFmcfdGetterSetter(object):
(FmcFineDelay.FmcFineDelayChannel.OUT_PULSE_DELAY_MAX_WIDTH_PS,
FmcFineDelay.FmcFineDelayChannel.OUT_PULSE_DELAY_MAX_PERIOD_PS + 1,
1),
])
])
def test_pulse_delay_invalid(self, fmcfd_chan, width, period, count):
"""The pulse generation can't work with invalid parameters"""
with pytest.raises(OSError):
......@@ -77,9 +76,9 @@ class TestFmcfdGetterSetter(object):
(FmcFineDelay.FmcFineDelayChannel.OUT_PULSE_MAX_WIDTH_PS,
FmcFineDelay.FmcFineDelayChannel.OUT_PULSE_MAX_PERIOD_PS + 1,
1),
])
])
def test_pulse_generate_invalid(self, fmcfd_chan, width, period, count):
"""The pulse generation can't work with invalid parameters"""
with pytest.raises(OSError):
fmcfd_chan.pulse_generate(FmcFineDelayTime(1, 0, 0),
width, period, count)
width, period, count)
......@@ -11,16 +11,6 @@ from PyFmcFineDelay import FmcFineDelay, FmcFineDelayTime
import random
@pytest.fixture(scope="function")
def fmcfd():
fd = FmcFineDelay(pytest.fd_id)
for ch in fd.chan:
ch.disable()
yield fd
for ch in fd.chan:
ch.disable()
@pytest.fixture(scope="function", params=pytest.channels)
def fmcfd_chan(request, fmcfd):
yield fmcfd.chan[request.param]
......@@ -32,7 +22,7 @@ MAX_COUNT = 0xFFFF
@pytest.fixture(scope="function")
def fmcfd_tdc(request, fmcfd):
def fmcfd_tdc(fmcfd):
fmcfd.tdc.enable_input = False
fmcfd.tdc.enable_tstamp = False
fmcfd.tdc.termination = False
......@@ -64,81 +54,53 @@ class TestFmcfdLoop(object):
chan 4 o-------`
"""
def test_output_flush(self, fmcfd, fmcfd_chan, fmcfd_tdc):
period_ps = 400000
fmcfd_chan.pulse_generate(fmcfd.time + FmcFineDelayTime(2, 0, 0),
int(period_ps / 2), period_ps, 1)
assert len(fmcfd_tdc.poll(4000)) > 0
@pytest.mark.parametrize("period_ps", [400000])
def test_output_flush(self, fmcfd, fmcfd_chan, fmcfd_tdc, period_ps):
start = fmcfd.time + FmcFineDelayTime(0, 500000, 0)
fmcfd_chan.pulse_generate(start, int(period_ps / 2), period_ps, 1)
assert len(fmcfd_tdc.poll(2000)) > 0
fmcfd_tdc.flush()
assert len(fmcfd_tdc.poll(4000)) == 0
assert len(fmcfd_tdc.poll(1000)) == 0
@pytest.mark.parametrize("count", [1, MAX_COUNT] +
[random.randrange(MIN_COUNT + 1,
int(MAX_COUNT / 10)) for x in range(8)])
def test_output_counter(self, fmcfd, fmcfd_chan, fmcfd_tdc, count):
"""
In pulse mode, the Fine-Delay generates the exact number of
@pytest.mark.parametrize("count, period_ps",
[(i + 1, 1000000) for i in range(15)]
+ [(100, 4500000000), (MAX_COUNT, 4500000000)])
def test_output_counter(self, fmcfd, fmcfd_chan, fmcfd_tdc, count, period_ps):
"""In pulse mode, the Fine-Delay generates the exact number of
required pulses and we are able to read them all from the input
channel.
"""
ts = []
period_ps = 4500000000
start_s = 2
start = fmcfd.time + FmcFineDelayTime(start_s, 0, 0)
fmcfd_chan.pulse_generate(start, int(period_ps / 2), period_ps, count)
timeout = timeout_compute(start_s, period_ps, count)
while len(ts) < count and time.time() < timeout:
if len(fmcfd_tdc.poll()) == 0:
continue
try:
t = fmcfd_tdc.read(100, os.O_NONBLOCK)
except BlockingIOError:
t = fmcfd_tdc.read(100, os.O_NONBLOCK)
channel with an assigned sequential number.
assert len(t) > 0
ts = ts + t
assert len(ts) == count
assert len(fmcfd_tdc.poll(int(period_ps / 1000000000.0))) == 0
del ts
Internal TDC used a FIFO, therefore it can't handle a too fast and long
burst. Maximumg 15 pulses. On VME it is slower due to the bus: a period
of 4.5ms is the best compromise for this test.
@pytest.mark.parametrize("count", [random.randrange(1000, 10000)])
def test_input_sequence_number(self,fmcfd_chan, fmcfd_tdc, count):
We test the extremes to make the test a bit faster.
"""
The input channel has time-stamps with increasing sequence number
with step 1.
"""
period_ps = 4500000000
pending = count
ts = []
start_s = 2
start = FmcFineDelayTime(start_s, 0, 0)
fmcfd_chan.pulse_generate(fmcfd_chan.dev.time + start,
int(period_ps / 2), period_ps, count)
timeout = timeout_compute(start_s, period_ps, count)
while pending > 0 and time.time() < timeout:
if len(fmcfd_tdc.poll()) == 0:
continue
start = fmcfd.time + FmcFineDelayTime(0, 1000000, 0)
fmcfd_chan.pulse_generate(start, int(period_ps / 2), period_ps, count)
fmcfd_tdc.poll(1000)
while len(fmcfd_tdc.poll(500)) > 0:
try:
t = fmcfd_tdc.read(100, os.O_NONBLOCK)
assert len(t) > 0
ts = ts + t
except BlockingIOError:
t = fmcfd_tdc.read(100, os.O_NONBLOCK)
assert len(t) > 0
ts.extend(t)
pending -= len(t)
assert pending == 0
continue
assert len(ts) == count
assert len(fmcfd_tdc.poll(50)) == 0
prev_ts = None
for i in range(len(ts)):
if prev_ts is not None:
assert ts[i].seq_id == (prev_ts.seq_id + 1) & 0xFFFF,\
"i:{:d}, cur: {:s}, prev: {:s}".format(i, str(ts[i]),
str(prev_ts))
assert ts[i].seq_id == (prev_ts.seq_id + 1) & 0xFFFF, f"i:{i}, cur: {ts[i]}, prev: {prev_ts}"
prev_ts = ts[i]
del ts
@pytest.mark.parametrize("start_rel", [FmcFineDelayTime(random.randrange(0, 60),
random.randrange(0, 125000000),
0) for i in range(10)])
@pytest.mark.parametrize("start_rel",
[FmcFineDelayTime(0, random.randrange(1, 125000000), 0) for i in range(50)]
+ [FmcFineDelayTime(1, random.randrange(1, 125000000), 0) for i in range(5)]
+ [FmcFineDelayTime(2, random.randrange(1, 125000000), 0) for i in range(5)])
@pytest.mark.parametrize("wr", [False, True])
def test_output_input_start(self, fmcfd_chan, fmcfd_tdc, wr, start_rel):
"""
......@@ -158,11 +120,17 @@ class TestFmcfdLoop(object):
ts = fmcfd_tdc.read(count, os.O_NONBLOCK)
assert len(ts) == count
assert start.seconds == ts[0].seconds
assert ts[0].coarse - start.coarse <= 3 # there is < 3ns cable
@pytest.mark.parametrize("period_ps", [random.randrange(400000, 1000000000000)])
@pytest.mark.parametrize("count", [10])
assert ts[0].coarse - start.coarse <= 3 # there is < 3ns cable
@pytest.mark.parametrize("period_ps",
[random.randrange(1000000, 10000000) for x in range(128)]
+ [random.randrange(10000000, 100000000) for x in range(64)]
+ [random.randrange(100000000, 1000000000) for x in range(32)]
+ [random.randrange(1000000000, 10000000000) for x in range(16)]
+ [random.randrange(10000000000, 100000000000) for x in range(8)]
+ [random.randrange(100000000000, 1000000000000) for x in range(4)]
)
@pytest.mark.parametrize("count", [15])
def test_output_period(self, fmcfd_chan, fmcfd_tdc, period_ps, count):
"""
The test produces pulses on the given channels and catch them using
......@@ -171,17 +139,16 @@ class TestFmcfdLoop(object):
"""
ts = []
start = fmcfd_chan.dev.time + FmcFineDelayTime(2, 0, 0, 0, 0)
start = fmcfd_chan.dev.time + FmcFineDelayTime(0, 500000, 0, 0, 0)
fmcfd_chan.pulse_generate(start, int(period_ps / 2), period_ps, count)
time.sleep(2 + (count * period_ps) / 1000000000000.0)
assert len(fmcfd_tdc.poll(10000)) > 0
time.sleep(0.02 + (count * period_ps) / 1000000000000.0)
assert len(fmcfd_tdc.poll(100)) > 0
ts = fmcfd_tdc.read(count, os.O_NONBLOCK)
assert len(ts) == count
prev_ts = None
for i in range(len(ts)):
for i in range(len(ts)):
if prev_ts is not None:
assert ts[i].seq_id == prev_ts.seq_id + 1
period_ts = ts[i] - prev_ts
period = FmcFineDelayTime.from_pico(period_ps)
if period > period_ts:
......@@ -189,8 +156,8 @@ class TestFmcfdLoop(object):
else:
diff = period_ts - period
assert diff < FmcFineDelayTime(0, 1, 0), \
"period difference {:s}\n\tcurr: {:s}\n\tprev: {:s}\n\tperi: {:s}".format(str(diff),
str(ts[i]),
str(prev_ts),
str(period_ts))
"period difference {:s}\n\tcurr: {:s}\n\tprev: {:s}\n\tperi: {:s}".format(str(diff),
str(ts[i]),
str(prev_ts),
str(period_ts))
prev_ts = ts[i]
......@@ -5,6 +5,7 @@ SPDX-FileCopyrightText: 2020 CERN
import pytest
class TestFmcfdTemperature(object):
def test_temperature_read(self, fmcfd):
......
......@@ -8,14 +8,15 @@ import random
import time
from PyFmcFineDelay import FmcFineDelayTime
class TestFmcfdTime(object):
def test_whiterabbit_mode(self, fmcfd):
"""It must be possible to toggle the White-Rabbit status"""
fmcfd.whiterabbit_mode = True
assert fmcfd.whiterabbit_mode == True
assert fmcfd.whiterabbit_mode is True
fmcfd.whiterabbit_mode = False
assert fmcfd.whiterabbit_mode == False
assert fmcfd.whiterabbit_mode is False
def test_time_set_fail_wr(self, fmcfd):
"""Time can't be changed when White-Rabbit is enabled"""
......
......@@ -10,6 +10,7 @@ import time
import os
from PyFmcFineDelay import FmcFineDelay, FmcFineDelayTime
@pytest.fixture(scope="function", params=pytest.channels)
def fmcfd_tdc(request):
fd = FmcFineDelay(pytest.fd_id)
......@@ -65,7 +66,7 @@ class TestFmcfdInput(object):
import pdb
pdb.set_trace()
for i in range(len(ts)):
for i in range(len(ts)):
if prev_ts is not None:
assert ts[i].seq_id == prev_ts.seq_id + 1
diff = float(ts[i]) - float(prev_ts)
......
......@@ -8,6 +8,7 @@ import random
import time
from PyFmcFineDelay import FmcFineDelay, FmcFineDelayTime
@pytest.fixture(scope="function", params=pytest.channels)
def fmcfd_chan(request):
fd = FmcFineDelay(pytest.fd_id)
......@@ -29,18 +30,18 @@ class TestFmcfdOutput(object):
print(" period : {:d}ps".format(period))
print(" count : {:d}".format(count))
@pytest.mark.parametrize("width",[50000, # 50ns
60000, # 60ns
70000, # 70ns
80000, # 80ns
90000, # 90ns
500000, # 500ns
1000000, # 1us
500000000, # 500us
1000000000, # 1ms
500000000000, # 500ms
1000000000000, # 1s
])
@pytest.mark.parametrize("width", [50000, # 50ns
60000, # 60ns
70000, # 70ns
80000, # 80ns
90000, # 90ns
500000, # 500ns
1000000, # 1us
500000000, # 500us
1000000000, # 1ms
500000000000, # 500ms
1000000000000, # 1s
])
@pytest.mark.parametrize("count", [1, 3])
def test_pulse_width(self, capsys, fmcfd_chan, width, count):
with capsys.disabled():
......@@ -84,7 +85,7 @@ class TestFmcfdOutput(object):
while True:
fmcfd_chan.pulse_delay(delay, 250000, 500000, 1)
time.sleep(1 + delay/1000000000000.0)
time.sleep(1 + delay / 1000000000000.0)
ret = self.__process_outcome(fmcfd_chan.idx + 1, delay, 250000, 500000, 1)
if ret in ["y", "n", "q"]:
break
......@@ -92,18 +93,18 @@ class TestFmcfdOutput(object):
pytest.skip("Quit test")
assert ret == "y"
@pytest.mark.parametrize("width",[250000, # 250ns
260000, # 260ns
270000, # 270ns
280000, # 280ns
290000, # 290ns
500000, # 500ns
1000000, # 1us
500000000, # 500us
1000000000, # 1ms
500000000000, # 500ms
1000000000000, # 1s
])
@pytest.mark.parametrize("width", [250000, # 250ns
260000, # 260ns
270000, # 270ns
280000, # 280ns
290000, # 290ns
500000, # 500ns
1000000, # 1us
500000000, # 500us
1000000000, # 1ms
500000000000, # 500ms
1000000000000, # 1s
])
@pytest.mark.parametrize("count", [1, 3])
def test_pulse_delay_width(self, capsys, fmcfd_chan, width, count):
fmcfd_chan.dev.tdc.enable_input = True
......@@ -119,7 +120,7 @@ class TestFmcfdOutput(object):
while True:
fmcfd_chan.pulse_delay(width, width, 500000, 1)
time.sleep(1 + delay/1000000000000.0)
time.sleep(1 + delay / 1000000000000.0)
ret = self.__process_outcome(fmcfd_chan.idx + 1, 600000, width, 500000, 1)
if ret in ["y", "n", "q"]:
break
......
"""
SPDX-License-Identifier: LGPL-2.1-or-later
SPDX-FileCopyrightText: 2020 CERN
"""
import pytest
from PyFmcFineDelay import FmcFineDelay
@pytest.fixture(scope="function")
def fmcfd():
fd = FmcFineDelay(pytest.fd_id)
yield fd
for chan in fd.chan:
chan.disable()
def pytest_addoption(parser):
parser.addoption("--fd-id", type=lambda x : int(x, 16),
required=True, help="Fmc Fine-Delay Linux Identifier")
parser.addoption("--channel", type=int, default=[],
action="append", choices=range(FmcFineDelay.CHANNEL_NUMBER),
help="Channel(s) to be used for acquisition tests. Default all channels")
def pytest_configure(config):
pytest.fd_id = config.getoption("--fd-id")
pytest.channels = config.getoption("--channel")
if len(pytest.channels) == 0:
pytest.channels = range(FmcFineDelay.CHANNEL_NUMBER)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment