Commit e61db898 authored by Dónal Murray's avatar Dónal Murray

Add skeleton files to develop pyQT-based native UI

parent f1677204
Pipeline #712 failed
../raspberry-dataserver/CommsCommon.py
\ No newline at end of file
#!/usr/bin/env python3
import logging
import argparse
import sys
import numpy as np
from PySide2.QtCore import Slot
from PySide2.QtWidgets import QMainWindow, QApplication, QHBoxLayout, QVBoxLayout
from hevclient import HEVClient
from hev_main import MainView
logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s')
class NativeUI(HEVClient, QMainWindow):
"""Main application with client logic"""
def __init__(self, *args, **kwargs):
super(NativeUI, self).__init__(*args, **kwargs)
self.setWindowTitle("HEV NativeUI")
self.main_view = MainView()
self.setCentralWidget(self.main_view)
self.statusBar().showMessage("Waiting for data")
self.data = {}
self.target = {}
self.readback = {}
self.cycle = {}
self.battery = {}
self.plots = np.zeros((500, 4))
self.plots[:, 0] = np.arange(500) # fill timestamp with 0-499
def start_client(self):
"""runs in other thread - works as long as super goes last and nothing
else is blocking. If something more than a one-shot process is needed
then use async
"""
# call for all the targets and personal details
# when starting the web app so we always have some in the db
self.send_cmd("GET_TARGETS", "PC_AC")
self.send_cmd("GET_TARGETS", "PC_AC_PRVC")
self.send_cmd("GET_TARGETS", "PC_PSV")
self.send_cmd("GET_TARGETS", "TEST")
self.send_cmd("GENERAL", "GET_PERSONAL")
super().start_client()
def get_updates(self, payload):
"""callback from the polling function, payload is data from socket """
# Store data in dictionary of lists
self.statusBar().showMessage(f"{payload}")
try:
if (payload["type"] == "DATA"):
self.data = payload["DATA"]
# remove first entry and append plot data to end
self.plots = np.append(np.delete(self.plots, 0, 0), [[self.data["timestamp"], self.data["pressure_patient"], 0, 0]], axis=0)
except KeyError:
logging.warning(f"Invalid payload: {payload}")
@Slot(str, str, float)
def q_send_cmd(self, cmdtype: str, cmd: str, param: float = None) -> None:
"""send command to hevserver via socket"""
self.send_cmd(cmdtype=cmdtype, cmd=cmd, param=param)
@Slot(str)
def q_ack_alarm(self, alarm: str):
"""acknowledge an alarm in the hevserver"""
self.ack_alarm(alarm=alarm)
@Slot(str)
def q_send_personal(self, personal: str):
"""send personal details to hevserver"""
self.send_personal(personal=personal)
if __name__ == "__main__":
# parse args and setup logging
parser = argparse.ArgumentParser(description='Plotting script for the HEV lab setup')
parser.add_argument('-d', '--debug', action='count', default=0, help='Show debug output')
args = parser.parse_args()
if args.debug == 0:
logging.getLogger().setLevel(logging.WARNING)
elif args.debug == 1:
logging.getLogger().setLevel(logging.INFO)
else:
logging.getLogger().setLevel(logging.DEBUG)
# setup pyqtplot widget
app = QApplication(sys.argv)
dep = NativeUI()
dep.show()
app.exec_()
\ No newline at end of file
import logging
import argparse
import sys
from PySide2.QtCore import Slot
from PySide2.QtWidgets import QWidget, QApplication, QHBoxLayout, QVBoxLayout
from hevclient import HEVClient
from tab_plots import TabPlots
class MainView(QWidget):
def __init__(self, *args, **kwargs):
super(MainView, self).__init__(*args, **kwargs)
hlayout = QHBoxLayout()
vlayout = QVBoxLayout()
self.tab_plots = TabPlots()
vlayout.addWidget(self.tab_plots)
hlayout.addLayout(vlayout)
self.setLayout(hlayout)
\ No newline at end of file
../raspberry-dataserver/hevclient.py
\ No newline at end of file
#!/usr/bin/env python3
import logging
import os
import pyqtgraph as pg
import numpy as np
from PySide2 import QtWidgets, QtCore
from pyqtgraph import PlotWidget, plot, mkColor
from hevclient import HEVClient
logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s')
class TabPlots(QtWidgets.QWidget):
def __init__(self, port=54322, *args, **kwargs):
super(TabPlots, self).__init__(*args, **kwargs)
self.history_length = 500
self.time_range = 30
self.port = port
layout = QtWidgets.QVBoxLayout()
self.graphWidget = pg.GraphicsLayoutWidget()
layout.addWidget(self.graphWidget)
self.pressurePlot = self.graphWidget.addPlot(title="Pressure")
self.graphWidget.nextRow()
self.flowPlot = self.graphWidget.addPlot(title="Flow")
self.graphWidget.nextRow()
self.volumePlot = self.graphWidget.addPlot(title="Volume")
self.graphWidget.nextRow()
self.timestamp = list(el*(-1) for el in range(self.history_length))[::-1]
self.PID_P = list(0 for _ in range(self.history_length))
self.PID_I = list(0 for _ in range(self.history_length))
self.PID_D = list(0 for _ in range(self.history_length))
self.graphWidget.setBackground(mkColor(30, 30, 30))
# Add grid
self.flowPlot.showGrid(x=True, y=True)
self.volumePlot.showGrid(x=True, y=True)
self.pressurePlot.showGrid(x=True, y=True)
# Set Range
self.flowPlot.setXRange(self.time_range * (-1), 0, padding=0)
self.volumePlot.setXRange(self.time_range * (-1), 0, padding=0)
self.pressurePlot.setXRange(self.time_range * (-1), 0, padding=0)
self.flowPlot.enableAutoRange('y', True)
self.volumePlot.enableAutoRange('y', True)
self.pressurePlot.enableAutoRange('y', True)
# Plot styles
self.line1 = self.plot(self.pressurePlot, self.timestamp, self.PID_P, "Airway Pressure", "077")
self.line2 = self.plot(self.flowPlot, self.timestamp, self.PID_D, "Flow", "00F")
self.line3 = self.plot(self.volumePlot, self.timestamp, self.PID_I, "Volume", "707")
self.setLayout(layout)
self.timer = QtCore.QTimer()
self.timer.setInterval(16) # just faster than 60Hz
self.timer.timeout.connect(self.update_plot_data)
self.timer.start()
def plot(self, canvas, x, y, plotname, color):
pen = pg.mkPen(color=color, width=3)
return canvas.plot(x, y, name=plotname, pen=pen)
def update_plot_data(self):
# subtract latest timestamp and scale to seconds
timestamp = np.true_divide(np.subtract(self.parent().parent().plots[:, 0], self.parent().parent().plots[-1, 0]), 1000)
self.line1.setData(timestamp, self.parent().parent().plots[:, 1])
self.line2.setData(timestamp, self.parent().parent().plots[:, 2])
self.line3.setData(timestamp, self.parent().parent().plots[:, 3])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment