Newer
Older
Dónal Murray
committed
#!/usr/bin/env python3
# client side for the data server to manage comms between UIs and LLI
#
# Author: Dónal Murray <donal.murray@cern.ch>
import asyncio
import time
import json
import threading
Dónal Murray
committed
from typing import List, Dict
Dónal Murray
committed
import logging
# Configure the root logger once at import time: INFO and above, with each
# record rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
Dónal Murray
committed
# Module-level flags. NOTE(review): neither name is read in the code visible
# here (HEVClient tracks its own state in self._polling) -- confirm whether
# external importers rely on them before removing.
setflag = False
polling = True
class HEVClient(object):
    """Client side of the data server, managing comms between UIs and the LLI.

    Constructing an instance starts a daemon thread that opens a persistent
    connection to the polling port and caches the latest sensor values and
    alarms it receives. Commands and alarm acknowledgements are sent over
    short-lived connections to the request port.
    """

    # Server endpoints used by this client.
    _HOST = "127.0.0.1"
    _POLLING_PORT = 54320  # persistent connection, data pushed by the server
    _REQUEST_PORT = 54321  # one connection per command / acknowledgement

    def __init__(self):
        self._alarms = []  # db for alarms
        self._values = None  # db for sensor values
        self._thresholds = []  # db for threshold settings
        self._polling = True  # keep reading data into db
        self._lock = threading.Lock()  # lock for the database

        # start worker thread to update db in the background
        worker = threading.Thread(target=self.start_client, daemon=True)
        worker.start()

    def _store_packet(self, data: bytes) -> bool:
        """Decode one raw packet and store its contents in the db.

        Args:
            data: raw bytes read from the polling connection.

        Returns:
            True if the packet was valid JSON holding both "sensors" and
            "alarms" and the db was updated; False otherwise, in which case
            the db is left untouched.
        """
        try:
            payload = json.loads(data.decode("utf-8"))
            sensors = payload["sensors"]
            alarms = payload["alarms"]
        except (ValueError, KeyError, TypeError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError;
            # KeyError/TypeError cover valid JSON of the wrong shape.
            logging.warning(f"Could not decode packet: {data}")
            return False

        # Update both fields under the lock so readers never observe values
        # and alarms taken from two different packets.
        with self._lock:
            self._values = sensors
            self._alarms = alarms
        return True

    async def polling(self) -> None:
        """Read packets from the server and dump them in the db.

        Runs until ``self._polling`` becomes false or the server closes the
        connection. Packets that cannot be decoded are logged and skipped.
        """
        # open persistent connection with server
        reader, writer = await asyncio.open_connection(
            self._HOST, self._POLLING_PORT)
        try:
            # grab data from the socket as soon as it is available
            while self._polling:
                data = await reader.read(600)
                if not data:
                    # read() returns b"" at EOF; without this check the loop
                    # would spin forever on a closed connection.
                    logging.warning("Connection closed by server")
                    break
                self._store_packet(data)
        finally:
            # close connection, also when the loop exits with an exception
            writer.close()
            await writer.wait_closed()

    def start_client(self) -> None:
        """Thread target: run the polling coroutine in its own event loop."""
        asyncio.run(self.polling())

    @staticmethod
    def _build_payload(reqtype, cmdtype, cmd, param, alarm) -> Dict:
        """Build the request dict for *reqtype*; raise ValueError if unknown."""
        if reqtype == "cmd":
            return {
                "type": "cmd",
                "cmdtype": cmdtype,
                "cmd": cmd,
                "param": param,
            }
        if reqtype == "alarm":
            return {
                "type": "alarm",
                "ack": alarm,
            }
        raise ValueError(f"Unknown request type: {reqtype!r}")

    async def send_request(self, reqtype, cmdtype: str = None, cmd: str = None,
                           param: str = None, alarm: str = None) -> bool:
        """Send one request to the server and wait for its acknowledgement.

        Args:
            reqtype: "cmd" to send a command, "alarm" to acknowledge an alarm.
            cmdtype: command type (used when reqtype is "cmd").
            cmd: command name (used when reqtype is "cmd").
            param: optional command parameter (used when reqtype is "cmd").
            alarm: alarm to acknowledge (used when reqtype is "alarm").

        Returns:
            True if the server replied with a packet of type "ack", False if
            the reply was anything else or could not be decoded.

        Raises:
            ValueError: if reqtype is neither "cmd" nor "alarm". This is
                checked before any connection is opened.
            OSError: if the connection to the server fails.
        """
        payload = self._build_payload(reqtype, cmdtype, cmd, param, alarm)
        logging.info(payload)
        packet = json.dumps(payload).encode()

        # open connection and send packet
        reader, writer = await asyncio.open_connection(
            self._HOST, self._REQUEST_PORT)
        try:
            writer.write(packet)
            await writer.drain()

            # wait for acknowledge
            data = await reader.read(300)
        finally:
            # close connection to free up socket, even on failure
            writer.close()
            await writer.wait_closed()

        try:
            reply = json.loads(data.decode("utf-8"))
        except ValueError:
            logging.warning(f"Could not decode packet: {data}")
            reply = None

        # check that acknowledge is meaningful
        if isinstance(reply, dict) and reply.get("type") == "ack":
            logging.info(f"Request type {reqtype} sent successfully")
            return True

        logging.warning(f"Request type {reqtype} failed")
        return False

    def send_cmd(self, cmdtype: str, cmd: str, param: str = None) -> bool:
        """Send a command and block until the server accepts or rejects it."""
        return asyncio.run(
            self.send_request("cmd", cmdtype=cmdtype, cmd=cmd, param=param))

    def ack_alarm(self, alarm: str) -> bool:
        """Acknowledge *alarm* so the server can remove it from its list."""
        return asyncio.run(self.send_request("alarm", alarm=alarm))

    def get_values(self) -> Dict:
        """Return the latest sensor values, or None if none were received."""
        with self._lock:
            return self._values

    def get_alarms(self) -> List[str]:
        """Return the alarms from the most recently received packet."""
        with self._lock:
            return self._alarms
Dónal Murray
committed
if __name__ == "__main__":
    # example implementation
    # just import hevclient and do something like the following
    hevclient = HEVClient()

    time.sleep(2)
    print(hevclient.send_cmd("GENERAL", "START"))

    # Play with sensor values and alarms
    alarms = []
    for _ in range(20):
        values = hevclient.get_values()  # returns a dict or None
        alarms = hevclient.get_alarms()  # returns a list of alarms currently ongoing
        if values is not None:
            print(f"Values: {json.dumps(values, indent=4)}")
            print(f"Alarms: {alarms}")
        time.sleep(1)

    # acknowledge the oldest alarm, if one arrived during the ~20 s above.
    # Only the "no alarm" case is handled here; a failure while sending the
    # acknowledgement is left to propagate instead of being reported as
    # "No alarms received".
    if alarms:
        hevclient.ack_alarm(alarms[0])
    else:
        logging.info("No alarms received")
    time.sleep(1)
    print(f"Alarms: {hevclient.get_alarms()}")

    # send commands:
    time.sleep(1)
    print("This one will fail since foo is not in the CMD_GENERAL enum:")
    print(hevclient.send_cmd("GENERAL", "foo"))

    # print some more values
    for _ in range(10):
        print(f"Values: {json.dumps(hevclient.get_values(), indent=4)}")
        print(f"Alarms: {hevclient.get_alarms()}")
        time.sleep(1)

    print(hevclient.send_cmd("GENERAL", "STOP"))