Commit 116d47aa authored by Federico Vaga's avatar Federico Vaga

tests: improve the integration tests

Signed-off-by: Federico Vaga's avatarFederico Vaga <federico.vaga@cern.ch>
parent e6deb071
import os
import PyMockTurtle
import pytest
import time
@pytest.fixture(scope="module")
def devid():
......@@ -17,7 +18,15 @@ def cfg():
cfg.app_id = 0xd330d330
cfg.n_cpu = 2
cfg.smem_size = 0x00002000
cfg.hmq_width = 0
cfg.n_hmq[0] = 2
cfg.n_hmq[1] = 2
cfg.n_rmq[0] = 1
cfg.n_rmq[1] = 1
cfg.hmq[0].sizes = 0x0
cfg.hmq[1].sizes = 0
cfg.hmq[8].sizes = 0
cfg.hmq[9].sizes = 0
return cfg
@pytest.fixture(scope="module", params=range(cfg().n_cpu))
......@@ -31,10 +40,10 @@ def trtl_device(request):
return dev
@pytest.fixture(scope="module", params=range(cfg().n_cpu))
def trtl_cpu(request):
dev = PyMockTurtle.TrtlDevice(devid())
yield dev.cpu[request.param]
dev.cpu[request.param].disable()
def trtl_cpu(request, trtl_device):
trtl_device.cpu[request.param].disable()
yield trtl_device.cpu[request.param]
trtl_device.cpu[request.param].disable()
@pytest.fixture(scope="module")
def trtl_shm():
......@@ -42,6 +51,27 @@ def trtl_shm():
return dev.shm
@pytest.fixture(scope="module")
def trtl_hmq():
dev = PyMockTurtle.TrtlDevice(devid())
return dev
def trtl_firmware_dir():
return os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
@pytest.fixture(scope="module", params=[2**x for x in range(5)])
def trtl_msg(request):
"""This provides a list of test messages. Each test message contains
two values (so len = 2): a sequence number, the maximum number in the
sequence. These two values in the last message are equal
"""
msg_list = []
for val in range(1, request.param + 1):
msg = PyMockTurtle.TrtlMessage()
msg.header.len = 2
msg.payload[0] = val
msg.payload[1] = request.param
msg_list.append(msg)
return msg_list
# @pytest.fixture(scope="module")
# def trtl_hmq():
# dev = PyMockTurtle.TrtlDevice(devid()) for cpu in dev.cpu:
# for hmq in cpu.hmq:
# yield hmq
......@@ -2,6 +2,10 @@
DIRS := serial
DIRS += cpu-loop
DIRS += config_rom
DIRS += hmq-async-recv
DIRS += hmq-async-send
DIRS += hmq-sync
DIRS += hmq-purge
all clean cleanall modules install modules_install: $(DIRS)
......
......@@ -3,22 +3,31 @@
int main()
{
int i, n_word = sizeof(struct trtl_config_rom) / 4;
int i, k, n_word = sizeof(struct trtl_config_rom) / 4, size, cpu = 0;
const struct trtl_config_rom *cfgrom = trtl_config_rom_get();
const uint32_t *data = (const uint32_t *)cfgrom;
uint32_t *msg, *hdr;
pr_debug("CONFIG ROM\r\n");
mq_claim(TRTL_HMQ, 0);
hdr = mq_map_out_header(TRTL_HMQ, 0);
msg = mq_map_out_buffer(TRTL_HMQ, 0);
for (i = 0; i < n_word; i++) {
msg[i] = data[i];
pr_debug("%p = 0x%08"PRIx32"\r\n", &data[i], data[i]);
cpu = 0;
size = TRTL_CONFIG_ROM_MQ_SIZE_PAYLOAD(cfgrom->hmq[cpu * TRTL_MAX_CPU].sizes);
for (i = 0; i < (n_word / size) + 1; ++i) {
mq_claim(TRTL_HMQ, 0);
hdr = mq_map_out_header(TRTL_HMQ, 0);
msg = mq_map_out_buffer(TRTL_HMQ, 0);
for (k = 0; k < size; k++) {
if (i * size + k >= n_word)
break;
msg[k] = data[i * size + k];
pr_debug("%p = 0x%08"PRIx32"\r\n",
&data[i * size + k],
data[i * size + k]);
}
hdr[0] = size - 1; /* last valid index */
mq_send(TRTL_HMQ, 0);
}
hdr[0] = n_word - 1; /* last valid index */
mq_send(TRTL_HMQ, 0);
return 0;
}
mainmenu "hmq_async_recv test configuration"
comment "Project specific configuration"
config FPGA_APPLICATION_ID
int "FPGA application ID"
default 0
help
Help text
config RT_APPLICATION_ID
int "RT application ID"
default 0
help
Help text
# include Mock Turtle's Kconfig
source "Kconfig.mt"
OBJS = hmq-async-recv.o
OBJS += # add other object files that you need
OUTPUT = fw-hmq-async-recv
TRTL ?= ../../../
TRTL_SW = $(TRTL)/software
CFLAGS_OPT = -O0 # disable optimization
include $(TRTL_SW)/rt/Makefile
#
# Automatically generated file; DO NOT EDIT.
# config_rom test configuration
#
#
# Project specific configuration
#
CONFIG_FPGA_APPLICATION_ID=0
CONFIG_RT_APPLICATION_ID=0
#
# Mock Turtle configuration
#
#
# Mock Turtle framework configuration
#
# CONFIG_MOCKTURTLE_FRAMEWORK_ENABLE is not set
#
# Mock Turtle library configuration
#
# CONFIG_MOCKTURTLE_LIBRARY_PRINT_ENABLE is not set
#include <mockturtle-rt.h>
#include <hw/mockturtle_config.h>
int main()
{
int i, n_word, depth, slot, cpu, hmq;
const struct trtl_config_rom *cfgrom = trtl_config_rom_get();
volatile uint32_t *msg;
uint32_t sizes;
volatile struct trtl_hmq_header *hdr;
pr_debug("ASYNC MESSAGES RECV\r\n");
cpu = 0;
for (hmq = 0; hmq < cfgrom->n_hmq[cpu]; ++hmq)
mq_purge(TRTL_HMQ, hmq);
for (hmq = 0; hmq < cfgrom->n_hmq[cpu]; ++hmq) {
sizes = cfgrom->hmq[cpu * TRTL_MAX_MQ_CHAN + hmq].sizes;
n_word = TRTL_CONFIG_ROM_MQ_SIZE_PAYLOAD(sizes);
depth = TRTL_CONFIG_ROM_MQ_SIZE_ENTRIES(sizes);
for (slot = 0; slot < depth; ++slot) {
pr_debug("\tcpu: %d, hmq: %d, len: %d\r\n",
cpu, hmq, n_word);
mq_claim(TRTL_HMQ, hmq);
hdr = mq_map_out_header(TRTL_HMQ, hmq);
msg = mq_map_out_buffer(TRTL_HMQ, hmq);
for (i = 0; i < n_word; i++)
msg[i] = i;
hdr->len = n_word - 1; /* last valid index */
mq_send(TRTL_HMQ, hmq);
}
}
return 0;
}
mainmenu "hmq_async_send test configuration"
comment "Project specific configuration"
config FPGA_APPLICATION_ID
int "FPGA application ID"
default 0
help
Help text
config RT_APPLICATION_ID
int "RT application ID"
default 0
help
Help text
# include Mock Turtle's Kconfig
source "Kconfig.mt"
OBJS = hmq-async-send.o
OBJS += # add other object files that you need
OUTPUT = fw-hmq-async-send
TRTL ?= ../../../
TRTL_SW = $(TRTL)/software
CFLAGS_OPT = -O0 # disable optimization
include $(TRTL_SW)/rt/Makefile
#
# Automatically generated file; DO NOT EDIT.
# config_rom test configuration
#
#
# Project specific configuration
#
CONFIG_FPGA_APPLICATION_ID=0
CONFIG_RT_APPLICATION_ID=0
#
# Mock Turtle configuration
#
#
# Mock Turtle framework configuration
#
# CONFIG_MOCKTURTLE_FRAMEWORK_ENABLE is not set
#
# Mock Turtle library configuration
#
CONFIG_MOCKTURTLE_LIBRARY_PRINT_ENABLE=y
# CONFIG_MOCKTURTLE_LIBRARY_PRINT_DEBUG_ENABLE is not set
CONFIG_MOCKTURTLE_LIBRARY_PRINT_ERROR_ENABLE=y
#include <mockturtle-rt.h>
#include <hw/mockturtle_config.h>
int main()
{
int cpu, hmq;
const struct trtl_config_rom *cfgrom = trtl_config_rom_get();
volatile uint32_t *msg;
volatile struct trtl_hmq_header *hdr;
int wait = 0, max_wait_cycle, count;
pr_debug("ASYNC MESSAGES SEND\r\n");
cpu = 0;
for (hmq = 0; hmq < cfgrom->n_hmq[cpu]; ++hmq) {
wait = (1 << hmq);
hdr = mq_map_in_header(TRTL_HMQ, hmq);
msg = mq_map_in_buffer(TRTL_HMQ, hmq);
count = 1;
max_wait_cycle = 0xFFFF;
while (1) {
/* Wait incoming message */
while ((hmq_poll_in() & wait) != wait) {
if (--max_wait_cycle == 0) {
pr_error("\tNO MESSAGE PENDING h:%d, cnt:0x%x\r\n",
hmq, count);
return -1;
}
}
/* validate message */
if (hdr->len != 1 || msg[0] != count) {
pr_error("\th: %d, l:%ld, d[0]:0x%lx, cnt:0x%x\r\n",
hmq, hdr->len, msg[0], count);
mq_discard(TRTL_HMQ, hmq);
return -1;
}
if (msg[1] < count) {
pr_error("\th:%d, l:%ld d[1]:0x%lx, cnt:0x%x\r\n",
hmq, hdr->len, msg[1], count);
mq_discard(TRTL_HMQ, hmq);
return -1;
}
/* stop on the last one */
if (msg[0] == msg[1]) {
mq_discard(TRTL_HMQ, hmq);
break;
}
/* goto the next one */
mq_discard(TRTL_HMQ, hmq);
count++;
}
}
pp_printf("OK\r\n");
return 0;
}
mainmenu "hmq_purge test configuration"
comment "Project specific configuration"
config FPGA_APPLICATION_ID
int "FPGA application ID"
default 0
help
Help text
config RT_APPLICATION_ID
int "RT application ID"
default 0
help
Help text
# include Mock Turtle's Kconfig
source "Kconfig.mt"
OBJS = hmq-purge.o
OBJS += # add other object files that you need
OUTPUT = fw-hmq-purge
TRTL ?= ../../../
TRTL_SW = $(TRTL)/software
CFLAGS_OPT = -O0 # disable optimization
include $(TRTL_SW)/rt/Makefile
#
# Automatically generated file; DO NOT EDIT.
# config_rom test configuration
#
#
# Project specific configuration
#
CONFIG_FPGA_APPLICATION_ID=0
CONFIG_RT_APPLICATION_ID=0
#
# Mock Turtle configuration
#
#
# Mock Turtle framework configuration
#
# CONFIG_MOCKTURTLE_FRAMEWORK_ENABLE is not set
#
# Mock Turtle library configuration
#
CONFIG_MOCKTURTLE_LIBRARY_PRINT_ENABLE=y
# CONFIG_MOCKTURTLE_LIBRARY_PRINT_DEBUG_ENABLE is not set
CONFIG_MOCKTURTLE_LIBRARY_PRINT_ERROR_ENABLE=y
#include <mockturtle-rt.h>
#include <hw/mockturtle_config.h>
int main()
{
int cpu, hmq;
const struct trtl_config_rom *cfgrom = trtl_config_rom_get();
pr_debug("PURGE MESSAGES\r\n");
pr_debug("Fill the HMQ while CPU is not running\r\n");
cpu = 0;
for (hmq = 0; hmq < cfgrom->n_hmq[cpu]; ++hmq) {
if ((hmq_poll_in() & (1 << hmq)) != (1 << hmq)) {
pr_error("HMQ-%d after poll: 0x%lx\r\n",
hmq, hmq_poll_in());
return -1;
}
mq_purge(TRTL_HMQ, hmq);
if ((hmq_poll_in() & (1 << hmq)) == (1 << hmq)) {
pr_error("HMQ-%d after poll: 0x%lx\r\n",
hmq, hmq_poll_in());
return -1;
}
}
if (hmq_poll_in() != 0) {
pr_error("END poll: 0x%lx\r\n", hmq_poll_in());
return -1;
}
pp_printf("OK\r\n");
return 0;
}
mainmenu "hmq_sync test configuration"
comment "Project specific configuration"
config FPGA_APPLICATION_ID
int "FPGA application ID"
default 0
help
Help text
config RT_APPLICATION_ID
int "RT application ID"
default 0
help
Help text
# include Mock Turtle's Kconfig
source "Kconfig.mt"
OBJS = hmq-sync.o
OBJS += # add other object files that you need
OUTPUT = fw-hmq-sync
TRTL ?= ../../../
TRTL_SW = $(TRTL)/software
CFLAGS_OPT = -O0 # disable optimization
include $(TRTL_SW)/rt/Makefile
#
# Automatically generated file; DO NOT EDIT.
# config_rom test configuration
#
#
# Project specific configuration
#
CONFIG_FPGA_APPLICATION_ID=0
CONFIG_RT_APPLICATION_ID=0
#
# Mock Turtle configuration
#
#
# Mock Turtle framework configuration
#
# CONFIG_MOCKTURTLE_FRAMEWORK_ENABLE is not set
#
# Mock Turtle library configuration
#
CONFIG_MOCKTURTLE_LIBRARY_PRINT_ENABLE=y
# CONFIG_MOCKTURTLE_LIBRARY_PRINT_DEBUG_ENABLE is not set
CONFIG_MOCKTURTLE_LIBRARY_PRINT_ERROR_ENABLE=y
#include <mockturtle-rt.h>
#include <hw/mockturtle_config.h>
int main()
{
int cpu, hmq, wait = 0, max_wait_cycle, count;
const struct trtl_config_rom *cfgrom = trtl_config_rom_get();
volatile uint32_t *msg_r, *msg_s;
volatile struct trtl_hmq_header *hdr_r, *hdr_s;
pr_debug("SYNC MESSAGES\r\n");
cpu = 0;
for (hmq = 0; hmq < cfgrom->n_hmq[cpu]; ++hmq)
mq_purge(TRTL_HMQ, hmq);
for (hmq = 0; hmq < cfgrom->n_hmq[cpu]; ++hmq) {
wait = (1 << hmq);
hdr_r = mq_map_in_header(TRTL_HMQ, hmq);
msg_r = mq_map_in_buffer(TRTL_HMQ, hmq);
hdr_s = mq_map_out_header(TRTL_HMQ, hmq);
msg_s = mq_map_out_buffer(TRTL_HMQ, hmq);
count = 1;
max_wait_cycle = 0xFFFFFFFF;
while (1) {
/* Wait incoming message */
while ((hmq_poll_in() & wait) != wait) {
if (--max_wait_cycle == 0) {
pr_error("\tNO MESSAGE PENDING h:%d, cnt:0x%x\r\n",
hmq, count);
return -1;
}
}
/* validate message */
if (hdr_r->len != 1 || msg_r[0] != count) {
pr_error("\th: %d, l:%ld, d[0]:0x%lx, cnt:0x%x\r\n",
hmq, hdr_r->len, msg_r[0], count);
mq_discard(TRTL_HMQ, hmq);
return -1;
}
if (msg_r[1] < count) {
pr_error("\th:%d, l:%ld d[1]:0x%lx, cnt:0x%x\r\n",
hmq, hdr_r->len, msg_r[1], count);
mq_discard(TRTL_HMQ, hmq);
return -1;
}
mq_claim(TRTL_HMQ, hmq);
hdr_s->len = hdr_r->len;
hdr_s->sync_id = hdr_r->sync_id;
memcpy(msg_s, msg_r, (hdr_r->len + 1) * 4);
mq_discard(TRTL_HMQ, hmq);
mq_send(TRTL_HMQ, hmq);
count++;
}
}
return 0;
}
......@@ -19,8 +19,6 @@ class TestConfig(object):
assert trtl_device.rom.n_cpu == cfg.n_cpu
def test_valid_softcpu(self, trtl_device, trtl_cpu, devid, cfg):
trtl_hmq = PyMockTurtle.TrtlHmq(devid, 0, PyMockTurtle.TrtlHmq.FLAGS_DIR_OUT)
trtl_cpu.disable()
trtl_cpu.load_application_file(firmware_file_config())
trtl_cpu.enable()
......@@ -28,9 +26,7 @@ class TestConfig(object):
# Give time to the soft-CPU to build and send the message
time.sleep(0.1)
msg = trtl_hmq.recv_msg()
del trtl_hmq
msg = trtl_cpu.hmq[0].recv_msg()
assert msg.payload[0] == cfg.signature
assert msg.payload[5] == cfg.app_id
......
......@@ -14,20 +14,19 @@ class TestCPU(object):
pass
def test_exist(self, trtl_cpu):
path = "/dev/mockturtle/trtl-{:04x}-cpu-{:02d}".format(trtl_cpu.device_id,
trtl_cpu.cpu_index)
path = "/dev/mockturtle/trtl-{:04x}-{:02d}".format(trtl_cpu.trtl_dev.device_id,
trtl_cpu.idx_cpu)
assert True == os.path.exists(path)
def test_disable(self, trtl_cpu):
def test_enable_disable(self, trtl_cpu):
trtl_cpu.enable()
assert True == trtl_cpu.is_enable()
trtl_cpu.disable()
assert False == trtl_cpu.is_enable()
def test_enable(self, trtl_cpu):
trtl_cpu.enable()
assert True == trtl_cpu.is_enable()
def test_load(self, trtl_cpu):
trtl_cpu.disable()
trtl_cpu.load_application_file(firmware_file_loop())
trtl_cpu.enable()
......
import os
import PyMockTurtle
import pytest
import serial
import time
@pytest.fixture
def trtl_binary_hmq_purge(trtl_firmware_dir):
return os.path.join(trtl_firmware_dir,
"firmware/hmq-purge/fw-hmq-purge.bin")
@pytest.fixture
def trtl_binary_hmq_async_send(trtl_firmware_dir):
return os.path.join(trtl_firmware_dir,
"firmware/hmq-async-send/fw-hmq-async-send.bin")
@pytest.fixture
def trtl_binary_hmq_async_recv(trtl_firmware_dir):
return os.path.join(trtl_firmware_dir,
"firmware/hmq-async-recv/fw-hmq-async-recv.bin")
@pytest.fixture
def trtl_binary_hmq_sync(trtl_firmware_dir):
return os.path.join(trtl_firmware_dir,
"firmware/hmq-sync/fw-hmq-sync.bin")
class TestHmq(object):
def test_async_send(self, trtl_hmq):
pass
confirm = 'OK\r\n'
def test_purge(self, trtl_cpu, trtl_binary_hmq_purge):
if trtl_cpu.idx_cpu != 0:
return
trtl_cpu.load_application_file(trtl_binary_hmq_purge)
# fill HMQ
for hmq in trtl_cpu.hmq:
hmq.flush()
msg = PyMockTurtle.TrtlMessage()
msg.header.len = 1
msg.payload[0] = 0x55555555
hmq.send_msg(msg)
# Run and check
with serial.Serial("/dev/ttyTRTL{:d}".format(trtl_cpu.idx_cpu)) as ser:
ser.reset_input_buffer()
trtl_cpu.enable()
time.sleep(1)
assert ser.in_waiting >= len(self.confirm)
assert self.confirm == ser.read(ser.in_waiting).decode()
def test_purge_overflow(self, trtl_cpu, trtl_binary_hmq_purge):
if trtl_cpu.idx_cpu != 0:
return
trtl_cpu.load_application_file(trtl_binary_hmq_purge)
# Fill HMQ
for hmq in trtl_cpu.hmq:
idx_cfg = trtl_cpu.idx_cpu * PyMockTurtle.TRTL_CONFIG_ROM_MAX_CPU + hmq.idx_hmq
# write more that what the HMQ can store
for n in range(trtl_cpu.trtl_dev.rom.hmq[idx_cfg].entries):
msg = PyMockTurtle.TrtlMessage()
msg.header.len = 1
msg.payload[0] = 0xAAAAAAAA
hmq.send_msg(msg)
# Run and check
with serial.Serial("/dev/ttyTRTL{:d}".format(trtl_cpu.idx_cpu)) as ser:
ser.reset_input_buffer()
trtl_cpu.enable()
time.sleep(1)
assert ser.in_waiting >= len(self.confirm)
assert self.confirm == ser.read(ser.in_waiting).decode()
def test_async_send(self, trtl_cpu, trtl_msg, trtl_binary_hmq_async_send):
"""It sends the test messages on all available HMQ.
The test is successful when we read "OK" from the serial console
"""
if trtl_cpu.idx_cpu != 0:
return
trtl_cpu.load_application_file(trtl_binary_hmq_async_send)
for hmq in trtl_cpu.hmq:
hmq.flush()
for msg in trtl_msg:
hmq.send_msg(msg)
with serial.Serial("/dev/ttyTRTL{:d}".format(trtl_cpu.idx_cpu)) as ser:
ser.reset_input_buffer()
trtl_cpu.enable()
time.sleep(1)
assert ser.in_waiting >= len(self.confirm)
assert self.confirm == ser.read(ser.in_waiting).decode()
def test_async_recv(self, trtl_cpu, trtl_binary_hmq_async_recv):
"""It receivess messages on all available HMQ.
The test is successful when we recevie all the expected messages
"""
if trtl_cpu.idx_cpu != 0:
return
trtl_cpu.load_application_file(trtl_binary_hmq_async_recv)
for hmq in trtl_cpu.hmq:
hmq.flush()
trtl_cpu.enable()
time.sleep(0.1)
for hmq in trtl_cpu.hmq:
idx_cfg = trtl_cpu.idx_cpu * PyMockTurtle.TRTL_CONFIG_ROM_MAX_CPU + hmq.idx_hmq
for n in range(trtl_cpu.trtl_dev.rom.hmq[idx_cfg].entries):
msg = hmq.recv_msg()
payload = list(msg.payload)
for i, val in enumerate(payload):
if i >= msg.header.len:
break
assert i == val
def test_sync(self, trtl_cpu, trtl_msg, trtl_binary_hmq_sync):
"""It sends the test messages on all available HMQ.
The test is successful when what we read back is the same we sent
"""
if trtl_cpu.idx_cpu != 0:
return
trtl_cpu.load_application_file(trtl_binary_hmq_sync)
trtl_cpu.enable()
time.sleep(0.1)
for hmq in trtl_cpu.hmq:
hmq.flush()
def test_async_recv(self, trtl_hmq):
pass
if hmq.idx_hmq != 0:
continue
def test_sync(self, trtl_hmq):
pass
for msg in trtl_msg:
print(msg.payload[0], msg.payload[1])
msg_r = hmq.sync_msg(msg)
assert msg_r == msg
import os
import pytest
import serial
import stat
import time
@pytest.fixture
def firmware_file_serial():
testdir = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
return os.path.join(testdir, "firmware/serial/fw-serial.bin")
def firmware_file_serial(trtl_firmware_dir):
return os.path.join(trtl_firmware_dir, "firmware/serial/fw-serial.bin")
@pytest.fixture
def firmware_output():
......@@ -33,17 +33,15 @@ class TestSerial(object):
assert st.st_mode & stat.S_IRGRP != 0
@pytest.mark.parametrize("msg_ref", enumerate(firmware_output()))
def test_print(self, trtl_cpu, msg_ref):
def test_print(self, trtl_cpu, msg_ref, firmware_file_serial):
trtl_cpu.disable()
trtl_cpu.load_application_file(firmware_file_serial())
trtl_cpu.load_application_file(firmware_file_serial)
f = os.open("/dev/ttyTRTL{:d}".format(trtl_cpu.cpu_index),
os.O_RDONLY | os.O_NONBLOCK)
with serial.Serial("/dev/ttyTRTL{:d}".format(trtl_cpu.idx_cpu)) as ser:
trtl_cpu.enable()
time.sleep(1) # wait for all the characters
trtl_cpu.enable()
time.sleep(1) # wait for all the characters
nbyte = len(msg_ref[1])
msg = os.read(f, nbyte).decode()
os.close(f)
nbyte = len(msg_ref[1])
msg = ser.read(nbyte).decode()
assert msg_ref[1] == msg
assert msg_ref[1] == msg
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment