Skip to content
Snippets Groups Projects
Commit 915c3d3b authored by Dónal Murray's avatar Dónal Murray
Browse files

Add front end generator to read specific stripped-down file format to match test setup

parent b4f71b13
Branches
No related merge requests found
#!/usr/bin/env python3
# simple txt filereader frontend for HEVserver
# currently dedicated to very specific file format matching victor's sample.txt
# author Dónal Murray <donal.murray@cern.ch>
import threading
import commsConstants
import time
import numpy as np
from typing import List
from collections import deque
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
class hevfromtxt():
def __init__(self, inputFile):
# use input file for testing
h = np.loadtxt('share/sample.txt',skiprows = 1, delimiter = ',')
self._pressure = h[:,1].tolist()
self._flow = h[:,2].tolist()
self._volume = h[:,3].tolist()
self._length = len(self._pressure)
self._pos = 0 # position within sample
# received queue and observers to be notified on update
self._payloadrecv = deque(maxlen = 16)
self._observers = []
sendingWorker = threading.Thread(target=self.generate, daemon=True)
sendingWorker.start()
def generate(self) -> None:
while True:
# grab next array from filedump
self._pos = self._pos + 1 if self._pos + 1 < self._length else 0
payload = commsConstants.dataFormat()
# directly setting private member variables in this edge case
payload._version = payload._RPI_VERSION
payload._pressure_buffer = self._pressure[self._pos]
payload._pressure_inhale = self._volume[self._pos]
payload._temperature_buffer = self._flow[self._pos]
self.payloadrecv = payload
time.sleep(1)
# callback to dependants to read the received payload
@property
def payloadrecv(self):
return self._payloadrecv
@payloadrecv.setter
def payloadrecv(self, payload):
self._payloadrecv.append(payload)
logging.debug(f"Pushed {payload} to FIFO")
for callback in self._observers:
# peek at the leftmost item, don't pop until receipt confirmed
callback(self._payloadrecv[0])
def writePayload(self, payload):
logging.info(payload)
def bind_to(self, callback):
self._observers.append(callback)
def pop_payloadrecv(self):
# from callback. confirmed receipt, pop value
poppedval = self._payloadrecv.popleft()
logging.debug(f"Popped {poppedval} from FIFO")
if len(self._payloadrecv) > 0:
# purge full queue if Dependant goes down when it comes back up
for callback in self._observers:
callback(self._payloadrecv[0])
\ No newline at end of file
......@@ -9,13 +9,14 @@ import time
import threading
import argparse
import svpi
import hevfromtxt
import commsControl
from commsConstants import payloadType, command_codes, alarm_codes, commandFormat
from collections import deque
from serial.tools import list_ports
from typing import List
import logging
logging.basicConfig(level=logging.INFO,
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s')
class HEVPacketError(Exception):
......@@ -201,8 +202,14 @@ if __name__ == "__main__":
# check if input file was specified
if args.inputFile != '':
# initialise frond end generator from file
lli = svpi.svpi(args.inputFile)
if args.inputFile[-1-3:] == '.txt':
# assume sample.txt format
lli = hevfromtxt.hevfromtxt(args.inputFile)
else:
# assume hex dump
lli = svpi.svpi(args.inputFile)
logging.info(f"Serving data from {args.inputFile}")
else:
# get arduino serial port
for port in list_ports.comports():
......@@ -218,8 +225,9 @@ if __name__ == "__main__":
# initialise low level interface
try:
lli = commsControl.commsControl(port=port_device)
logging.info(f"Serving data from device {port_device}")
except NameError:
print("Arduino not connected")
logging.error("Arduino not connected")
exit(1)
hevsrv = HEVServer(lli)
......
This diff is collapsed.
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment