Commit 7703aa91 authored by Alén Arias Vázquez's avatar Alén Arias Vázquez 😎

complete design RMQ Fifo

parent 8335be0c
......@@ -21,9 +21,12 @@ TOPLEVEL:=rmq_stream_fifo
MODULE:=tb_rmq_stream_fifo
RTL_LIBRARY?=work
# Define variables
# Define value variable for CI
CDC_ENABLE=$(shell expr $(shell date +'%M') % 2)
# Define variables, possible to modify using make g_CDC_ENABLE=0
g_DATA_WIDTH?=32
g_CDC_ENABLE?=1
g_CDC_ENABLE?=$(CDC_ENABLE)
g_FIFO_DEPTH?=64
g_show_ahead?=true
......
"""
Test RMQ STREAM FIFO
"""
import cocotb
import numpy as np
import time
import random
import math
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge, FallingEdge, Timer
from cocotb.regression import TestFactory
from cocotb.result import *
from rmq import RMQMaster, RMQSlave
# ------------------------------------------------------------------------------
# fill data
def GetRMQFrame(header, data_len, data_width, randomize):
data = []
mask = 2**data_width - 1
data.append(header & mask)
data.append(~(header) & mask)
for i in range(data_len):
if randomize:
data.append(random.randint(0, (2**data_width)-1) & mask)
else:
if i % 8 == 0:
data.append(0xDEADC0DE & mask)
elif i % 8 == 1:
data.append(0xCAFED00D & mask)
elif i % 8 == 2:
data.append(0xDEADCAFE & mask)
elif i % 8 == 3:
data.append(0xBADC0FFE & mask)
elif i % 8 == 4:
data.append(0xBAADF00D & mask)
elif i % 8 == 5:
data.append(0xDEFEC8ED & mask)
elif i % 8 == 6:
data.append(0xDEADBEEF & mask)
elif i % 8 == 7:
data.append(0xFEEDBABE & mask)
return data
# ------------------------------------------------------------------------------
# Setup clock signal
def setup_clock(signal_name, period, units="ns"):
cocotb.start_soon(Clock(signal_name, period, units=units).start())
# ------------------------------------------------------------------------------
# Main Test
@cocotb.coroutine
async def test(dut, n_rst_cycles, data_len, randomize, ready_clk_off):
g_CDC = dut.g_CDC_ENABLE.value
data_width = dut.g_data_width.value
rmq_snk_msg = GetRMQFrame(header=0xFACEB00C, data_len=data_len, data_width=data_width, randomize=randomize)
m_rmq = RMQMaster(dut=dut, clk=dut.rmq_snk_clk_i, if_name='snk', header_size = 1)
s_rmq = RMQSlave(dut=dut, clk=dut.rmq_src_clk_i, if_name='src', header_size = 1)
if g_CDC == 1:
dut._log.warning("Clock domain crossing enable")
setup_clock(signal_name=dut.rmq_snk_clk_i, period=10)
setup_clock(signal_name=dut.rmq_src_clk_i, period=3*10)
else:
dut._log.warning("Clock domain crossing disable")
setup_clock(signal_name=dut.rmq_snk_clk_i, period=10)
setup_clock(signal_name=dut.rmq_src_clk_i, period=10)
cocotb.start_soon(reset(dut, n_rst_cycles))
await delay(dut.rmq_snk_clk_i, 1)
while dut.rst_n_i.value != 1:
await RisingEdge(dut.rmq_snk_clk_i)
cocotb.start_soon(m_rmq.input_data_rmq())
cocotb.start_soon(s_rmq.output_data_rmq())
cocotb.start_soon(s_rmq.tready_behaviour(max_cycles_off=ready_clk_off, randomize=randomize))
await delay(dut.rmq_snk_clk_i, 20)
m_rmq.send_data(rmq_snk_msg)
while not (s_rmq.rmq_request_c == 1):
await RisingEdge(dut.rmq_snk_clk_i)
rmq_src_msg = s_rmq.get_data()
if len(rmq_src_msg) != len(rmq_snk_msg):
dut._log.error("Expected length {} --- Read length {}".format(len(rmq_snk_msg),len(rmq_src_msg)))
dut._log.error("Length is not coherent")
raise SimFailure()
else:
dut._log.info("RMQ message length is coherent")
if (np.array_equal(rmq_src_msg, rmq_snk_msg)):
dut._log.info("RMQ message integrity is correct")
else:
for i in range(len(rmq_snk_msg)):
dut._log.warning("Data tx: {} ----- Data rx: {}".format(hex(rmq_snk_msg[i]),hex(rmq_src_msg[i])))
dut._log.error("Incossitence between tx and rx")
raise SimFailure()
await delay(dut.rmq_snk_clk_i, 10)
dut._log.info("Testbench finished successfully")
# ------------------------------------------------------------------------------
# Factory
factory = TestFactory(test)
factory.add_option("n_rst_cycles", [11])
factory.add_option("data_len", [4, 11, 17, 32, 71])
factory.add_option("randomize", [False, True])
factory.add_option("ready_clk_off", [1, 3, 7])
factory.generate_tests()
# ------------------------------------------------------------------------------
# Reset
@cocotb.coroutine
def reset(dut, n_rst_cycles):
dut._log.info("Asserting Sytem Reset")
dut.rst_n_i.setimmediatevalue(0)
dut.rmq_src_config_i.adr.setimmediatevalue(0)
dut.rmq_src_config_i.dat.setimmediatevalue(0)
dut.rmq_src_config_i.we.setimmediatevalue(0)
dut.rmq_snk_config_i.adr.setimmediatevalue(0)
dut.rmq_snk_config_i.dat.setimmediatevalue(0)
dut.rmq_snk_config_i.we.setimmediatevalue(0)
for i in range(n_rst_cycles):
yield RisingEdge(dut.rmq_snk_clk_i)
dut.rst_n_i.value = 1
yield RisingEdge(dut.rmq_snk_clk_i)
dut._log.info("Deasserting Sytem Reset")
# ------------------------------------------------------------------------------
# delay
@cocotb.coroutine
def delay(signal_name, n_cycles):
for i in range(n_cycles):
yield RisingEdge(signal_name)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment