Commit 72812507 authored by Dimitris Lampridis's avatar Dimitris Lampridis

pytest: implemented fd->tdc snake test

adc-lib not used yet to check fmc-adc acquisition
parent ce627e2a
# SPDX-FileCopyrightText: 2022 CERN (home.cern)
# SPDX-License-Identifier: TODO
from PyWrtd import PyWrtd
import time
class WrtdTestNode(PyWrtd):
def __init__(self, node_id):
PyWrtd.__init__(self, node_id)
self.node_id = node_id
self.disable_all_alarms()
self.remove_all_alarms()
self.disable_all_rules()
#self.remove_all_rules()
# temporary because remove_all_rules crashes
rule_count = self.get_attr_int32(self.WRTD_GLOBAL_REP_CAP_ID,
self.WRTD_ATTR_RULE_COUNT)
for i in range(rule_count):
rule = self.get_rule_name(rule_count-i-1)
self.remove_rule(rule)
self.set_attr_bool(self.WRTD_GLOBAL_REP_CAP_ID,
self.WRTD_ATTR_EVENT_LOG_ENABLED,
False)
self.clear_event_log_entries()
self.set_attr_bool(self.WRTD_GLOBAL_REP_CAP_ID,
self.WRTD_ATTR_EVENT_LOG_ENABLED,
True)
def set_attr_tstamp(self, rep_cap_id, id,
seconds = 0, ns = 0, frac = 0):
sec_normalised = seconds
ns_normalised = ns
while (ns_normalised >= 1000000000):
ns_normalised -= 1000000000
sec_normalised += 1
while (ns_normalised < 0):
ns_normalised += 1000000000
sec_normalised -= 1
PyWrtd.set_attr_tstamp(self, rep_cap_id, id,
sec_normalised, ns_normalised, frac)
def add_rule_enabled(self, name, src, dst, ns_delay = 0, repeat = 0):
self.add_rule(name)
self.set_attr_tstamp(name, self.WRTD_ATTR_RULE_DELAY, ns = ns_delay)
self.set_attr_string(name, self.WRTD_ATTR_RULE_SOURCE, src)
self.set_attr_string(name, self.WRTD_ATTR_RULE_DESTINATION, dst)
self.set_attr_int32(name, self.WRTD_ATTR_RULE_REPEAT_COUNT, repeat)
self.set_attr_bool(name, self.WRTD_ATTR_RULE_ENABLED, True)
def check_log_entry(self, pattern, timeout = 1.0):
remaining = timeout
while self.get_attr_bool(self.WRTD_GLOBAL_REP_CAP_ID,
self.WRTD_ATTR_EVENT_LOG_EMPTY):
assert remaining > 0
time.sleep(0.1)
remaining -= 0.1
entry = self.get_next_event_log_entry()
log_fields = [ x.strip() for x in entry.split('|')]
assert len(log_fields) == 6
log = dict()
log['id'] = log_fields[0].lstrip('Id:')
log['seq'] = log_fields[1].lstrip('Seq:')
log['log_tstamp'] = log_fields[2]
log['event_tstamp'] = log_fields[3]
log['log_type'] = log_fields[4]
log['log_reason'] = log_fields[5]
for key in pattern:
assert key in log
assert log[key] == pattern[key]
return log
......@@ -3,37 +3,19 @@
import pytest
from PyWrtd import PyWrtd
def _init_wrtd_node(node_id):
_node = PyWrtd(node_id)
_node.disable_all_alarms()
_node.remove_all_alarms()
_node.disable_all_rules()
#_node.remove_all_rules()
# temporary because remove_all_rules crashes
rule_count = _node.get_attr_int32(_node.WRTD_GLOBAL_REP_CAP_ID,
_node.WRTD_ATTR_RULE_COUNT)
for i in range(rule_count):
rule = _node.get_rule_name(i)
_node.remove_rule(rule)
return _node
from WrtdTestHelper import *
@pytest.fixture()
def adc():
return _init_wrtd_node(pytest.adc_node)
return WrtdTestNode(pytest.adc_node)
@pytest.fixture()
def tdc():
return _init_wrtd_node(pytest.tdc_node)
return WrtdTestNode(pytest.tdc_node)
@pytest.fixture()
def fdelay():
return _init_wrtd_node(pytest.fd_node)
@pytest.fixture(params=[pytest.adc_node, pytest.tdc_node, pytest.fd_node])
def node(request):
return _init_wrtd_node(request.param)
return WrtdTestNode(pytest.fd_node)
class TestWrtdRefDesigns(object):
"""
......@@ -44,7 +26,159 @@ class TestWrtdRefDesigns(object):
node_count = PyWrtd.get_node_count()
assert node_count == 3
def test_node_info(self, adc, tdc, fdelay):
assert adc.get_fw_name(0) == "wrtd-adc-x2"
assert tdc.get_fw_name(0) == "wrtd-tdc-x2"
assert fdelay.get_fw_name(0) == "wrtd-fd-x2"
@pytest.mark.parametrize('node, fw_name', [('adc', 'wrtd-adc-x2'),
('tdc', 'wrtd-tdc-x2'),
('fdelay', 'wrtd-fd-x2')])
def test_fw_name(self, node, fw_name, request):
wrtd = request.getfixturevalue(node)
assert wrtd.get_fw_name(0) == fw_name
# TODO: add tdc-fdel and tdc-adc once alarms have been added to tdc
@pytest.mark.parametrize('tx_node, rx_nodes', [('adc', ['fdelay'])])
def test_alarm_log_and_net(self, tx_node, rx_nodes, request,fdelay):
"""
Schedule an alarm in tx_node, send an event and check the logs to
see that the alarm triggered, the event was sent out and that it was
received by all rx_nodes
"""
transmitter = request.getfixturevalue(tx_node)
transmitter.add_alarm('alarm1')
transmitter.add_rule('rule1')
transmitter.set_attr_tstamp('rule1', transmitter.WRTD_ATTR_RULE_DELAY, ns = 500000)
transmitter.set_attr_string('rule1', transmitter.WRTD_ATTR_RULE_SOURCE, 'alarm1')
transmitter.set_attr_string('rule1', transmitter.WRTD_ATTR_RULE_DESTINATION, 'net0')
transmitter.set_attr_bool('rule1', transmitter.WRTD_ATTR_RULE_ENABLED, True)
ts_alarm = transmitter.get_attr_tstamp(transmitter.WRTD_GLOBAL_REP_CAP_ID,
transmitter.WRTD_ATTR_SYS_TIME)
ts_alarm['ns'] += 10000000
transmitter.set_attr_tstamp('alarm1', transmitter.WRTD_ATTR_ALARM_TIME,
seconds = ts_alarm['seconds'],
ns = ts_alarm['ns'],
frac = ts_alarm['frac'])
transmitter.set_attr_bool('alarm1', transmitter.WRTD_ATTR_ALARM_ENABLED, True)
transmitter.check_log_entry({'id': 'alarm1', 'log_type': 'GENERATED'})
transmitter.check_log_entry({'id': 'net0', 'log_type' : 'NETWORK', 'log_reason': 'TX'})
for rx_node in rx_nodes:
receiver = request.getfixturevalue(rx_node)
receiver.check_log_entry({'id': 'net0', 'log_type': 'NETWORK', 'log_reason': 'RX'})
@pytest.mark.parametrize('channel', [1, 2, 3, 4])
@pytest.mark.parametrize('fmc', [0, 1])
def test_fd_out_tdc_in(self, tdc, fdelay, fmc, channel):
"""
Schedule an alarm on fdelay, generate pulse on output, check that
it goes out and that it is received by the tdc. FD out 4 is wired
to both TDC in 4 and 5.
"""
fdelay.add_alarm('alarm1')
fd_channel = f'LC-O{4*fmc+channel}'
fdelay.add_rule_enabled('rule1', 'alarm1', fd_channel, 500000)
ts_alarm = fdelay.get_attr_tstamp(fdelay.WRTD_GLOBAL_REP_CAP_ID,
fdelay.WRTD_ATTR_SYS_TIME)
ts_alarm['ns'] += 10000000
fdelay.set_attr_tstamp('alarm1', fdelay.WRTD_ATTR_ALARM_TIME,
seconds = ts_alarm['seconds'],
ns = ts_alarm['ns'],
frac = ts_alarm['frac'])
fdelay.set_attr_bool('alarm1', fdelay.WRTD_ATTR_ALARM_ENABLED, True)
fdelay.check_log_entry({'id': 'alarm1', 'log_type': 'GENERATED'})
fdelay.check_log_entry({'id': fd_channel, 'log_type': 'CONSUMED', 'log_reason': 'START'})
fdelay.check_log_entry({'id': fd_channel, 'log_type' : 'CONSUMED', 'log_reason': 'DONE'})
tdc_channel = f'LC-I{5*fmc+channel}'
tdc.check_log_entry({'id': tdc_channel, 'log_type': 'GENERATED'})
if channel == 4:
tdc_channel = f'LC-I{5*(fmc+1)}'
tdc.check_log_entry({'id': tdc_channel, 'log_type' : 'GENERATED'})
@pytest.mark.parametrize('fmc', [0, 1])
def test_fd_tdc_snake_adc_trigin(self, tdc, fdelay, adc, fmc):
"""
Schedule an alarm on fdelay, generate pulse on FD output 1, receive
on TDC input 1, generate event, add rule to generate pulse on FD
output 2, etc. TDC input 4 should cause one last pulse on FD output 1
and TDC input 5 should trigger the ADC over the network.
"""
for channel in [1, 2, 3, 4, 5]:
tdc_channel = f'LC-I{5*fmc+channel}'
rule_name = f'rule.{tdc_channel}'
tdc.add_rule_enabled(rule_name, tdc_channel, f'net{tdc_channel}', 500000, 1)
if channel < 5:
if channel == 4:
fd_channel = f'LC-O{4*fmc+1}'
else:
fd_channel = f'LC-O{4*fmc+channel+1}'
rule_name = f'rule.{fd_channel}'
fdelay.add_rule_enabled(rule_name, f'net{tdc_channel}', fd_channel)
else:
adc_channel = f'LC-O{fmc+1}'
rule_name = f'rule.{adc_channel}'
adc.add_rule_enabled(rule_name, f'net{tdc_channel}', adc_channel)
fdelay.add_alarm('alarm1')
fd_channel = f'LC-O{4*fmc+1}'
fdelay.add_rule_enabled('rule-alarm', 'alarm1', fd_channel, 500000)
ts_alarm = fdelay.get_attr_tstamp(fdelay.WRTD_GLOBAL_REP_CAP_ID,
fdelay.WRTD_ATTR_SYS_TIME)
ts_alarm['ns'] += 10000000
fdelay.set_attr_tstamp('alarm1', fdelay.WRTD_ATTR_ALARM_TIME,
seconds = ts_alarm['seconds'],
ns = ts_alarm['ns'],
frac = ts_alarm['frac'])
fdelay.set_attr_bool('alarm1', fdelay.WRTD_ATTR_ALARM_ENABLED, True)
fdelay.check_log_entry({'id': 'alarm1', 'log_type': 'GENERATED'})
fd_channel = f'LC-O{4*fmc+1}'
fdelay.check_log_entry({'id': fd_channel, 'log_type': 'CONSUMED', 'log_reason': 'START'})
fdelay.check_log_entry({'id': fd_channel, 'log_type': 'CONSUMED', 'log_reason': 'DONE'})
for channel in [1, 2, 3, 4]:
tdc_channel = f'LC-I{5*fmc+channel}'
tdc.check_log_entry({'id': tdc_channel, 'log_type': 'GENERATED'})
event_name = f'net{tdc_channel}'
tdc.check_log_entry({'id': event_name, 'log_type': 'NETWORK', 'log_reason': 'TX'})
fdelay.check_log_entry({'id': event_name, 'log_type': 'NETWORK', 'log_reason': 'RX'})
adc.check_log_entry({'id': event_name, 'log_type': 'NETWORK', 'log_reason': 'RX'})
if channel == 4:
fd_channel = f'LC-O{4*fmc+1}'
else:
fd_channel = f'LC-O{4*fmc+channel+1}'
fdelay.check_log_entry({'id': fd_channel, 'log_type': 'CONSUMED',
'log_reason': 'START'})
# two events will arrive from TDC in 4 and 5 almost simultaneously
if channel == 4:
event_name =f'netLC-I{5*(fmc+1)}'
fdelay.check_log_entry({'id': event_name, 'log_type': 'NETWORK',
'log_reason': 'RX'})
adc.check_log_entry({'id': event_name, 'log_type': 'NETWORK', 'log_reason': 'RX'})
fdelay.check_log_entry({'id': fd_channel, 'log_type': 'CONSUMED', 'log_reason': 'DONE'})
adc_channel = f'LC-O{fmc+1}'
adc.check_log_entry({'id': adc_channel, 'log_type': 'CONSUMED', 'log_reason': 'START'})
adc.check_log_entry({'id': adc_channel, 'log_type': 'CONSUMED', 'log_reason': 'DONE'})
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment