Commit 4d5c269a authored by Dónal Murray's avatar Dónal Murray

Simplify hevclient internals

parent 42df77bf
Pipeline #1502 failed with stages
...@@ -31,7 +31,7 @@ import time ...@@ -31,7 +31,7 @@ import time
import json import json
import threading import threading
from typing import List, Dict, Union from typing import List, Dict, Union
from CommsCommon import PayloadFormat, PAYLOAD_TYPE from CommsCommon import PAYLOAD_TYPE
import logging import logging
logging.basicConfig( logging.basicConfig(
...@@ -45,80 +45,71 @@ class HEVPacketError(Exception): ...@@ -45,80 +45,71 @@ class HEVPacketError(Exception):
class HEVClient(object): class HEVClient(object):
def __init__(self, polling=True): def __init__(self, polling=True):
super(HEVClient, self).__init__() super().__init__()
self._alarms = [] # db for alarms # alarms latch
self._personal = None # db for personal data self._alarms = []
self._fastdata = None # db for sensor values
self._readback = None # db for sensor values # Last packet of each type
self._cycle = None # db for sensor values self._last_packet = {
self._target = None # db for sensor values "PERSONAL": None,
self._logmsg = None # db for sensor values "DATA": None,
self._thresholds = None # db for sensor values "READBACK": None,
self._thresholds = [] # db for threshold settings "CYCLE": None,
self._polling = polling # keep reading data into db "TARGET": None,
self._lock = threading.Lock() # lock for the database "LOGMSG": None,
"THRESHOLDS": None,
}
# lock for the last packet storage
self._lock = threading.Lock()
# start polling in another thread unless told otherwise # start polling in another thread unless told otherwise
self._polling = polling
if self._polling: if self._polling:
self.start_polling() self.start_polling()
def start_polling(self): def start_polling(self):
"""start worker thread to update db in the background""" """start worker thread to get data in the background"""
worker = threading.Thread(target=self.start_client, daemon=True) worker = threading.Thread(target=self.start_client, daemon=True)
worker.start() worker.start()
def start_client(self) -> None: def start_client(self) -> None:
"""synchronous wrapper for polling()"""
asyncio.run(self.polling()) asyncio.run(self.polling())
async def polling(self) -> None: async def polling(self) -> None:
"""open persistent connection with server""" """open persistent connection with server"""
writer = None writer = None
data = None
while True: while True:
try: try:
reader, writer = await asyncio.open_connection("127.0.0.1", 54320) reader, writer = await asyncio.open_connection("127.0.0.1", 54320)
# grab data from the socket as soon as it is available and dump it in the db
while self._polling: while self._polling:
try: try:
# grab data from the socket as soon as it is available
data = await reader.readuntil(separator=b"\0") data = await reader.readuntil(separator=b"\0")
data = data[:-1] # snip off nullbyte # snip off nullbyte and parse json
data = data[:-1]
payload = json.loads(data.decode("utf-8")) payload = json.loads(data.decode("utf-8"))
if payload["type"] == "keepalive": payload_type = payload["type"]
# Still alive
# check that it's valid
if payload_type == "keepalive":
# drop silently - without it the hevserver can go down unnoticed
continue continue
elif payload["type"] == "DATA": elif payload_type in self._last_packet:
with self._lock: # store it in the right place and pass it on
self._fastdata = payload["DATA"]
elif payload["type"] == "READBACK":
with self._lock:
self._readback = payload["READBACK"]
elif payload["type"] == "CYCLE":
with self._lock:
self._cycle = payload["CYCLE"]
elif payload["type"] == "TARGET":
with self._lock:
self._target = payload["TARGET"]
elif payload["type"] == "PERSONAL":
with self._lock: with self._lock:
self._personal = payload["PERSONAL"] self._last_packet[payload_type] = payload[payload_type]
elif payload["type"] == "LOGMSG": elif payload_type in [pt.name for pt in PAYLOAD_TYPE]:
with self._lock: # valid payload type but unimplemented in hevclient
self._logmsg = payload["LOGMSG"] # passing lets us still use alarm information vs continue
elif payload["type"] == "THRESHOLDS":
with self._lock:
self._thresholds = payload["THRESHOLDS"]
elif payload["type"] == "ALARM":
with self._lock:
self._alarms = payload["ALARM"]
elif payload["type"] in [pt.name for pt in PAYLOAD_TYPE]:
# payload type is a valid payload type
pass pass
else: else:
# invalid packet - something seriously wrong with the hevserver
raise HEVPacketError("Invalid broadcast type") raise HEVPacketError("Invalid broadcast type")
self._alarms = payload["alarms"] self._alarms = payload["alarms"]
# self._personal = payload["personal"]
self.get_updates(payload) # callback function to be overridden self.get_updates(payload) # callback function to be overridden
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
logging.warning(f"Could not decode packet: {data}") logging.warning(f"Could not decode packet: {data}")
...@@ -140,6 +131,7 @@ class HEVClient(object): ...@@ -140,6 +131,7 @@ class HEVClient(object):
def get_updates(self, payload) -> None: def get_updates(self, payload) -> None:
"""Overrideable function called after receiving data from the socket, with that data as an argument""" """Overrideable function called after receiving data from the socket, with that data as an argument"""
logging.debug(payload)
pass pass
async def _send_request( async def _send_request(
...@@ -213,39 +205,32 @@ class HEVClient(object): ...@@ -213,39 +205,32 @@ class HEVClient(object):
# def send_personal(self, personal: Dict[str, str]=None ) -> bool: # def send_personal(self, personal: Dict[str, str]=None ) -> bool:
def send_personal(self, personal: str) -> bool: def send_personal(self, personal: str) -> bool:
# acknowledge alarm to remove it from the hevserver list # send personal information
return asyncio.run(self._send_request("PERSONAL", personal=personal)) return asyncio.run(self._send_request("PERSONAL", personal=personal))
# Getters to check last value of each datatype - debugging only
def get_values(self) -> Dict: def get_values(self) -> Dict:
# get sensor values from db return self._last_packet["DATA"]
return self._fastdata
def get_readback(self) -> Dict: def get_readback(self) -> Dict:
# get readback from db return self._last_packet["READBACK"]
return self._readback
def get_cycle(self) -> Dict: def get_cycle(self) -> Dict:
# get cycle data from db return self._last_packet["CYCLE"]
return self._cycle
def get_personal(self) -> Dict: def get_personal(self) -> Dict:
# get personal data from db return self._last_packet["PERSONAL"]
return self._personal
def get_logmsg(self) -> Dict: def get_logmsg(self) -> Dict:
# get logmsg data from db return self._last_packet["LOGMSG"]
return self._logmsg
def get_target(self) -> Dict: def get_target(self) -> Dict:
# get target data from db return self._last_packet["TARGET"]
return self._target
def get_thresholds(self) -> Dict: def get_thresholds(self) -> Dict:
# get threshold data from db return self._last_packet["THRESHOLDS"]
return self._thresholds
def get_alarms(self) -> List[str]: def get_alarms(self) -> List[str]:
# get alarms from db
return self._alarms return self._alarms
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment