Commit 33768135 authored by Projects's avatar Projects

Moved Python code to a separate directory

parent 3c9a71c6
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023-2024 CERN (home.cern)
#
# SPDX-License-Identifier: LGPL-2.1-or-later
# vim: et ts=8 sw=4 sts=4 ai
import cmd
import readline
import os.path
import pyoattnusb
class OattnUsbTest(cmd.Cmd):
intro = 'Oasis USB Attenuator CLI. Type "help" or "?" to list commands\n'
prompt = 'oatt> '
def conn_checker(func):
def wrapper(self=None, *args, **kwargs):
if self.device is None:
print('no active connection')
return
return func(self, *args, **kwargs)
return wrapper
def __init__(self):
cmd.Cmd.__init__(self)
self.device = None
self.device_mgr = pyoattnusb.Manager()
self.history = os.path.expanduser('~/.oattnusb_history')
def do_list(self, arg):
"""list devices and their access methods."""
for i, dev in enumerate(self.device_mgr.devices()):
print('{}:\tl:{}\tserial:{}\tbusdev:{}/{}\tbusport:{}'.format(i,
dev['lun'], dev['serial'], dev['busnum'], dev['devnum'],
dev['busport']))
def do_open(self, arg):
"""open device by serial number."""
try:
serial = self.device_mgr.devices()[0]['serial'] if not arg else arg
except IndexError as e:
print('no devices found, cannot open')
return
try:
self.device = None # close previously opened device
self.device = self.device_mgr.get_device(serial)
print('current device: serial {}, fd {}, {} channels'.format(
self.device.serial, self.device.fd, self.device.nchannels))
except BaseException as e:
print(e)
@conn_checker
def do_close(self, arg):
"""close connection to current device."""
self.device = None
@conn_checker
def do_nchannels(self, arg):
"""number of channels of current device."""
print('{} channels'.format(self.device.nchannels))
@conn_checker
def do_get_attn(self, arg):
"""display attenuation value of each channel."""
for i in range(1, self.device.nchannels + 1):
attn = self.device.get_attn(i)
print('ch {}: attn {}'.format(i, pyoattnusb.Device.attn_labels[attn]))
@conn_checker
def do_set_attn(self, args):
"""set attenuation value of channel."""
try:
ch, attn = args.strip().split()
ch = int(ch)
attn = pyoattnusb.Device.attn_values[attn]
except ValueError as e:
print('usage: set_attn ch {-40dB|-20dB|0dB}')
print(' ch = 1..{}'.format(self.device.nchannels))
return
except KeyError as e:
print('invalid attenuation value')
print('usage: set_attn ch {-40dB|-20dB|0dB}')
print(' NB: attenuation labels MUST be literally correct')
return
try:
self.device.set_attn(ch, attn)
except BaseException as e:
print(e)
@conn_checker
def do_reset(self, args):
"""reset all channel attns to the default (-40dB)."""
for ch in range(1, self.device.nchannels + 1):
self.device.set_attn(ch, pyoattnusb.Device.attn_values['-40dB'])
def do_eeprom_program(self, args):
"""EXPERT: program the EEPROM (you will be asked to provide a serial number)."""
try:
bus, dev = [int(arg) for arg in args.split()]
except ValueError as e:
print('usage: eeprom_program bus dev')
return
except Exception as e:
raise
for device in self.device_mgr.devices():
if (bus, dev) == (int(device['busnum']), int(device['devnum'])):
break
else:
# Also check if there could be any unprogrammed devices which
# still have the default FTDI VID/PID
ftdi_devices = self.device_mgr._get_devarray(self.device_mgr.FTDI_VENDOR_ID, self.device_mgr.FTDI_DEVICE_ID)
for device in ftdi_devices:
if (bus, dev) == (int(device['busnum']), int(device['devnum'])):
break
else:
print('device bus:{} dev:{} not found'.format(bus, dev))
return
serial = input('Please provide the serial number to program: ')
print('WARNING: about to erase and program the EEPROM with new serial number [{}]'
' into dev with old serial: [{}] at busdev: {}/{}, busport: {}'.format(
serial, device['serial'],
device['busnum'], device['devnum'], device['busport']))
if input('are you sure? (yes/N) ') != 'yes':
print('ok, giving up.')
return
try:
self.device_mgr.eeprom_program(bus, dev, serial)
except BaseException as e:
print(e)
print('new serial recorded!')
print('quitting, please restart the test program')
return True
def do_version(self, arg):
"""display software version."""
print(self.device_mgr.version())
def do_EOF(self, arg):
return True
def my_preloop(self):
try:
readline.read_history_file(self.history)
except FileNotFoundError:
readline.write_history_file(self.history)
def my_postloop(self):
readline.write_history_file(self.history)
# catch Ctrl-C and drop me back into cli
def cmdloop_intr(self, intro=None):
self.my_preloop()
while True:
try:
self.cmdloop()
break
except KeyboardInterrupt as e:
self.intro = None
print('^C')
self.my_postloop()
do_q = do_EOF
do_h = cmd.Cmd.do_help
do_ls = do_list
if __name__ == '__main__':
cli = OattnUsbTest()
cli.cmdloop_intr()
#!/bin/sh
python3 -m pyoattnusb
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023-2024 CERN (home.cern)
#
# SPDX-License-Identifier: LGPL-2.1-or-later
# vim: et ts=8 sw=4 sts=4 ai
import cmd
import readline
import os.path
import pyoattnusb
class OattnUsbTest(cmd.Cmd):
intro = 'Oasis USB Attenuator CLI. Type "help" or "?" to list commands\n'
prompt = 'oatt> '
def conn_checker(func):
def wrapper(self=None, *args, **kwargs):
if self.device is None:
print('no active connection')
return
return func(self, *args, **kwargs)
return wrapper
def __init__(self):
cmd.Cmd.__init__(self)
self.device = None
self.device_mgr = pyoattnusb.Manager()
self.history = os.path.expanduser('~/.oattnusb_history')
def do_list(self, arg):
"""list devices and their access methods."""
for i, dev in enumerate(self.device_mgr.devices()):
print('{}:\tl:{}\tserial:{}\tbusdev:{}/{}\tbusport:{}'.format(i,
dev['lun'], dev['serial'], dev['busnum'], dev['devnum'],
dev['busport']))
def do_open(self, arg):
"""open device by serial number."""
try:
serial = self.device_mgr.devices()[0]['serial'] if not arg else arg
except IndexError as e:
print('no devices found, cannot open')
return
try:
self.device = None # close previously opened device
self.device = self.device_mgr.get_device(serial)
print('current device: serial {}, fd {}, {} channels'.format(
self.device.serial, self.device.fd, self.device.nchannels))
except BaseException as e:
print(e)
@conn_checker
def do_close(self, arg):
"""close connection to current device."""
self.device = None
@conn_checker
def do_nchannels(self, arg):
"""number of channels of current device."""
print('{} channels'.format(self.device.nchannels))
@conn_checker
def do_get_attn(self, arg):
"""display attenuation value of each channel."""
for i in range(1, self.device.nchannels + 1):
attn = self.device.get_attn(i)
print('ch {}: attn {}'.format(i, pyoattnusb.Device.attn_labels[attn]))
@conn_checker
def do_set_attn(self, args):
"""set attenuation value of channel."""
try:
ch, attn = args.strip().split()
ch = int(ch)
attn = pyoattnusb.Device.attn_values[attn]
except ValueError as e:
print('usage: set_attn ch {-40dB|-20dB|0dB}')
print(' ch = 1..{}'.format(self.device.nchannels))
return
except KeyError as e:
print('invalid attenuation value')
print('usage: set_attn ch {-40dB|-20dB|0dB}')
print(' NB: attenuation labels MUST be literally correct')
return
try:
self.device.set_attn(ch, attn)
except BaseException as e:
print(e)
@conn_checker
def do_reset(self, args):
"""reset all channel attns to the default (-40dB)."""
for ch in range(1, self.device.nchannels + 1):
self.device.set_attn(ch, pyoattnusb.Device.attn_values['-40dB'])
def do_eeprom_program(self, args):
"""EXPERT: program the EEPROM (you will be asked to provide a serial number)."""
try:
bus, dev = [int(arg) for arg in args.split()]
except ValueError as e:
print('usage: eeprom_program bus dev')
return
except Exception as e:
raise
for device in self.device_mgr.devices():
if (bus, dev) == (int(device['busnum']), int(device['devnum'])):
break
else:
# Also check if there could be any unprogrammed devices which
# still have the default FTDI VID/PID
ftdi_devices = self.device_mgr._get_devarray(self.device_mgr.FTDI_VENDOR_ID, self.device_mgr.FTDI_DEVICE_ID)
for device in ftdi_devices:
if (bus, dev) == (int(device['busnum']), int(device['devnum'])):
break
else:
print('device bus:{} dev:{} not found'.format(bus, dev))
return
serial = input('Please provide the serial number to program: ')
print('WARNING: about to erase and program the EEPROM with new serial number [{}]'
' into dev with old serial: [{}] at busdev: {}/{}, busport: {}'.format(
serial, device['serial'],
device['busnum'], device['devnum'], device['busport']))
if input('are you sure? (yes/N) ') != 'yes':
print('ok, giving up.')
return
try:
self.device_mgr.eeprom_program(bus, dev, serial)
except BaseException as e:
print(e)
print('new serial recorded!')
print('quitting, please restart the test program')
return True
def do_version(self, arg):
"""display software version."""
print(self.device_mgr.version())
def do_EOF(self, arg):
return True
def my_preloop(self):
try:
readline.read_history_file(self.history)
except FileNotFoundError:
readline.write_history_file(self.history)
def my_postloop(self):
readline.write_history_file(self.history)
# catch Ctrl-C and drop me back into cli
def cmdloop_intr(self, intro=None):
self.my_preloop()
while True:
try:
self.cmdloop()
break
except KeyboardInterrupt as e:
self.intro = None
print('^C')
self.my_postloop()
do_q = do_EOF
do_h = cmd.Cmd.do_help
do_ls = do_list
if __name__ == '__main__':
cli = OattnUsbTest()
cli.cmdloop_intr()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment