From 8898bf87abd74b3fefa7e40727e593ded335214e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=B3nal=20Murray?= <donal.murray@cern.ch> Date: Sat, 11 Apr 2020 21:25:23 +0100 Subject: [PATCH] Reading from a looped dump byte array file now works again. Add latched alarm handling --- raspberry-dataserver/commsConstants.py | 37 ++++++- raspberry-dataserver/commsControl.py | 2 +- raspberry-dataserver/hevclient.py | 29 +++--- raspberry-dataserver/hevserver.py | 89 ++++++++++------ raspberry-dataserver/svpi.py | 136 +++++++++++++++++-------- 5 files changed, 201 insertions(+), 92 deletions(-) diff --git a/raspberry-dataserver/commsConstants.py b/raspberry-dataserver/commsConstants.py index 3a69ca2d..0207a097 100644 --- a/raspberry-dataserver/commsConstants.py +++ b/raspberry-dataserver/commsConstants.py @@ -1,5 +1,8 @@ from struct import Struct from enum import Enum, auto +import logging +logging.basicConfig(level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s') class payloadType(Enum): payloadAlarm = auto() @@ -104,6 +107,8 @@ class dataFormat(BaseFormat): # for receiving dataFormat from microcontroller # fill the struct from a byteArray, def fromByteArray(self, byteArray): + logging.debug(f"Byte array of size {len(byteArray)}") + logging.debug(f"Byte array of size {byteArray}") self._byteArray = byteArray (self._version, self._fsm_state, @@ -184,8 +189,15 @@ class commandFormat(BaseFormat): self._type = payloadType.payloadCmd self._version = 0 - self._cmdCode = 0 + self._cmdCode = 0 self._param = 0 + + def __repr__(self): + return f"""{{ + "version" : {self._version}, + "cmdCode" : {self._cmdCode}, + "param" : {self._param} +}}""" def fromByteArray(self, byteArray): self._byteArray = byteArray @@ -200,6 +212,14 @@ class commandFormat(BaseFormat): self._cmdCode, self._param ) + + def getDict(self): + data = { + "version" : self._version, + "cmdCode" : self._cmdCode, + "param" : self._param + } + return data class command_codes(Enum): CMD_START = 1 @@ -218,6 +238,13 @@ class alarmFormat(BaseFormat): self._version = 0 self._alarmCode = 0 self._param = 0 + + def __repr__(self): + return f"""{{ + "version" : {self._version}, + "alarmCode" : {self._alarmCode}, + "param" : {self._param} +}}""" def fromByteArray(self, byteArray): self._byteArray = byteArray @@ -231,6 +258,14 @@ class alarmFormat(BaseFormat): self._alarmCode, self._param ) + + def getDict(self): + data = { + "version" : self._version, + "alarmCode" : self._alarmCode, + "param" : self._param + } + return data class alarm_codes(Enum): ALARM_START = 1 diff --git a/raspberry-dataserver/commsControl.py b/raspberry-dataserver/commsControl.py index 291a7ea5..4ba141f2 100644 --- a/raspberry-dataserver/commsControl.py +++ b/raspberry-dataserver/commsControl.py @@ -15,7 +15,7 @@ from collections import deque import binascii import logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') # communication class that governs talking between devices diff --git a/raspberry-dataserver/hevclient.py b/raspberry-dataserver/hevclient.py index 4a978440..e13d744f 100755 --- a/raspberry-dataserver/hevclient.py +++ b/raspberry-dataserver/hevclient.py @@ -7,7 +7,7 @@ import asyncio import time import json import threading -from typing import List +from typing import List, Dict import logging logging.basicConfig(level=logging.INFO, format='hevclient %(asctime)s - %(levelname)s - %(message)s') @@ -38,12 +38,11 @@ class HEVClient(object): # grab data from the socket as soon as it is available and dump it in the db while self._polling: data = await reader.read(500) - data = data.decode("utf-8").replace("'",'"') + data = data.decode("utf-8") data = json.loads(data) with self._lock: - self._values = data - #self._alarms = data["alarms"] - #self._thresholds = data["thresholds"] + self._values = data["sensors"] + self._alarms = data["alarms"] # close connection writer.close() @@ -98,7 +97,7 @@ class HEVClient(object): # set a mode and thresholds return asyncio.run(self.send_request("setup", mode=mode, thresholds=thresholds)) - def get_values(self) -> List[float]: + def get_values(self) -> Dict: # get sensor values from db return self._values @@ -114,15 +113,16 @@ if __name__ == "__main__": # Play with sensor values and alarms for i in range(30): - values = hevclient.get_values() - if values is None and i > 0: - i -= 1 - time.sleep(1) - continue - print(f"{values!r}") + values = hevclient.get_values() # returns a dict or None + alarms = hevclient.get_alarms() # returns a list of alarms currently ongoing + if values is None: + i = i+1 if i > 0 else 0 + else: + print(f"Values: {json.dumps(values, indent=4)}") + print(f"Alarms: {alarms}") time.sleep(1) - # set modes and thresholds + # set modes and thresholds (this will change) print(hevclient.set_mode("CPAP")) print(hevclient.set_thresholds([12.3, 45.6, 78.9])) print(hevclient.setup("CPAP", [12.3, 45.6, 78.9])) @@ -131,6 +131,7 @@ if __name__ == "__main__": # print some more values for i in range(10): - print(f"Sensor values: {hevclient.get_values()}") + print(f"Alarms: {hevclient.get_alarms()}") + print(f"Values: {json.dumps(hevclient.get_values(), indent=4)}") print(f"Alarms: {hevclient.get_alarms()}") time.sleep(1) diff --git a/raspberry-dataserver/hevserver.py b/raspberry-dataserver/hevserver.py index ac3845cf..e325cf40 100755 --- a/raspberry-dataserver/hevserver.py +++ b/raspberry-dataserver/hevserver.py @@ -10,6 +10,8 @@ import threading import argparse import svpi import commsControl +from commsConstants import payloadType +from collections import deque from serial.tools import list_ports from typing import List import logging @@ -19,11 +21,9 @@ logging.basicConfig(level=logging.DEBUG, class HEVServer(object): def __init__(self, lli): - self._alarms = '' + self._alarms = [] self._values = None - self._thresholds = [] self._dblock = threading.Lock() # make db threadsafe - self._generator = svpi.svpi() self._lli = lli self._lli.bind_to(self.polling) @@ -37,19 +37,43 @@ class HEVServer(object): def __repr__(self): with self._dblock: - return f"Alarms: {self._alarms}.\nSensor values: {self._values}\nSettings: {self._thresholds}" + return f"Alarms: {self._alarms}.\nSensor values: {self._values}" - def set_input_file(self, input_file): - self._generator.addInputFile(input_file) - def polling(self, payload): # get values when we get a callback from commsControl (lli) logging.debug(f"Payload received: {payload!r}") - # TODO check what type of broadcast it is - with self._dblock: - self._values = payload.getDict() + # check if it is data or alarm + payload_type = payload.getType() + if payload_type == payloadType.payloadData: + # pass data to db + with self._dblock: + self._values = payload.getDict() + elif payload_type == payloadType.payloadAlarm: + alarm_map = { + 0: "manual", + 1: "gas supply", + 2: "apnea", + 3: "expired minute volume", + 4: "upper pressure limit", + 5: "power failure", + } + new_alarm = payload.getDict() + param = new_alarm["param"] + if new_alarm["alarmCode"] == 2: + # alarm stop, delete from list + with self._dblock: + self._alarms.remove(alarm_map[param]) + + elif new_alarm["alarmCode"] == 1: + # alarm start, add to list + with self._dblock: + self._alarms.append(alarm_map[param]) + + # let broadcast thread know there is data to send with self._dvlock: self._datavalid.set() + + # pop from lli queue self._lli.pop_payloadrecv() async def handle_request(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: @@ -66,7 +90,7 @@ class HEVServer(object): logging.debug(f"{addr!r} requested to change to mode {mode!r}") # send via protocol and prepare reply - if self._generator.setMode(mode): + if self._lli.setMode(mode): packet = f"""{{"type": "ackmode", "mode": \"{mode}\"}}""".encode() else: packet = f"""{{"type": "nack"}}""".encode() @@ -76,7 +100,7 @@ class HEVServer(object): f"{addr!r} requested to set thresholds to {thresholds!r}") # send via protocol - payload = self._generator.setThresholds(thresholds) + payload = self._lli.setThresholds(thresholds) # prepare reply packet = f"""{{"type": "ackthresholds", "thresholds": \"{payload}\"}}""".encode() elif request["type"] == "setup": @@ -87,8 +111,8 @@ class HEVServer(object): f"{addr!r} requested to set thresholds to {thresholds!r}") # send via protocol and prepare reply - if self._generator.setMode(mode): - self._generator.setThresholds(thresholds) + if self._lli.setMode(mode): + self._lli.setThresholds(thresholds) packet = f"""{{"type": "ack", "mode": \"{mode}\", "thresholds": \"{thresholds}\"}}""".encode() else: packet = f"""{{"type": "nack"}}""".encode() @@ -111,15 +135,17 @@ class HEVServer(object): continue # take lock of db and prepare packet with self._dblock: - sensors: List[float] = self._values - alarms: str = self._alarms - thresholds: List[float] = self._thresholds - #broadcast_packet = f"""{{ "type": "broadcast", "sensors": {sensors}, "alarms": "{alarms}", "thresholds": {thresholds}}}""" - broadcast_packet = sensors - logging.info(f"Send: {broadcast_packet}") + values: List[float] = self._values + alarms = self._alarms if len(self._alarms) > 0 else None + + broadcast_packet = {} + broadcast_packet["sensors"] = values + broadcast_packet["alarms"] = alarms # add alarms key/value pair + + logging.info(f"Send: {json.dumps(broadcast_packet,indent=4)}") try: - writer.write(f"{broadcast_packet}".encode()) + writer.write(json.dumps(broadcast_packet).encode()) await writer.drain() except (ConnectionResetError, BrokenPipeError): # Connection lost, stop trying to broadcast and free up socket @@ -173,7 +199,6 @@ if __name__ == "__main__": #parser to allow us to pass arguments to hevserver parser = argparse.ArgumentParser(description='Arguments to run hevserver') parser.add_argument('--inputFile', type=str, default = '', help='a test file to load data') - parser.add_argument('--randGen', action='store_true', help='use a front end emulator which generates random values') args = parser.parse_args() # get arduino serial port @@ -181,18 +206,20 @@ if __name__ == "__main__": if "ARDUINO" in port.manufacturer.upper(): port = port.device - # initialise low level interface and dataserver - try: - lli = commsControl.commsControl(port=port) - except NameError: - print("Arduino not connected") - exit(1) + # check if input file was specified + if args.inputFile != '': + # initialise frond end generator from file + lli = svpi.svpi(args.inputFile) + else: + # initialise low level interface + try: + lli = commsControl.commsControl(port=port) + except NameError: + print("Arduino not connected") + exit(1) hevsrv = HEVServer(lli) - # check if input file was specified - if args.inputFile != '': - hevsrv.set_input_file(args.inputFile) # serve forever while True: pass diff --git a/raspberry-dataserver/svpi.py b/raspberry-dataserver/svpi.py index cc8c4e53..41d48d95 100755 --- a/raspberry-dataserver/svpi.py +++ b/raspberry-dataserver/svpi.py @@ -5,60 +5,78 @@ import time import numpy as np -from typing import List +import argparse +from collections import deque +import commsFormat +import threading +import commsConstants +from typing import List, Dict import logging logging.basicConfig(level=logging.INFO, - format='svpi %(asctime)s - %(levelname)s - %(message)s') + format='%(asctime)s - %(levelname)s - %(message)s') -class svpi: - def __init__(self): +class svpi(): + def __init__(self, inputFile): # use input file for testing - self.input = None - # store current line in the file so we know when to stop - self.cur_line = None - # flag to check if we can continue to generate numbers - self.run = True + self._input = open(inputFile, 'rb') + # dump file to variable + self._bytestore = bytearray(self._input.read()) + self._pos = 0 # position inside bytestore + self._currentAlarm = None + # received queue and observers to be notified on update + self._payloadrecv = deque(maxlen = 16) + self._observers = [] + sendingWorker = threading.Thread(target=self.generate, daemon=True) + sendingWorker.start() - def getValues(self) -> List[float]: - # check for file - if (self.input): - sensor_value: List[float] = [ float(v) for v in self.cur_line.replace('\n','').split(',') ] - # grab next line, if it's empty we stop the run - self.cur_line = self.input.readline() - if (self.cur_line == ''): - self.input.close() - self.run = False - else: - # All sensor readings 32 bit floats - sensor_value: List[float] = np.random.uniform(0.0, 1000.0, 6).tolist() - return sensor_value + def generate(self) -> None: + while True: + # check for an alarm + alarm = self.getAlarms() + if alarm is not None: + byteArray = alarm + payload = commsConstants.alarmFormat() + else: + # grab next array from filedump + fullArray = self._bytestore[0+self._pos*27:27+self._pos*27] + # current byte dump (20200411) has wrong format 27 bytes, new format expects 26. snip out second byte + byteArray = fullArray[:1] + fullArray[2:] + # go to next byte array. if at the end, loop + self._pos = self._pos + 1 if self._pos < 99 else 0 + payload = commsConstants.dataFormat() + + payload.fromByteArray(byteArray) + self.payloadrecv = payload + time.sleep(1) - def addInputFile(self, fileName): - #open file and read first line - self.input = open(fileName, 'r') - self.cur_line = self.input.readline() - def getAlarms(self) -> List[str]: - # give a random alarm a twentieth of the time - alarms = { - 0: "manual", - 1: "gas supply", - 2: "apnea", - 3: "expired minute volume", - 4: "upper pressure limit", - 5: "power failure", - } + # give/cancel a random alarm a twentieth of the time + #alarms = { + # 0: "manual", + # 1: "gas supply", + # 2: "apnea", + # 3: "expired minute volume", + # 4: "upper pressure limit", + # 5: "power failure", + #} if np.random.randint(0, 20) == 0: - return [alarms[np.random.randint(0, 6)]] - return ["none"] - + if self._currentAlarm is None: + # send alarm + alarm = np.random.randint(0, 6) + self._currentAlarm = alarm + return bytearray((0xA0,0x01,0x00,0x00,0x00,alarm)) + else: + # stop previous alarm + alarm = self._currentAlarm + self._currentAlarm = None + return bytearray((0xA0,0x02,0x00,0x00,0x00,alarm)) + return None def getThresholds(self) -> List[float]: # All thresholds 32 bit floats thresholds: List[float] = np.random.uniform(0.0, 1000.0, 3).tolist() return thresholds - def setMode(self, mode: str) -> bool: # setting a mode - just print it if mode in ("PRVC", "SIMV-PC", "CPAP"): @@ -68,15 +86,43 @@ class svpi: logging.error(f"Requested mode {mode} does not exist") return False - def setThresholds(self, thresholds: List[float]) -> str: # setting thresholds - just print them logging.info(f"Setting thresholds {thresholds}") return thresholds + # callback to dependants to read the received payload + @property + def payloadrecv(self): + return self._payloadrecv + + @payloadrecv.setter + def payloadrecv(self, payload): + self._payloadrecv.append(payload) + logging.debug(f"Pushed {payload} to FIFO") + for callback in self._observers: + # peek at the leftmost item, don't pop until receipt confirmed + callback(self._payloadrecv[0]) + + def bind_to(self, callback): + self._observers.append(callback) + + def pop_payloadrecv(self): + # from callback. confirmed receipt, pop value + poppedval = self._payloadrecv.popleft() + logging.debug(f"Popped {poppedval} from FIFO") + if len(self._payloadrecv) > 0: + # purge full queue if Dependant goes down when it comes back up + for callback in self._observers: + callback(self._payloadrecv[0]) + if __name__ == "__main__": - generator = svpi() - while generator.run: - print(generator.getValues()) - time.sleep(0.5) + #parser to allow us to pass arguments to hevserver + parser = argparse.ArgumentParser(description='Arguments to run hevserver') + parser.add_argument('--inputFile', type=str, default = '', help='a test file to load data') + args = parser.parse_args() + + generator = svpi(args.inputFile) + while True: + pass -- GitLab