Commit e4c55497 authored by Dimitris Lampridis's avatar Dimitris Lampridis

Merge branch 'orson-python-v2' into 'master'

Python interface v2

See merge request be-cem-edl/fec/hardware-modules/rf-att-4cha!5
parents c5c55293 79ce4657
......@@ -16,3 +16,4 @@ install:
install -m 0775 $(LIBNAME).a $(LIBNAME).so $(INSTALL_ROOT)/lib
install -m 0775 $(NAME)test $(INSTALL_ROOT)/bin
install -m 0775 $(LIBNAME).h usb-enum.h $(INSTALL_ROOT)/include
pip install . --target $(INSTALL_ROOT)/lib
......@@ -43,8 +43,15 @@ static struct {
int oattnusb_open(int lun)
{
/* not implemented: ignore lun by the moment */
return -ENOSYS;
struct oattnusb_devarray *list;
list = oattnusb_get_devarray(OATTNUSB_VENDOR_ID, OATTNUSB_DEVICE_ID);
if (lun < 0 || lun >= list->ndevs) {
return -1;
}
return oattnusb_open_by_serial(list->devs[lun].serial);
}
int oattnusb_open_by_serial(const char *serial)
......
......@@ -131,6 +131,13 @@ int oattnusb_set_relay(int fd, int channel, enum oattnusb_value_enum val);
*/
int oattnusb_get_nchannels(int fd);
/**
* @brief Displays available devices
*
* @return 0 on success.
*/
int oattnusb_list_devices(void);
/* liboattnusb version string */
extern const char * const liboattnusb_version_s;
#ifdef __cplusplus
......
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023 CERN (home.cern)
#!/bin/bash
# SPDX-FileCopyrightText: 2024 CERN (home.cern)
#
# SPDX-License-Identifier: LGPL-2.1-or-later
# vim: et ts=8 sw=4 sts=4 ai
import cmd
import readline
import os.path
import pdb
from ctypes import *
class Dev(Structure):
_fields_ = [
( 'lun', c_int ),
( 'busnum', c_char * 6 ),
( 'devnum', c_char * 6 ),
( 'portnum', c_char * 6 ),
( 'busport', c_char * 16 ),
( 'devno', c_char * 10 ),
( 'serial', c_char * 48 ),
( 'path', c_char * 256 ),
( 'unused', c_int * 4 ),
]
class DevArray(Structure):
_fields_ = [
( 'ndevs', c_int ),
( 'devs', POINTER(Dev) ),
]
class OattnUsbTest(cmd.Cmd):
VENDOR_ID = 0x1556 # CERN USB Vendor ID
DEVICE_ID = 0x0443 # oau USB Device ID
FTDI_VENDOR_ID = 0x403 # FTDI Vendor ID
FTDI_DEVICE_ID = 0x6010 # FTDI Product ID
attn_values = {
'-40dB' : 0,
'-20dB' : 1,
'0dB' : 3,
}
attn_labels = {
0 : '-40dB',
1 : '-20dB',
3 : '0dB',
}
intro = 'Oasis USB Attenuator CLI. Type "help" or "?" to list commands\n'
prompt = 'oatt> '
def __init__(self):
cmd.Cmd.__init__(self)
self.lun = None
self.fd = None
self.nchannels = None
self.load_solib()
self.devices = self.fill_devarray(self.VENDOR_ID, self.DEVICE_ID)
def load_solib(self):
"""try to load attn dyn library from script dir or ../lib."""
self.solib = './liboattnusb.so'
scriptdir = os.path.dirname(os.path.realpath(__file__))
rootdir = os.path.dirname(scriptdir)
libdir = os.path.join(rootdir, 'lib', self.solib)
self.history = os.path.expanduser('~/.oattnusb_history')
try:
self.api = CDLL(self.solib)
except OSError as e:
pass
try:
self.api = CDLL(libdir)
except OSError as e:
print('cannot load oattn solib, exiting')
exit()
def fill_devarray(self, vid, pid):
self.api.oattnusb_get_devarray.restype = POINTER(DevArray)
self.devarray = self.api.oattnusb_get_devarray(vid, pid)
ndevs = self.devarray.contents.ndevs
devs = self.devarray.contents.devs
devices = []
for i in range(ndevs):
dev = devs[i]
devices.append({
'lun' : dev.lun,
'busnum' : str(dev.busnum, 'utf8'),
'devnum' : str(dev.devnum, 'utf8'),
'portnum' : str(dev.portnum, 'utf8'),
'busport' : str(dev.busport, 'utf8'),
'devno' : str(dev.devno, 'utf8'),
'serial' : str(dev.serial, 'utf8'),
'path' : str(dev.path , 'utf8')})
return devices
def do_list(self, arg):
"""list devices and their access methods."""
for i, dev in enumerate(self.devices):
print('{}:\tl:{}\tserial:{}\tbusdev:{}/{}\tbusport:{}'.format(i,
dev['lun'], dev['serial'], dev['busnum'], dev['devnum'],
dev['busport']))
def do_open(self, arg):
"""open device by serial number."""
try:
serial = self.devices[0]['serial'] if not arg else arg
except IndexError as e:
print('no devices found, cannot open')
return
fd = self.api.oattnusb_open_by_serial(bytes(serial, 'utf8'))
if fd < 0:
print('cannot open device {}'.format(serial))
return
self.fd = fd
self.nchannels = self.read_nchannels()
print('current device: serial {}, fd {}, {} channels'.format(
serial, fd, self.nchannels))
def do_close(self, arg):
"""close connection to current device."""
if self.fd is None:
print('no active connection')
return
err = self.api.oattnusb_close(self.fd)
if err < 0:
print('could not close fd {}'.format(self.fd))
return
self.fd = None
self.nchannels = None
def read_nchannels(self):
err = self.api.oattnusb_get_nchannels(self.fd)
if err < 0:
print('error {} getting number of channels'.format(err))
return
return err
def do_nchannels(self, arg):
"""number of channels of current device."""
if self.nchannels is None:
print('no active connection')
return
print('{} channels'.format(self.nchannels))
def do_get_attn(self, arg):
"""display attenuation value of each channel."""
if self.fd is None:
print('no active connection')
return
for i in range(1, self.nchannels+1):
attn = self.api.oattnusb_get_relay(self.fd, i)
print('ch {}: attn {}'.format(i, self.attn_labels[attn]))
def do_set_attn(self, args):
"""set attenuation value of channel."""
try:
ch, attn = args.strip().split()
ch = int(ch)
attn = self.attn_values[attn]
except ValueError as e:
print('usage: set_attn ch {-40dB|-20dB|0dB}')
print(' ch = 1..{}'.format(self.nchannels))
return
except KeyError as e:
print('invalid attenuation value')
print('usage: set_attn ch {-40dB|-20dB|0dB}')
print(' NB: attenuation labels MUST be literally correct')
return
if self.fd is None:
print('no active connection')
return
err = self.api.oattnusb_set_relay(self.fd, ch, attn);
if err < 0:
print('error setting attenuation {} for channel {}'.format(attn, ch))
return
def do_reset(self, args):
"""reset all channel attns to the default (-40dB)."""
if self.fd is None or self.nchannels is None:
print('no active connection')
return
for ch in range(1, self.nchannels+1):
err = self.api.oattnusb_set_relay(self.fd, ch, self.attn_values['-40dB'])
if err < 0:
print('error resetting attenuation in channel {}'.format(ch))
return
def do_eeprom_program(self, args):
"""EXPERT: program the EEPROM (you will be asked to provide a serial number)."""
try:
bus, dev = [int(arg) for arg in args.split()]
except ValueError as e:
print('usage: eeprom_program bus dev')
return
except Exception as e:
raise
for device in self.devices:
if (bus, dev) == (int(device['busnum']), int(device['devnum'])):
break
else:
# Also check if there could be any unprogrammed devices which
# still have the default FTDI VID/PID
ftdi_devices = self.fill_devarray(self.FTDI_VENDOR_ID, self.FTDI_DEVICE_ID)
for device in ftdi_devices:
if (bus, dev) == (int(device['busnum']), int(device['devnum'])):
break
else:
print('device bus:{} dev:{} not found'.format(bus, dev))
return
serial = input('Please provide the serial number to program: ')
print('WARNING: about to erase and program the EEPROM with new serial number [{}]'
' into dev with old serial: [{}] at busdev: {}/{}, busport: {}'.format(
serial, device['serial'],
device['busnum'], device['devnum'], device['busport']))
if input('are you sure? (yes/N) ') != 'yes':
print('ok, giving up.')
return
self.api.oau_factory_program_eeprom.argtypes = [c_int, c_int, c_char_p]
ret = self.api.oau_factory_program_eeprom(bus, dev, bytes(str(serial), 'ascii'))
if (ret != 0):
print('EEPROM programming error detected')
else:
print('new serial recorded!')
print('quitting, please restart the test program')
return True
def do_version(self, arg):
"""display software version."""
version = c_char_p.in_dll(self.api, "liboattnusb_version_s")
version = str(version.value, 'utf8')
print(version)
def do_EOF(self, arg):
return True
def my_preloop(self):
try:
readline.read_history_file(self.history)
except FileNotFoundError:
readline.write_history_file(self.history)
def my_postloop(self):
readline.write_history_file(self.history)
# catch Ctrl-C and drop me back into cli
def cmdloop_intr(self, intro=None):
self.my_preloop()
while True:
try:
self.cmdloop()
break
except KeyboardInterrupt as e:
self.intro = None
print('^C')
self.my_postloop()
do_q = do_EOF
do_h = cmd.Cmd.do_help
do_ls = do_list
if __name__ == '__main__':
cli = OattnUsbTest()
cli.cmdloop_intr()
SCRIPT_DIR=$(dirname "${BASH_SOURCE[0]}")
PYTHONPATH=$SCRIPT_DIR/../lib python3 -m pyoattnusb
# SPDX-FileCopyrightText: 2024 CERN (home.cern)
#
# SPDX-License-Identifier: LGPL-2.1-or-later
from ctypes import *
import os.path
class Dev(Structure):
_fields_ = [
( 'lun', c_int ),
( 'busnum', c_char * 6 ),
( 'devnum', c_char * 6 ),
( 'portnum', c_char * 6 ),
( 'busport', c_char * 16 ),
( 'devno', c_char * 10 ),
( 'serial', c_char * 48 ),
( 'path', c_char * 256 ),
( 'unused', c_int * 4 ),
]
class DevArray(Structure):
_fields_ = [
( 'ndevs', c_int ),
( 'devs', POINTER(Dev) ),
]
class Manager:
VENDOR_ID = 0x1556 # CERN USB Vendor ID
DEVICE_ID = 0x0443 # oau USB Device ID
FTDI_VENDOR_ID = 0x403 # FTDI Vendor ID
FTDI_DEVICE_ID = 0x6010 # FTDI Product ID
def __init__(self):
self.api = None
self._load_solib()
self._devices = self._get_devarray(self.VENDOR_ID, self.DEVICE_ID)
def _load_solib(self):
"""try to load attn dyn library from script dir or ../lib."""
solib = 'liboattnusb.so'
scriptdir = os.path.dirname(os.path.realpath(__file__))
rootdir = os.path.dirname(scriptdir)
paths = (solib,
os.path.join(rootdir, 'lib', solib),
os.path.join(rootdir, '..', 'lib', solib))
for p in paths:
try:
self.api = CDLL(p)
break
except OSError as e:
pass
if self.api is None:
raise RuntimeError('cannot load oattn solib, exiting')
self.api.oau_factory_program_eeprom.argtypes = [c_int, c_int, c_char_p]
def _get_devarray(self, vid, pid):
self.api.oattnusb_get_devarray.restype = POINTER(DevArray)
self.devarray = self.api.oattnusb_get_devarray(vid, pid)
ndevs = self.devarray.contents.ndevs
devs = self.devarray.contents.devs
devices = []
for i in range(ndevs):
dev = devs[i]
devices.append({
'lun' : dev.lun,
'busnum' : str(dev.busnum, 'utf8'),
'devnum' : str(dev.devnum, 'utf8'),
'portnum' : str(dev.portnum, 'utf8'),
'busport' : str(dev.busport, 'utf8'),
'devno' : str(dev.devno, 'utf8'),
'serial' : str(dev.serial, 'utf8'),
'path' : str(dev.path , 'utf8')})
return devices
def devices(self):
return self._devices
def get_device(self, serial):
return Device(self.api, serial)
def version(self):
version = c_char_p.in_dll(self.api, "liboattnusb_version_s")
return str(version.value, 'utf8')
def eeprom_program(self, bus, dev, serial):
ret = self.api.oau_factory_program_eeprom(bus, dev, bytes(str(serial), 'ascii'))
if (ret != 0):
raise RuntimeError('EEPROM programming error detected')
class Device:
attn_values = {
'-40dB' : 0,
'-20dB' : 1,
'0dB' : 3,
}
attn_labels = {
0 : '-40dB',
1 : '-20dB',
3 : '0dB',
}
def __init__(self, api, serial):
self.api = api
self.fd = self.api.oattnusb_open_by_serial(bytes(serial, 'utf8'))
if self.fd < 0:
raise RuntimeError('cannot open device {}'.format(serial))
self.nchannels = self.api.oattnusb_get_nchannels(self.fd)
if self.nchannels <= 0:
raise RuntimeError('error {} getting number of channels'.format(err))
self.serial = serial
def __del__(self):
if self.fd is not None and self.fd > 0:
err = self.api.oattnusb_close(self.fd)
if err < 0:
raise RuntimeError('could not close fd {}'.format(self.fd))
def get_attn(self, ch):
if ch < 1 or ch > self.nchannels:
raise RuntimeError('channels must be in range 1-{0}'.format(self.nchannels))
attn = self.api.oattnusb_get_relay(self.fd, ch)
if attn < 0:
raise RuntimeError('error getting attenuation for channel {}'.format(ch))
return attn
def set_attn(self, ch, attn):
if ch < 1 or ch > self.nchannels:
raise RuntimeError('channels must be in range 1-{0}'.format(self.nchannels))
if attn not in self.attn_values.values():
raise RuntimeError('invalid attenuation value: {0}'.format(attn))
err = self.api.oattnusb_set_relay(self.fd, ch, attn);
if err < 0:
raise RuntimeError('error setting attenuation {} for channel {}'.format(attn_labels[attn], ch))
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023-2024 CERN (home.cern)
#
# SPDX-License-Identifier: LGPL-2.1-or-later
# vim: et ts=8 sw=4 sts=4 ai
import cmd
import readline
import os.path
import pyoattnusb
class OattnUsbTest(cmd.Cmd):
intro = 'Oasis USB Attenuator CLI. Type "help" or "?" to list commands\n'
prompt = 'oatt> '
def conn_checker(func):
def wrapper(self=None, *args, **kwargs):
if self.device is None:
print('no active connection')
return
return func(self, *args, **kwargs)
return wrapper
def __init__(self):
cmd.Cmd.__init__(self)
self.device = None
self.device_mgr = pyoattnusb.Manager()
self.history = os.path.expanduser('~/.oattnusb_history')
def do_list(self, arg):
"""list devices and their access methods."""
for i, dev in enumerate(self.device_mgr.devices()):
print('{}:\tl:{}\tserial:{}\tbusdev:{}/{}\tbusport:{}'.format(i,
dev['lun'], dev['serial'], dev['busnum'], dev['devnum'],
dev['busport']))
def do_open(self, arg):
"""open device by serial number."""
try:
serial = self.device_mgr.devices()[0]['serial'] if not arg else arg
except IndexError as e:
print('no devices found, cannot open')
return
try:
self.device = None # close previously opened device
self.device = self.device_mgr.get_device(serial)
print('current device: serial {}, fd {}, {} channels'.format(
self.device.serial, self.device.fd, self.device.nchannels))
except BaseException as e:
print(e)
@conn_checker
def do_close(self, arg):
"""close connection to current device."""
self.device = None
@conn_checker
def do_nchannels(self, arg):
"""number of channels of current device."""
print('{} channels'.format(self.device.nchannels))
@conn_checker
def do_get_attn(self, arg):
"""display attenuation value of each channel."""
for i in range(1, self.device.nchannels + 1):
attn = self.device.get_attn(i)
print('ch {}: attn {}'.format(i, pyoattnusb.Device.attn_labels[attn]))
@conn_checker
def do_set_attn(self, args):
"""set attenuation value of channel."""
try:
ch, attn = args.strip().split()
ch = int(ch)
attn = pyoattnusb.Device.attn_values[attn]
except ValueError as e:
print('usage: set_attn ch {-40dB|-20dB|0dB}')
print(' ch = 1..{}'.format(self.device.nchannels))
return
except KeyError as e:
print('invalid attenuation value')
print('usage: set_attn ch {-40dB|-20dB|0dB}')
print(' NB: attenuation labels MUST be literally correct')
return
try:
self.device.set_attn(ch, attn)
except BaseException as e:
print(e)
@conn_checker
def do_reset(self, args):
"""reset all channel attns to the default (-40dB)."""
for ch in range(1, self.device.nchannels + 1):
self.device.set_attn(ch, pyoattnusb.Device.attn_values['-40dB'])
def do_eeprom_program(self, args):
"""EXPERT: program the EEPROM (you will be asked to provide a serial number)."""
try:
bus, dev = [int(arg) for arg in args.split()]
except ValueError as e:
print('usage: eeprom_program bus dev')
return
except Exception as e:
raise
for device in self.device_mgr.devices():
if (bus, dev) == (int(device['busnum']), int(device['devnum'])):
break
else:
# Also check if there could be any unprogrammed devices which
# still have the default FTDI VID/PID
ftdi_devices = self.device_mgr._get_devarray(self.device_mgr.FTDI_VENDOR_ID, self.device_mgr.FTDI_DEVICE_ID)
for device in ftdi_devices:
if (bus, dev) == (int(device['busnum']), int(device['devnum'])):
break
else:
print('device bus:{} dev:{} not found'.format(bus, dev))
return
serial = input('Please provide the serial number to program: ')
print('WARNING: about to erase and program the EEPROM with new serial number [{}]'
' into dev with old serial: [{}] at busdev: {}/{}, busport: {}'.format(
serial, device['serial'],
device['busnum'], device['devnum'], device['busport']))
if input('are you sure? (yes/N) ') != 'yes':
print('ok, giving up.')
return
try:
self.device_mgr.eeprom_program(bus, dev, serial)
except BaseException as e:
print(e)
print('new serial recorded!')
print('quitting, please restart the test program')
return True
def do_version(self, arg):
"""display software version."""
print(self.device_mgr.version())
def do_EOF(self, arg):
return True
def my_preloop(self):
try:
readline.read_history_file(self.history)
except FileNotFoundError:
readline.write_history_file(self.history)
def my_postloop(self):
readline.write_history_file(self.history)
# catch Ctrl-C and drop me back into cli
def cmdloop_intr(self, intro=None):
self.my_preloop()
while True:
try:
self.cmdloop()
break
except KeyboardInterrupt as e:
self.intro = None
print('^C')
self.my_postloop()
do_q = do_EOF
do_h = cmd.Cmd.do_help
do_ls = do_list
if __name__ == '__main__':
cli = OattnUsbTest()
cli.cmdloop_intr()
# SPDX-FileCopyrightText: 2024 CERN (home.cern)
#
# SPDX-License-Identifier: LGPL-2.1-or-later
from setuptools import setup, find_packages
import subprocess
try:
version = subprocess.check_output(['git', 'describe', '--tags']).decode().strip()
except Exception:
version = '0.0.0'
setup(
name='pyoattnusb',
version=version,
packages=find_packages(),
author='Dimitris Lampridis',
author_email='dimitris.lampridis@cern.ch',
description='RF OASIS Attenuator control library',
url='https://gitlab.cern.ch/be-cem-edl/fec/hardware-modules/rf-att-4cha',
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)',
'Operating System :: OS Independent',
],
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment