Commit 4dfbb86c authored by Federico Vaga's avatar Federico Vaga

tst: add dma() fixture for simpler and faster tests

Signed-off-by: Federico Vaga's avatarFederico Vaga <federico.vaga@cern.ch>
parent 80e2f825
...@@ -12,6 +12,12 @@ def spec(): ...@@ -12,6 +12,12 @@ def spec():
spec_dev = PySPEC(pytest.pci_id) spec_dev = PySPEC(pytest.pci_id)
yield spec_dev yield spec_dev
@pytest.fixture(scope="class")
def dma():
spec = PySPEC(pytest.pci_id)
with spec.dma() as spec_dma:
yield spec_dma
def pytest_addoption(parser): def pytest_addoption(parser):
parser.addoption("--pci-id", parser.addoption("--pci-id",
required=True, help="SPEC PCI Identifier") required=True, help="SPEC PCI Identifier")
......
...@@ -29,97 +29,87 @@ class TestDma(object): ...@@ -29,97 +29,87 @@ class TestDma(object):
with spec_c.dma() as dma2: with spec_c.dma() as dma2:
pass pass
def test_dma_no_buffer(self, spec): def test_dma_no_buffer(self, dma):
""" """
The read/write will return immediatelly if asked to perform The read/write will return immediatelly if asked to perform
0-length transfer. 0-length transfer.
""" """
with spec.dma() as dma: data = dma.read(0, 0)
data = dma.read(0, 0) assert len(data) == 0
assert len(data) == 0 count = dma.write(0, b"")
assert count == 0
with spec.dma() as dma:
count = dma.write(0, b"")
assert count == 0
@pytest.mark.parametrize("buffer_size", @pytest.mark.parametrize("buffer_size",
[2**i for i in range(3, 22)]) [2**i for i in range(3, 22)])
def test_dma_read(self, spec, buffer_size): def test_dma_read(self, dma, buffer_size):
""" """
We just want to see if the DMA engine reports errors. Test the We just want to see if the DMA engine reports errors. Test the
engine with different sizes, but same offset (default: engine with different sizes, but same offset (default:
0x0). On the engine side we will get several transfers 0x0). On the engine side we will get several transfers
(scatterlist) depending on the size. (scatterlist) depending on the size.
""" """
with spec.dma() as dma: data1 = dma.read(0, buffer_size)
data1 = dma.read(0, buffer_size) data2 = dma.read(0, buffer_size)
data2 = dma.read(0, buffer_size)
assert len(data1) == buffer_size assert len(data1) == buffer_size
assert len(data2) == buffer_size assert len(data2) == buffer_size
assert data1 == data2 assert data1 == data2
@pytest.mark.parametrize("buffer_size", @pytest.mark.parametrize("buffer_size",
[2**i for i in range(3, 22)]) [2**i for i in range(3, 22)])
def test_dma_write(self, spec, buffer_size): def test_dma_write(self, dma, buffer_size):
""" """
We just want to see if the DMA engine reports errors. Test the We just want to see if the DMA engine reports errors. Test the
engine with different sizes, but same offset (default: engine with different sizes, but same offset (default:
0x0). On the engine side we will get several transfers 0x0). On the engine side we will get several transfers
(scatterlist) depending on the size. (scatterlist) depending on the size.
""" """
with spec.dma() as dma: count = dma.write(0, b"\x00" * buffer_size)
count = dma.write(0, b"\x00" * buffer_size) assert count == buffer_size
assert count == buffer_size
@pytest.mark.parametrize("ddr_offset", @pytest.mark.parametrize("ddr_offset",
[2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))]) [2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))])
@pytest.mark.parametrize("unaligned", range(1, PySPEC.DDR_ALIGN)) @pytest.mark.parametrize("unaligned", range(1, PySPEC.DDR_ALIGN))
def test_dma_unaligned_offset_read(self, spec, ddr_offset, unaligned): def test_dma_unaligned_offset_read(self, dma, ddr_offset, unaligned):
""" """
The DDR access is 4byte aligned. The DDR access is 4byte aligned.
""" """
with spec.dma() as dma: with pytest.raises(OSError) as error:
with pytest.raises(OSError) as error: dma.read(ddr_offset + unaligned, 16)
dma.read(ddr_offset + unaligned, 16)
@pytest.mark.parametrize("ddr_offset", @pytest.mark.parametrize("ddr_offset",
[2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))]) [2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))])
@pytest.mark.parametrize("unaligned", range(1, PySPEC.DDR_ALIGN)) @pytest.mark.parametrize("unaligned", range(1, PySPEC.DDR_ALIGN))
def test_dma_unaligned_offset_write(self, spec, ddr_offset, unaligned): def test_dma_unaligned_offset_write(self, dma, ddr_offset, unaligned):
""" """
The DDR access is 4byte aligned. The DDR access is 4byte aligned.
""" """
with spec.dma() as dma: with pytest.raises(OSError) as error:
with pytest.raises(OSError) as error: dma.write(ddr_offset + unaligned, b"\x00" * 16)
dma.write(ddr_offset + unaligned, b"\x00" * 16)
@pytest.mark.parametrize("ddr_offset", @pytest.mark.parametrize("ddr_offset",
[2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))]) [2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))])
@pytest.mark.parametrize("unaligned", range(1, PySPEC.DDR_ALIGN)) @pytest.mark.parametrize("unaligned", range(1, PySPEC.DDR_ALIGN))
def test_dma_unaligned_size_read(self, spec, ddr_offset, unaligned): def test_dma_unaligned_size_read(self, dma, ddr_offset, unaligned):
""" """
The DDR access is 4byte aligned. The DDR access is 4byte aligned.
""" """
with spec.dma() as dma: with pytest.raises(OSError) as error:
with pytest.raises(OSError) as error: dma.read(ddr_offset, (16 + unaligned))
dma.read(ddr_offset, (16 + unaligned))
@pytest.mark.parametrize("ddr_offset", @pytest.mark.parametrize("ddr_offset",
[2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))]) [2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))])
@pytest.mark.parametrize("unaligned", range(1, PySPEC.DDR_ALIGN)) @pytest.mark.parametrize("unaligned", range(1, PySPEC.DDR_ALIGN))
def test_dma_unaligned_size_write(self, spec, ddr_offset, unaligned): def test_dma_unaligned_size_write(self, dma, ddr_offset, unaligned):
""" """
The DDR access is 4byte aligned. The DDR access is 4byte aligned.
""" """
with spec.dma() as dma: with pytest.raises(OSError) as error:
with pytest.raises(OSError) as error: dma.write(ddr_offset, b"\x00" * (16 + unaligned))
dma.write(ddr_offset, b"\x00" * (16 + unaligned))
@pytest.mark.parametrize("split", [2**i for i in range(3, 14)]) @pytest.mark.parametrize("split", [2**i for i in range(3, 14)])
@pytest.mark.parametrize("ddr_offset", [0x0, ]) @pytest.mark.parametrize("ddr_offset", [0x0, ])
@pytest.mark.parametrize("buffer_size", [2**14, ]) @pytest.mark.parametrize("buffer_size", [2**14, ])
def test_dma_split_read(self, spec, buffer_size, ddr_offset, split, capsys): def test_dma_split_read(self, dma, buffer_size, ddr_offset, split, capsys):
""" """
Write and read back buffers using DMA. We test different combinations Write and read back buffers using DMA. We test different combinations
of offset and size. Here we atrificially split the **read** in small of offset and size. Here we atrificially split the **read** in small
...@@ -129,17 +119,16 @@ class TestDma(object): ...@@ -129,17 +119,16 @@ class TestDma(object):
that we can perform transfers (read) of "any" size. that we can perform transfers (read) of "any" size.
""" """
data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)]) data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)])
with spec.dma() as dma: dma.write(ddr_offset, data)
dma.write(ddr_offset, data) data_rb = b""
data_rb = b"" for offset in range(0, buffer_size, split):
for offset in range(0, buffer_size, split): data_rb += dma.read(ddr_offset + offset, split)
data_rb += dma.read(ddr_offset + offset, split) assert data == data_rb
assert data == data_rb
@pytest.mark.parametrize("split", [2**i for i in range(3, 14)]) @pytest.mark.parametrize("split", [2**i for i in range(3, 14)])
@pytest.mark.parametrize("ddr_offset", [0x0, ]) @pytest.mark.parametrize("ddr_offset", [0x0, ])
@pytest.mark.parametrize("buffer_size", [2**14, ]) @pytest.mark.parametrize("buffer_size", [2**14, ])
def test_dma_split_write(self, spec, buffer_size, ddr_offset, split): def test_dma_split_write(self, dma, buffer_size, ddr_offset, split):
""" """
Write and read back buffers using DMA. We test different combinations Write and read back buffers using DMA. We test different combinations
of offset and size. Here we atrificially split the **write** in small of offset and size. Here we atrificially split the **write** in small
...@@ -149,15 +138,14 @@ class TestDma(object): ...@@ -149,15 +138,14 @@ class TestDma(object):
that we can perform transfers (write) of "any" size. that we can perform transfers (write) of "any" size.
""" """
data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)]) data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)])
with spec.dma() as dma: for offset in range(0, buffer_size, split):
for offset in range(0, buffer_size, split): dma.write(offset, data[offset:min(offset+split, buffer_size)])
dma.write(offset, data[offset:min(offset+split, buffer_size)]) data_rb = dma.read(0x0, buffer_size)
data_rb = dma.read(0x0, buffer_size) assert data == data_rb
assert data == data_rb
@pytest.mark.parametrize("split", [2**i for i in range(3, 14)]) @pytest.mark.parametrize("split", [2**i for i in range(3, 14)])
@pytest.mark.parametrize("buffer_size", [2**14, ]) @pytest.mark.parametrize("buffer_size", [2**14, ])
def test_dma_split(self, spec, buffer_size, split): def test_dma_split(self, dma, buffer_size, split):
""" """
Write and read back buffers using DMA. We test different combinations Write and read back buffers using DMA. We test different combinations
of offset and size. Here we atrificially split transfers in small of offset and size. Here we atrificially split transfers in small
...@@ -167,26 +155,24 @@ class TestDma(object): ...@@ -167,26 +155,24 @@ class TestDma(object):
that we can perform transfers of "any" size. that we can perform transfers of "any" size.
""" """
data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)]) data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)])
with spec.dma() as dma: for offset in range(0, buffer_size, split):
for offset in range(0, buffer_size, split): dma.write(offset, data[offset:min(offset+split, buffer_size)])
dma.write(offset, data[offset:min(offset+split, buffer_size)]) for offset in range(0, buffer_size, split):
for offset in range(0, buffer_size, split): data_rb = dma.read(offset, split)
data_rb = dma.read(offset, split) assert data[offset:min(offset+split, buffer_size)] == data_rb
assert data[offset:min(offset+split, buffer_size)] == data_rb
@pytest.mark.parametrize("segment_size", [2**i for i in range(3, 20)]) @pytest.mark.parametrize("segment_size", [2**i for i in range(3, 20)])
def test_dma_scatterlist_read(self, spec, segment_size): def test_dma_scatterlist_read(self, dma, segment_size):
""" """
Enforce a scatterlist on known size to read 1MiB. Enforce a scatterlist on known size to read 1MiB.
""" """
buffer_size = 2**20 buffer_size = 2**20
data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)]) data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)])
with spec.dma() as dma: dma.write(0, data)
dma.write(0, data) assert data == dma.read(0, len(data), segment_size)
assert data == dma.read(0, len(data), segment_size)
@pytest.mark.parametrize("segment_size", [2**i for i in range(3, 12)]) @pytest.mark.parametrize("segment_size", [2**i for i in range(3, 12)])
def test_dma_scatterlist_write(self, spec, segment_size): def test_dma_scatterlist_write(self, dma, segment_size):
""" """
Enforce a scatterlist on known size to write 1MiB. Enforce a scatterlist on known size to write 1MiB.
Remember: on write the scatterlist is enforced by the driver Remember: on write the scatterlist is enforced by the driver
...@@ -194,9 +180,8 @@ class TestDma(object): ...@@ -194,9 +180,8 @@ class TestDma(object):
""" """
buffer_size = 2**20 buffer_size = 2**20
data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)]) data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)])
with spec.dma() as dma: dma.write(0, data, segment_size)
dma.write(0, data, segment_size) assert data == dma.read(0, len(data))
assert data == dma.read(0, len(data))
@pytest.mark.parametrize("ddr_offset", @pytest.mark.parametrize("ddr_offset",
[0x0] + \ [0x0] + \
...@@ -205,7 +190,7 @@ class TestDma(object): ...@@ -205,7 +190,7 @@ class TestDma(object):
@pytest.mark.parametrize("buffer_size", @pytest.mark.parametrize("buffer_size",
[2**i for i in range(int(math.log2(PySPEC.DDR_ALIGN)) + 1, 22)] + \ [2**i for i in range(int(math.log2(PySPEC.DDR_ALIGN)) + 1, 22)] + \
[random.randrange(PySPEC.DDR_ALIGN * 2, 4096, PySPEC.DDR_ALIGN) for x in range(random_repetitions)]) [random.randrange(PySPEC.DDR_ALIGN * 2, 4096, PySPEC.DDR_ALIGN) for x in range(random_repetitions)])
def test_dma(self, spec, ddr_offset, buffer_size): def test_dma(self, dma, ddr_offset, buffer_size):
""" """
Write and read back buffers using DMA. We test different combinations Write and read back buffers using DMA. We test different combinations
of offset and size. Here we try to perform transfers as large as of offset and size. Here we try to perform transfers as large as
...@@ -215,12 +200,11 @@ class TestDma(object): ...@@ -215,12 +200,11 @@ class TestDma(object):
pytest.skip("DDR Overflow!") pytest.skip("DDR Overflow!")
data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)]) data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)])
with spec.dma() as dma: dma.write(ddr_offset, data)
dma.write(ddr_offset, data) data_rb = dma.read(ddr_offset, buffer_size)
data_rb = dma.read(ddr_offset, buffer_size) assert data == data_rb
assert data == data_rb
def test_dma_reg_zero(self, spec): def test_dma_reg_zero(self, dma):
""" """
Regression test. Regression test.
It happend that after 256Bytes the received data is just It happend that after 256Bytes the received data is just
...@@ -228,10 +212,9 @@ class TestDma(object): ...@@ -228,10 +212,9 @@ class TestDma(object):
among the first power of two values among the first power of two values
""" """
data = bytes([random.randrange(0, 0xFF, 1) for i in range(1024)]) data = bytes([random.randrange(0, 0xFF, 1) for i in range(1024)])
with spec.dma() as dma: dma.write(0, data)
dma.write(0, data) for i in range(1000000):
for i in range(1000000): assert data == dma.read(0, len(data))
assert data == dma.read(0, len(data))
@pytest.mark.skipif(pytest.cfg_bitstream is None, @pytest.mark.skipif(pytest.cfg_bitstream is None,
reason="We need a bitstream to reflash") reason="We need a bitstream to reflash")
...@@ -244,14 +227,13 @@ class TestDma(object): ...@@ -244,14 +227,13 @@ class TestDma(object):
it may take a long time before it happens again. it may take a long time before it happens again.
""" """
spec.program_fpga(pytest.cfg_bitstream) spec.program_fpga(pytest.cfg_bitstream)
with spec.dma() as dma: data = b"0123456789abcdefghilmnopqrstuvwx"
data = b"0123456789abcdefghilmnopqrstuvwx" dma.write(0, data)
dma.write(0, data) for i in range(10000000):
for i in range(10000000): try:
try: assert data[0:4] == dma.read(0, 4)
assert data[0:4] == dma.read(0, 4) except OSError as error:
except OSError as error: assert False, "Failed after {:d} transfers".format(i)
assert False, "Failed after {:d} transfers".format(i)
@pytest.mark.parametrize("dma_alloc_size", @pytest.mark.parametrize("dma_alloc_size",
[2**20 * x for x in range(1, 5)]) [2**20 * x for x in range(1, 5)])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment