Commit a77e93a2 authored by Federico Vaga's avatar Federico Vaga

py: use contextmanager for DMA transfer

Signed-off-by: Federico Vaga's avatarFederico Vaga <federico.vaga@cern.ch>
parent 344c890a
......@@ -11,7 +11,6 @@ from PySPEC import PySPEC
def spec():
spec_dev = PySPEC(pytest.pci_id)
yield spec_dev
spec_dev.dma_stop()
def pytest_addoption(parser):
parser.addoption("--pci-id",
......
......@@ -14,33 +14,31 @@ class TestDma(object):
"""
Users can open and close the DMA channel
"""
spec.dma_start()
spec.dma_stop()
with spec.dma() as dma:
pass
def test_acquisition_release_contention(self, spec):
"""
Refuse simultaneous DMA transfers
"""
spec.dma_start()
spec_c = PySPEC(spec.pci_id)
with pytest.raises(OSError) as error:
spec_c.dma_start()
spec.dma_stop()
with spec.dma() as dma:
spec_c = PySPEC(spec.pci_id)
with pytest.raises(OSError) as error:
with spec_c.dma() as dma2:
pass
def test_dma_no_buffer(self, spec):
"""
The read/write will return immediatelly if asked to perform
0-length transfer.
"""
spec.dma_start()
data = spec.dma_read(0, 0)
spec.dma_stop()
assert len(data) == 0
with spec.dma() as dma:
data = dma.read(0, 0)
assert len(data) == 0
spec.dma_start()
count = spec.dma_write(0, b"")
spec.dma_stop()
assert count == 0
with spec.dma() as dma:
count = dma.write(0, b"")
assert count == 0
@pytest.mark.parametrize("buffer_size",
[2**i for i in range(3, 22)])
......@@ -51,10 +49,9 @@ class TestDma(object):
0x0). On the engine side we will get several transfers
(scatterlist) depending on the size.
"""
spec.dma_start()
data1 = spec.dma_read(0, buffer_size)
data2 = spec.dma_read(0, buffer_size)
spec.dma_stop()
with spec.dma() as dma:
data1 = dma.read(0, buffer_size)
data2 = dma.read(0, buffer_size)
assert len(data1) == buffer_size
assert len(data2) == buffer_size
......@@ -69,10 +66,9 @@ class TestDma(object):
0x0). On the engine side we will get several transfers
(scatterlist) depending on the size.
"""
spec.dma_start()
count = spec.dma_write(0, b"\x00" * buffer_size)
assert count == buffer_size
spec.dma_stop()
with spec.dma() as dma:
count = dma.write(0, b"\x00" * buffer_size)
assert count == buffer_size
@pytest.mark.parametrize("ddr_offset",
[2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))])
......@@ -81,10 +77,9 @@ class TestDma(object):
"""
The DDR access is 4byte aligned.
"""
spec.dma_start()
with pytest.raises(OSError) as error:
spec.dma_read(ddr_offset + unaligned, 16)
spec.dma_stop()
with spec.dma() as dma:
with pytest.raises(OSError) as error:
dma.read(ddr_offset + unaligned, 16)
@pytest.mark.parametrize("ddr_offset",
[2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))])
......@@ -93,10 +88,9 @@ class TestDma(object):
"""
The DDR access is 4byte aligned.
"""
spec.dma_start()
with pytest.raises(OSError) as error:
spec.dma_write(ddr_offset + unaligned, b"\x00" * 16)
spec.dma_stop()
with spec.dma() as dma:
with pytest.raises(OSError) as error:
dma.write(ddr_offset + unaligned, b"\x00" * 16)
@pytest.mark.parametrize("ddr_offset",
[2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))])
......@@ -105,10 +99,9 @@ class TestDma(object):
"""
The DDR access is 4byte aligned.
"""
spec.dma_start()
with pytest.raises(OSError) as error:
spec.dma_read(ddr_offset, (16 + unaligned))
spec.dma_stop()
with spec.dma() as dma:
with pytest.raises(OSError) as error:
dma.read(ddr_offset, (16 + unaligned))
@pytest.mark.parametrize("ddr_offset",
[2**i for i in range(2, int(math.log2(PySPEC.DDR_SIZE)))])
......@@ -117,10 +110,9 @@ class TestDma(object):
"""
The DDR access is 4byte aligned.
"""
spec.dma_start()
with pytest.raises(OSError) as error:
spec.dma_write(ddr_offset, b"\x00" * (16 + unaligned))
spec.dma_stop()
with spec.dma() as dma:
with pytest.raises(OSError) as error:
dma.write(ddr_offset, b"\x00" * (16 + unaligned))
@pytest.mark.parametrize("split", [2**i for i in range(3, 14)])
@pytest.mark.parametrize("ddr_offset", [0x0, ])
......@@ -135,14 +127,13 @@ class TestDma(object):
that we can perform transfers (read) of "any" size.
"""
data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)])
spec.dma_start()
spec.dma_write(ddr_offset, data)
fail = False
for offset in range(0, buffer_size, split):
data_rb = spec.dma_read(ddr_offset + offset, split)
assert data[offset:min(offset+split, buffer_size)] == data_rb, \
"offset: {:d}".format(offset)
spec.dma_stop()
with spec.dma() as dma:
dma.write(ddr_offset, data)
fail = False
for offset in range(0, buffer_size, split):
data_rb = dma.read(ddr_offset + offset, split)
assert data[offset:min(offset+split, buffer_size)] == data_rb, \
"offset: {:d}".format(offset)
assert fail == False
......@@ -159,13 +150,11 @@ class TestDma(object):
that we can perform transfers (write) of "any" size.
"""
data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)])
spec.dma_start()
for offset in range(0, buffer_size, split):
spec.dma_write(offset, data[offset:min(offset+split, buffer_size)])
data_rb = spec.dma_read(0x0, buffer_size)
assert data == data_rb, "offset: {:d}".format(offset)
spec.dma_stop()
with spec.dma() as dma:
for offset in range(0, buffer_size, split):
dma.write(offset, data[offset:min(offset+split, buffer_size)])
data_rb = dma.read(0x0, buffer_size)
assert data == data_rb, "offset: {:d}".format(offset)
@pytest.mark.parametrize("split", [2**i for i in range(3, 14)])
@pytest.mark.parametrize("buffer_size", [2**14, ])
......@@ -179,13 +168,12 @@ class TestDma(object):
that we can perform transfers of "any" size.
"""
data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)])
spec.dma_start()
for offset in range(0, buffer_size, split):
spec.dma_write(offset, data[offset:min(offset+split, buffer_size)])
for offset in range(0, buffer_size, split):
data_rb = spec.dma_read(offset, split)
assert data[offset:min(offset+split, buffer_size)] == data_rb
spec.dma_stop()
with spec.dma() as dma:
for offset in range(0, buffer_size, split):
dma.write(offset, data[offset:min(offset+split, buffer_size)])
for offset in range(0, buffer_size, split):
data_rb = dma.read(offset, split)
assert data[offset:min(offset+split, buffer_size)] == data_rb
@pytest.mark.parametrize("ddr_offset",
[0x0] + \
......@@ -204,10 +192,7 @@ class TestDma(object):
pytest.skip("DDR Overflow!")
data = bytes([random.randrange(0, 0xFF, 1) for i in range(buffer_size)])
spec.dma_start()
spec.dma_write(ddr_offset, data)
data_rb = spec.dma_read(ddr_offset, buffer_size)
spec.dma_stop()
assert data == data_rb
with spec.dma() as dma:
dma.write(ddr_offset, data)
data_rb = dma.read(ddr_offset, buffer_size)
assert data == data_rb
......@@ -6,47 +6,52 @@ SPDX-License-Identifier: LGPL-3.0-or-later
SPDX-FileCopyrightText: 2020 CERN (home.cern)
"""
import os
from contextlib import contextmanager
class PySPEC:
DDR_SIZE = 256 * 1024 * 1024
DDR_ALIGN = 4
class PySPECDMA:
def __init__(self, spec):
self.dma_file = open(os.path.join(spec.debugfs_fpga, "dma"),
"rb+", buffering=0)
def close(self):
self.dma_file.close()
def read(self, offset, size):
"""
Trigger a *device to memory* DMA transfer
"""
self.__seek(offset)
data = []
while size - len(data) > 0:
data += self.dma_file.read(size - len(data))
return bytes(data)
def write(self, offset, data):
"""
Trigger a *device to memory* DMA transfer
"""
self.__seek(offset)
start = 0
while len(data) - start > 0:
start += self.dma_file.write(bytes(data[start:]))
return start
def __seek(self, offset):
self.dma_file.seek(offset)
def __init__(self, pci_id):
self.pci_id = pci_id
self.debugfs = "/sys/kernel/debug/0000:{:s}".format(self.pci_id)
self.debugfs_fpga = os.path.join(self.debugfs, "spec-0000:{:s}".format(self.pci_id))
self.dma = os.path.join(self.debugfs_fpga, "dma")
def __del__(self):
self.dma_stop()
def dma_start(self):
self.dma_file = open(self.dma, "rb+", buffering=0)
def dma_stop(self):
if hasattr(self, "dma_file"):
self.dma_file.close()
del self.dma_file
def dma_read(self, offset, size):
"""
Trigger a *device to memory* DMA transfer
"""
self.__dma_seek(offset)
data = []
while size - len(data) > 0:
data += self.dma_file.read(size - len(data))
return bytes(data)
def dma_write(self, offset, data):
"""
Trigger a *device to memory* DMA transfer
"""
self.__dma_seek(offset)
start = 0
while len(data) - start > 0:
start += self.dma_file.write(bytes(data[start:]))
return start
def __dma_seek(self, offset):
self.dma_file.seek(offset)
@contextmanager
def dma(self):
spec_dma = self.PySPECDMA(self)
try:
yield spec_dma
finally:
spec_dma.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment