Commit fe941517 authored by Tiago Sarmento's avatar Tiago Sarmento

pushing to ventilator

parents 4285bd5f a6f1d21c
Pipeline #1856 failed with stages
......@@ -189,7 +189,7 @@ class NativeUI(HEVClient, QMainWindow):
# Set up the handlers
self.battery_handler = BatteryHandler()
self.data_handler = DataHandler(plot_history_length=1000)
self.measurement_handler = MeasurementHandler(self)
self.measurement_handler = MeasurementHandler(NativeUI = self)
self.personal_handler = PersonalHandler(self)
self.mode_handler = ModeHandler(self) # , self.mode_confirm_popup)
self.expert_handler = ExpertHandler(self)
......@@ -331,7 +331,6 @@ class NativeUI(HEVClient, QMainWindow):
self.messagePopup.cancelButton.pressed.connect(
lambda i=self.startupWidget: self.display_stack.setCurrentWidget(i)
)
self.widgets.calibration.button.pressed.connect(
lambda i=self.widgets.calibration: self.widgets.startup_handler.handle_calibrationPress(
i
......@@ -340,7 +339,6 @@ class NativeUI(HEVClient, QMainWindow):
self.widgets.calibration.button.pressed.connect(
lambda i=self.messagePopup: self.display_stack.setCurrentWidget(i)
)
self.widgets.leak_test.button.pressed.connect(
lambda i=self.widgets.leak_test: self.widgets.startup_handler.handle_calibrationPress(
i
......@@ -366,6 +364,9 @@ class NativeUI(HEVClient, QMainWindow):
i, j
)
)
self.widgets.maintenance_handler.MaintenanceConfigUpdate.connect(
self.widgets.maintenance_time_display_widget.update_time
)
# Startup next and skip buttons should move from the startup widget to the main
# display
......@@ -525,8 +526,12 @@ class NativeUI(HEVClient, QMainWindow):
button_widget.pressed.connect(self.clinical_handler.commandSent)
elif isinstance(button_widget, CancelButtonWidget):
# mode = self.mode_handler.get_mode(key)
button_widget.clicked.connect(lambda i=key: self.mode_handler.commandSent(i))
button_widget.pressed.connect(lambda i=key: self.clinical_handler.commandSent(i))
button_widget.clicked.connect(
lambda i=key: self.mode_handler.commandSent(i)
)
button_widget.pressed.connect(
lambda i=key: self.clinical_handler.commandSent(i)
)
for key, spin_widget in self.clinical_handler.limSpinDict.items():
spin_widget.simpleSpin.manualChanged.connect(
......@@ -790,6 +795,11 @@ class NativeUI(HEVClient, QMainWindow):
for handler in self.__payload_handlers:
if handler.set_db(payload) == 0:
payload_registered = True
logging.debug(
"Payload type: %s. Registered by handler: %s",
payload["type"],
handler.name,
)
if not payload_registered:
logging.warning("Handlers: Invalid payload type: %s", payload["type"])
logging.debug("Content of invalid payload:\n%s", payload)
......
......@@ -24,7 +24,7 @@ class AlarmHandler(PayloadHandler):
RemoveAlarm = QtCore.Signal(QtWidgets.QWidget)
def __init__(self, NativeUI, *args, **kwargs):
super().__init__(['DATA', 'ALARM'],*args, **kwargs)
super().__init__(['DATA', 'ALARM'],*args, name="Alarm handler", **kwargs)
self.NativeUI = NativeUI
self.alarmDict = {}
......@@ -43,7 +43,7 @@ class AlarmHandler(PayloadHandler):
#outdict = {}
full_payload = args[0]
#print(full_payload['alarms'])
#logging.debug(full_payload['alarms'])
currentAlarms = full_payload['alarms']#self.NativeUI.ongoingAlarms # instead of getting database at a particular frequency, this should be triggered when a new alarm arrives
self.alarm_list = currentAlarms
#self._set__alarm_list(currentAlarms)
......
{
"maintenance": {
"label": "maintenance",
"last_performed": 0,
"cmd_code": "main_tenance",
"message": "Was maintenance carried out successfully?"
}
}
{
"maintenance": {
"label": "maintenance",
"last_performed": 0,
"cmd_code": "main_tenance",
"message": "Was maintenance carried out successfully?"
}
}
......@@ -10,11 +10,5 @@
"last_performed": 0,
"cmd_code": "leak_test",
"message": "Ensure patient is disconnected from ventilator"
},
"maintenance": {
"label": "maintenance",
"last_performed": 0,
"cmd_code": "main_tenance",
"message": "Was maintenance carried out successfully?"
}
}
......@@ -249,7 +249,8 @@ class labelledSpin(QtWidgets.QWidget):
return 0
self.currentDbValue = newVal
self.simpleSpin.setValue(newVal)
self.simpleSpin.setValue(float(newVal))
if self.isEnabled():
self.simpleSpin.setProperty("textColour", "0")
else:
......
......@@ -351,7 +351,7 @@ class AbstractTypeValPopup(QtWidgets.QDialog):
# ) # no window title
#
# def handle_ok_press(self):
# print(self.cha)
# logging.debug(self.cha)
# val = self.lineEdit.text()
# self.currentWidg.setValue(float(val))
# self.close()
......
"""
alarm_handler.py
"""
from handler_library.handler import PayloadHandler
from PySide2.QtCore import QObject
class AlarmHandler(PayloadHandler, QObject):
"""
Subclass of the PayloadHandler class (handler.py) to handle alarm data.
Inherits from QObject to give us access to pyside2's signal class.
"""
def __init__(self):
super().__init__()
QObject.__init__(self)
......@@ -10,8 +10,8 @@ class BatteryHandler(PayloadHandler):
UpdateBatteryDisplay = Signal(dict)
def __init__(self, *args, **kwargs):
super().__init__(["BATTERY"], *args, **kwargs)
def __init__(self, *args, name="Battery handler", **kwargs):
super().__init__(["BATTERY"], *args, name=name, **kwargs)
def active_payload(self, *args, **kwargs) -> int:
"""
......
......@@ -22,7 +22,7 @@ class ClinicalHandler(PayloadHandler):
# settingToggle = QtCore.Signal(str)
def __init__(self, NativeUI, *args, **kwargs):
super().__init__(["TARGET"], *args, **kwargs)
super().__init__(["TARGET"], *args, name="Clinical handler", **kwargs)
# super(TabModes, self).__init__(NativeUI, *args, **kwargs)
self.NativeUI = NativeUI
self.limSpinDict = {}
......@@ -88,7 +88,7 @@ class ClinicalHandler(PayloadHandler):
return 0
def handle_okbutton_click(self):
print('handling ok clicks')
logging.debug('handling ok clicks')
message, command = [], []
for key, widget in dict(self.limSpinDict, **self.setSpinDict).items():
if widget.manuallyUpdated:
......@@ -99,7 +99,7 @@ class ClinicalHandler(PayloadHandler):
command.append([widget.cmd_type, widget.cmd_code, setVal])
# create a signal emitting message, command, handler identifier - in nativeui connect to a popup widget
# command sending should occur in handler
print(message)
logging.debug(message)
self.commandList = command
self.OpenPopup.emit(self, message)
......@@ -113,7 +113,7 @@ class ClinicalHandler(PayloadHandler):
return 0
def handle_cancelbutton_click(self):
print('handling a click')
logging.debug('handling a click')
for key, widget in dict(self.limSpinDict, **self.setSpinDict).items():
widget.manuallyUpdated = False
widget.set_value(self.valueDict[key])
......@@ -122,7 +122,7 @@ class ClinicalHandler(PayloadHandler):
self.refresh_button_colour()
def handle_cancel_pressed(self, buttonMode):
print('handle cancel button press in clinical')
logging.debug('handle cancel button press in clinical')
if buttonMode == self.NativeUI.currentMode:
logging.debug("modes match ")
self.commandSent()
......@@ -189,11 +189,11 @@ class ClinicalHandler(PayloadHandler):
denominator = setValue # get percentage if looking for percentage
pct_to_max = 100 * (maxValue - setValue) / denominator
pct_to_min = 100 * (minValue - setValue) / denominator
# print('maximum is ' + str(limMax))
# print('pct to max is ' + str(pct_to_max))
# print('maxval is ' + str(maxValue))
# print('setVal is ' + str(setValue))
# print('denom is ' + str(denominator))
# logging.debug('maximum is ' + str(limMax))
# logging.debug('pct to max is ' + str(pct_to_max))
# logging.debug('maxval is ' + str(maxValue))
# logging.debug('setVal is ' + str(setValue))
# logging.debug('denom is ' + str(denominator))
if round(pct_to_max, 4) <= round(
limMax, 4
......
......@@ -7,8 +7,8 @@ from threading import Lock
class DataHandler(PayloadHandler):
UpdatePlots = Signal(dict)
def __init__(self, *args, plot_history_length=500, **kwargs):
super().__init__(["DATA", "CYCLE"], *args, **kwargs)
def __init__(self, *args, plot_history_length=500, name="Data handler", **kwargs):
super().__init__(["DATA", "CYCLE"], *args, name=name, **kwargs)
self.__plot_history_length = plot_history_length
self.__plots_database = {
"data": np.zeros((plot_history_length, 4)),
......@@ -39,7 +39,7 @@ class DataHandler(PayloadHandler):
if payload["type"] == "DATA":
# The earliest index to plot for cycle plots needs to move back to keep pace
# with the data.
# with the data (unless it would move past 0, in which case set it to 0).
self.__cycle_index = [
index - 1 if (index - 1 >= 0) else 0 for index in self.__cycle_index
]
......@@ -83,6 +83,10 @@ class DataHandler(PayloadHandler):
self.__cycle_index[0] :, 3
]
elif payload["type"] == "CYCLE":
# When a new cycle starts, we want to stop plotting the data of the oldest
# cycle so we remove that index from the __cycle_index list and add the
# highest possible index (which is also the data index at which the cycle
# was recieved) to the end.
self.__cycle_index.pop(0)
self.__cycle_index.append(self.__plot_history_length)
......
......@@ -15,7 +15,7 @@ class ExtraHandler(PayloadHandler):
def __init__(self, *args, **kwargs):
super().__init__(["ALARM_MUTE", "BAD_THRESHOLD"], *args, **kwargs)
def active_payload(self, *args, **kwargs) -> int:
def active_payload(self, *args, name="Extra handler", **kwargs) -> int:
"""
When the personal data is updated, extract those parameters needed for display
and emit the UpdatePersonalDisplay signal.
......
......@@ -19,10 +19,11 @@ class GenericDataHandler(QObject):
Base class for non-payload data handlers.
"""
def __init__(self, *args, **kwargs):
def __init__(self, *args, name: str = "GenericDataHandler", **kwargs):
super().__init__(*args, **kwargs)
self.__database = {}
self.__lock = Lock()
self.name = name
def set_db(self, data: dict) -> int:
"""
......@@ -35,6 +36,14 @@ class GenericDataHandler(QObject):
self.on_data_set()
return 0
def clear_db(self) -> int:
"""
Clear the contents of the __database dictionary
"""
with self.__lock:
self.__database.clear()
return 0
def get_db(self) -> dict:
"""
Return the content of the __database dictionary.
......@@ -57,8 +66,8 @@ class PayloadHandler(GenericDataHandler):
Base class for the payload data handlers.
"""
def __init__(self, payload_types: list, *args, **kwargs):
super().__init__(*args, **kwargs)
def __init__(self, payload_types: list, *args, name="PayloadHandler", **kwargs):
super().__init__(*args, name=name, **kwargs)
for key in payload_types:
if not isinstance(key, str):
raise TypeError(
......@@ -95,3 +104,11 @@ class PayloadHandler(GenericDataHandler):
so that we have access to the full context of the information.
"""
pass
def clear_db(self):
"""
Purge all information from the database.
"""
with self.__lock:
self.__database = {}
return 0
from global_widgets.global_spinbox import labelledSpin
from widget_library.startup_calibration_widget import calibrationWidget
from widget_library.ok_cancel_buttons_widget import (
OkButtonWidget,
CancelButtonWidget,
OkSendButtonWidget,
)
from global_widgets.global_send_popup import SetConfirmPopup
from PySide2.QtWidgets import QRadioButton
from datetime import datetime
import json
from PySide2 import QtWidgets, QtGui, QtCore
import logging
class MaintenanceHandler(
QtWidgets.QWidget
): # chose QWidget over QDialog family because easier to modify
OpenPopup = QtCore.Signal(list)
MaintenanceConfigUpdate = QtCore.Signal(dict)
def __init__(self, NativeUI, *args, **kwargs):
super().__init__(*args, **kwargs)
self.NativeUI = NativeUI
self.calibDict = {}
self.calibs_done_dict = {}
self.activeWidget = None
def add_widget(self, widget, key: str):
if isinstance(widget, labelledSpin):
self.spinDict[key] = widget
widget.cmd_type = widget.cmd_type.replace("startup", "CURRENT")
if isinstance(widget, calibrationWidget):
self.calibDict[key] = widget
if (
isinstance(widget, OkButtonWidget)
or isinstance(widget, CancelButtonWidget)
or isinstance(widget, OkSendButtonWidget)
):
self.buttonDict[key] = widget
if isinstance(widget, QRadioButton):
if widget.text() in self.NativeUI.modeList:
self.modeRadioDict[key] = widget
else:
self.settingsRadioDict[key] = widget
def handle_calibrationPress(self, calibrationWidget) -> int:
"""
When a calibration buttonis pressed, run the corresponding calibration. If all
calibrations are completed, emit the CalibrationComplete signal.
Currently doesn't actually do any calibrations, just a placeholder for now.
"""
self.activeWidget = calibrationWidget
logging.debug(self.activeWidget)
logging.debug(NativeUI.messagePopup)
self.NativeUI.messagePopup.setLabelText(calibrationWidget.infoDict["message"])
def carryout_calibration(self) -> int:
assert self.activeWidget is not None
calibrationWidget = self.activeWidget
if "calibration" in calibrationWidget.infoDict["label"]:
self.NativeUI.q_send_cmd("DO_CALIBRATION", "DO_CALIBRATION")
elif "Leak_Test" in calibrationWidget.infoDict["label"]:
self.NativeUI.q_send_cmd()
else:
logging.debug("ERROR: label does not mach any calibration procedure")
# self.NativeUI.display_stack.setCurrentWidget(self.NativeUI.startupWidget)
calibrationWidget.progBar.setValue(100)
calibrationWidget.lineEdit.setText("completed")
with open("NativeUI/configs/maintenance_config.json", "r") as json_file:
startupDict = json.load(json_file)
startupDict[calibrationWidget.key]["last_performed"] = int(
datetime.now().timestamp()
)
with open("NativeUI/configs/maintenance_config.json", "w") as json_file:
json.dump(startupDict, json_file)
self.MaintenanceConfigUpdate.emit(startupDict)
logging.debug("Writing to %s", json_file)
self.calibs_done_dict[calibrationWidget.key] = True
return 0
def localise_text(self, text: dict) -> int:
"""
Change the language of text elements.
"""
pass
......@@ -14,8 +14,8 @@ class MeasurementHandler(PayloadHandler):
UpdateMeasurements = Signal(dict)
def __init__(self, NativeUI, *args, **kwargs):
super().__init__(["CYCLE", "READBACK"], *args, **kwargs)
def __init__(self, NativeUI=None, *args, name="Measurement handler", **kwargs):
super().__init__(["CYCLE", "READBACK"], *args, name=name, **kwargs)
self.NativeUI = NativeUI
def active_payload(self, *args, **kwargs) -> int:
......@@ -43,7 +43,13 @@ class MeasurementHandler(PayloadHandler):
outdict[key] = cycle_data[key]
except KeyError:
logging.debug("Invalid key %s in measurement database", key)
self.NativeUI.currentMode = cycle_data["ventilation_mode"]
print(self.NativeUI.currentMode)
if self.NativeUI is None:
logging.warning("MeasurementHandler initialised with nonetype NativeUI - ignore this warning if unit testing")
else:
try:
self.NativeUI.currentMode = cycle_data["ventilation_mode"]
logging.debug(self.NativeUI.currentMode)
except KeyError as e:
logging.error(e)
self.UpdateMeasurements.emit(outdict)
return 0
......@@ -22,7 +22,7 @@ class ModeHandler(PayloadHandler):
settingToggle = QtCore.Signal(str)
def __init__(self, NativeUI, *args, **kwargs):
super().__init__(["TARGET"], *args, **kwargs)
super().__init__(["TARGET"], *args, name="Mode handler", **kwargs)
# super(TabModes, self).__init__(NativeUI, *args, **kwargs)
self.NativeUI = NativeUI
self.spinDict = {}
......@@ -64,7 +64,7 @@ class ModeHandler(PayloadHandler):
def active_payload(self, *args) -> int:
target_data = self.get_db()
print(target_data)
logging.debug(target_data)
outdict = {}
for key in self.relevantKeys:
......@@ -133,17 +133,17 @@ class ModeHandler(PayloadHandler):
return 0
def handle_cancel_pressed(self, buttonMode: str):
print('handle cancel button press in mode')
logging.debug('handle cancel button press in mode')
for widget in self.spinDict:
if buttonMode in widget or self.NativeUI.currentMode in widget:
self.spinDict[widget].manuallyUpdated = False
print('checking')
print(buttonMode)
print(self.NativeUI.currentMode)
print(widget)
print('cancel pressed')
print(buttonMode)
print(self.NativeUI.currentMode)
logging.debug('checking')
logging.debug(buttonMode)
logging.debug(self.NativeUI.currentMode)
logging.debug(widget)
logging.debug('cancel pressed')
logging.debug(buttonMode)
logging.debug(self.NativeUI.currentMode)
if buttonMode == self.NativeUI.currentMode or buttonMode == 'CURRENT':
logging.debug("modes match ")
for widget in self.mainSpinDict:
......@@ -156,14 +156,14 @@ class ModeHandler(PayloadHandler):
def commandSent(self, key = 'CURRENT'):
self.commandList = []
print('going to use ')
print(key)
logging.debug('going to use ')
logging.debug(key)
mode = self.get_mode(key)
if mode == 'CURRENT':
mode == self.NativeUI.currentMode
for widget in self.spinDict:
if mode.replace("/", "_").replace("-", "_") in widget.replace("/", "_").replace("-", "_"):#self.NativeUI.currentMode.replace("/", "_").replace("-", "_") in widget:
print('setting false ' + widget)
logging.debug('setting false ' + widget)
self.spinDict[widget].manuallyUpdated = False
for widget in self.mainSpinDict:
self.mainSpinDict[widget].manuallyUpdated = False
......@@ -252,5 +252,5 @@ class ModeHandler(PayloadHandler):
for mode in self.modeList + ["CURRENT"]:
if mode.replace("/", "_").replace("-", "_") in key.replace("/", "_").replace("-", "_"):
return mode
print('failed to get mode out of')
print(key)
logging.debug('failed to get mode out of')
logging.debug(key)
......@@ -16,8 +16,8 @@ class PersonalHandler(PayloadHandler):
UpdatePersonalDisplay = Signal(dict)
def __init__(self, *args, **kwargs):
super().__init__(["PERSONAL"], *args, **kwargs)
def __init__(self, *args, name="Personal handler", **kwargs):
super().__init__(["PERSONAL"], *args, name=name, **kwargs)
def active_payload(self, *args, **kwargs) -> int:
"""
......
import logging
from handler_library.handler import GenericDataHandler
from PySide2.QtCore import Signal, Slot
from PySide2.QtGui import QFont
import json
import os
class TextHandler(GenericDataHandler):
UpdateText = Signal(dict)
def __init__(self, localisation_files: list, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__localisation_files: list = []
self.__localisation_index = 0
for filepath in localisation_files:
if os.path.isfile(filepath):
self.__localisation_files.append(filepath)
else:
raise FileNotFoundError(
"Could not locate localisation config file %s", filepath
)
self.set_db(
{
"label_text_font": QFont("Sans Serif", 20),
"measurement_text_font": QFont("Sans Serif", 40),
}
)
def next_config(self) -> int:
"""
Iterate the localisation to the next option in the __localisation_files list.
"""
index = self.__localisation_index
index += 1
if index > len(self.__localisation_files):
index = 0
self.set_localisation(index)
return 0
@Slot()
def set_localisation(self, index: int) -> int:
"""
Update the current localisation index to the specified value and load the
corresponding file.
"""
if index > len(self.__localisation_files):
raise IndexError(
"Could not index list %s with index %s",
(self.__localisation_files, index),
)
self.__localisation_index = index
with open(self.__localisation_files[self.__localisation_index]) as infile:
self.set_db(json.load(infile))
return 0
def on_data_set(self) -> int:
"""
When the database is updated, emit a signal containing the current text dictionary
"""
self.UpdateText.emit(self.get_db())
return 0
......@@ -20,8 +20,8 @@ class PersonalHandler(
UpdatePersonal = Signal(dict)
OpenPopup = QtCore.Signal(PayloadHandler, list)
def __init__(self, NativeUI, *args, **kwargs):
super().__init__(["PERSONAL"], *args, **kwargs)
def __init__(self, NativeUI, *args, name="Personal handler", **kwargs):
super().__init__(["PERSONAL"], *args, name=name, **kwargs)
self.NativeUI = NativeUI
self.spinDict = {}
self.buttonDict = {}
......
# Tests Documentation
Install pytest & pytest-qt:
```bash
cd hev
. .hev_env/bin/activate
pip3 install pytest
pip3 install pytest-qt
```
⚠️ For unit and integration testing, remember️ to export your PYTHONPATH environment variable as shown below.
## Unit Tests
To run all unit tests in the `NativeUI/tests/unittests` dir on a Raspberry Pi or VM, run the following
(adjust the full path to your NativeUI directory accordingly):
```bash
cd hev
. .hev_env/bin/activate
export PYTHONPATH=/home/pi/hev/NativeUI
pytest NativeUI/tests/unittests
......@@ -19,18 +31,50 @@ pytest NativeUI/tests/unittests/test_hevclient.py
## Integration Tests
To run all integration tests, the Arduino emulator and hevserver processes first need to be running, and
then run all integrations tests in the `NativeUI/tests/integration` dir:
* ⚠️ Integration tests automatically start / stop the ArduinoEmulator & hevserver, there is no longer any need to run
them manually from the CLI.
* ⚠️ Some integration tests spin up the GUI and simulate button clicks / and interactions with the GUI.
These tests then assert that the state of the GUI has changed as expected.
Therefore, a DISPLAY is (currently) needed.
To run all integration tests in the `NativeUI/tests/integration` dir:
```bash
cd hev
. .hev_env/bin/activate
export PYTHONPATH=/home/pi/hev/NativeUI
./raspberry-dataserver/ArduinoEmulator.py -f raspberry-dataserver/share/B6-20201207.dump &
./raspberry-dataserver/hevserver.py --use-dump-data &
pytest NativeUI/tests/integration
```
### Coverage
* To run a single integration test file, set the environment as above and specify the test file:
```bash
pytest NativeUI/tests/integration/test_hevclient_integration.py
```
### Integration tests automatically start / stop the ArduinoEmulator & hevserver
`integration_utils.py` has functions to start/manage/kill the ArduinoEmulator and hevserver
from within pytest code INSTEAD of running these background processes separately on the CLI.
Nothing wrong with using the CLI, but managing these processes from within test-code itself
can be useful - tests can be more atomic and easier to manage: e.g. you can choose to
start/kill these background processes for each/every test individually
and/or once for a particular test-suite.
The `sample_test_hevclient_integration.py` shows sample usage (note, don't run
this as a pytest if hevserver/ArduinoEmulator is already running, it will clash).
This sample integration test is purposefully not pre/appended with 'test' so that
it is excluded from pytest by default, but you can run it explicitly as below:
```bash
cd hev
. .hev_env/bin/activate
export PYTHONPATH=/home/pi/hev/NativeUI
pytest NativeUI/tests/integration/sample_test_hevclient_integration.py
```
## Coverage
To get pytest coverage run from the root of the repo:
......
"""
- give NativeUI.get_updates() a fake payload of a high pressure alarm at high priority.
- that triggers NativeUI.set_alarms_db() which sets __alarms DB
- hev_alarms.updateAlarms() is run regularly to check for new alarms. When new alarms are added to NativeUI.__alarms hev_alarms.updateAlarms() triggers.
- hev_alarms.updateAlarms adds the alarm to the alarmDict, creates an alarm popup, and adds it to the alarm table displayed in the UI.
"""
# NativeUI.get_updates()
# NativeUI.set_alarms_db()
# __alarms DB Updated
# hev_alarms.updateAlarms()
# abstractAlarm
# TODO get a fake payload from the MCU side. Currently (8.4.21) not available as MCU and UI are not linked.
import json
......@@ -20,6 +7,7 @@ import sys
from PySide2.QtWidgets import QApplication
import time
import _thread
import pytest
current_path = os.path.abspath(os.getcwd())
root_path = os.path.abspath(current_path + "/NativeUI")
......@@ -34,14 +22,20 @@ import hevclient
# - Sort Threading: In order to test the UI, the PyQT app needs to be running. This is controlled by app.exec_() (line 110), however when the app is running the main thread is locked out of firing off any other functions (i.e. test_high_pressure_alarm_high_priority()). Currently, the test_high_pressure_alarm_high_priority() test function runs in a secondary thread which doesn't interact with the app running in the main thread.
# - Assert that the __alarms DB is modified correctly (line 77-79)
@pytest.fixture(scope="session", autouse=True)
def myNativeUI():
if not QApplication.instance():
app = QApplication(sys.argv)
else:
app = QApplication.instance()
myNativeUI = NativeUI()
return myNativeUI
def test_high_pressure_alarm_high_priority(NativeUI):
def test_high_pressure_alarm_high_priority(myNativeUI):
"""
The intention of this test is to solve the risk with Risk ID: SW11.
When excessive airway pressure is applied the microcontroller should send a High Pressure Alarm at a High Priority. This integration test tests from the point when that payload is fed into the UI. It will test that the payload will propgate through NativeUI app correctly and create a popup and add the alarm to the list of alarms.
"""
myNativeUI = NativeUI
# define vars
popup_activated = False
list_activated = False
......
"""
Utils for integration tests.
Includes functions to start and kill the background ArduinoEmulator
and hevserver scripts and manage them from within code, rather than
using the CLI. Nothing wrong with using CLI, but managing these
processes from within test-code itself is useful for testing
can be more atomic and easier to manage: consider starting/killing
the background processes for each/every test individually and/or once
for a particular test-suite.
"""
import os
import time
import signal
from pathlib import Path
from subprocess import Popen
import pytest
def start_background_arduinosim_hevserver() -> [Popen]:
"""Start the ArduinoEmulator.pt and Hevserver.py and return a
Popen that represents a *nix process group. The process-group.pid is
a linux session id that can be used to kill spawned child processes
started by the arduino and hevserver."""
pythonpath_val = os.path.expandvars("$PYTHONPATH")
myenv = {"PYTHONPATH": pythonpath_val}
pypath = Path(pythonpath_val)
proj_root_path = str(pypath.parent.absolute())
procs = []
# Run with preexec_fn=os.setid. Is run after fork() but before exec() to get a sessionId for
# killing child procs spawned off shell (means SIGTERMing the parent shell SIGTERMs children)
# B6-20201207.dump
process1 = Popen(
"""
{ . ./.hev_env/bin/activate ;
cd ./raspberry-dataserver ;
./ArduinoEmulator.py -f ./share/FromDerick.dump &
./hevserver.py --use-dump-data &
}
""",
cwd=proj_root_path,
shell=True,
env=myenv,
# stderr=subprocess.PIPE, stdout=subprocess.PIPE, # don't use in our case - overrides capsys
text=True,
preexec_fn=os.setsid,
)
procs.append(process1)
time.sleep(2) # give enough time for scripts to generate stderr/out
return procs
def kill_process_group(pid: int):
"""Safely SIGTERM the process group with the given id"""
if pid is not None and pid > 0:
os.killpg(os.getpgid(pid), signal.SIGTERM)
def assert_posix():
"""Assert the test platform is posix based"""
assert os.name == "posix"
def assert_pythonpath():
"""Assert the PYTHONPATH env var has been set"""
pythonpath_key = "$PYTHONPATH"
pythonpath_val = os.path.expandvars(pythonpath_key)
if pythonpath_val == pythonpath_key:
pytest.fail(msg="Please set the $PYTHONPATH env var")
"""
Create a QT GUI and simulate button clicks.
Also start ArduinoEmulator & hevserver background processes to show
how we could test NativeUI.
Script is purposefully named without a leading or trailing 'test' in
the script name so that it doesn't get executed by pytest by default.
Make sure your PYTHONPATH var is set to the full path of:
'/<your_hev_root_dir>/hev/NativeUI'.
"""
import os
import random
import sys
import tempfile
import time
import hevclient
from tests.integration.integration_utils import start_background_arduinosim_hevserver,\
kill_process_group, assert_posix, assert_pythonpath
import PySide2
# from PySide2 import QtCore, QtWidgets, QtGui
# from PySide2.QtWidgets import QApplication, QMainWindow, QWidget, QDialog
from PySide2 import QtCore, QtWidgets
from PySide2.QtWidgets import QWidget
# Overwrite the mm file for OS agnostic testing
hevclient.mmFileName = tempfile.gettempdir() + os.path.sep + "HEVCLIENT_last_Data.mmap"
def test_qtbot(qtbot, tmpdir):
"""
Sample integration test showing how to test a QT GUI application
with simulated mouse clicks. Also start background arduino and
hev to show how we could do this when the GUI is NativeUI.
"""
assert_posix()
assert_pythonpath()
background_procs = None
try:
background_procs = start_background_arduinosim_hevserver()
time.sleep(2)
print(PySide2.__version__)
# Prints the Qt version used to compile PySide2
print(PySide2.QtCore.__version__)
if not QtWidgets.QApplication.instance():
app = QtWidgets.QApplication(sys.argv)
else:
app = QtWidgets.QApplication.instance()
widget = MyTestWidget()
qtbot.addWidget(widget)
# assert starting label before button click
assert widget.text.text() == "Hello World"
assert widget.click_count == 0
# click button and make sure it updates the appropriate label
qtbot.mouseClick(widget.button, QtCore.Qt.LeftButton)
assert widget.text.text() == "Hallo Welt"
assert widget.click_count == 1
qtbot.mouseClick(widget.button, QtCore.Qt.LeftButton)
assert widget.click_count == 2
finally:
if background_procs is not None:
for proc in background_procs:
kill_process_group(proc.pid)
class MyTestWidget(QWidget):
def __init__(self):
super().__init__()
self.click_count = 0
self.hello = ["Hallo Welt"]
self.button = QtWidgets.QPushButton("Please click me, oh please!")
self.text = QtWidgets.QLabel("Hello World")
self.text.setAlignment(QtCore.Qt.AlignCenter)
self.layout = QtWidgets.QVBoxLayout()
self.layout.addWidget(self.text)
self.layout.addWidget(self.button)
self.setLayout(self.layout)
self.button.clicked.connect(self.magic)
def magic(self):
self.text.setText(random.choice(self.hello))
self.click_count += 1
"""
Start ArduinoEmulator & hevserver background processes and assert expected
hevclient state/behaviour. Safely kills the background processes so you don't
have to use the CLI. Aim is to help integration tests to be more atomic and
easy to execute. Nothing wrong with using CLI, but managing these
processes from within test-code itself is useful for testing
can be more atomic and easier to manage: consider starting/killing
the background processes for each/every test individually and/or once
for a particular test-suite
Script is purposefully named without a leading or trailing 'test' in
the script name so that it doesn't get executed by pytest by default.
Make sure your PYTHONPATH var is set to the full path of:
'/<your_hev_root_dir>/hev/NativeUI'.
"""
import os
import tempfile
import time
import hevclient
from hevclient import HEVClient
from tests.integration.integration_utils import start_background_arduinosim_hevserver,\
kill_process_group, assert_posix, assert_pythonpath
# Overwrite the mm file for OS agnostic testing
hevclient.mmFileName = tempfile.gettempdir() + os.path.sep + "HEVCLIENT_last_Data.mmap"
def test_hevclient(capsys, caplog):
"""
Start the required background ArduinoSimulator and hevserver and
assert the state of hevclient.
IF the test fails unexpectedly due to e.g. raised exception,
can't guarantee cleanup of the background processes so do check on the CLI!
"""
assert_posix()
assert_pythonpath()
background_procs = None
try:
background_procs = start_background_arduinosim_hevserver()
HEVClient(True)
time.sleep(1) # give enough time for hevclient to log
captured = capsys.readouterr()
capsysout = captured.out
capsyserr = captured.err
assert "Err" not in capsyserr
assert "Err" not in capsysout
assert "is the microcontroller running" not in caplog.text
assert "[Errno" not in caplog.text
finally:
if background_procs is not None:
for proc in background_procs:
kill_process_group(proc.pid)
def test_hevclient_alarms():
"""
Start the required background ArduinoSimulator and hevserver and
assert the state of hevclient alarms using B6-20201207.dump file.
IF the test fails unexpectedly due to e.g. raised exception,
can't guarantee cleanup of the background processes so do check on the CLI!
"""
assert_posix()
assert_pythonpath()
background_procs = None
try:
background_procs = start_background_arduinosim_hevserver()
myhevclient = HEVClient(True)
time.sleep(2)
alarms = myhevclient.get_alarms()
alarms_as_str = ''.join(str(v) for v in alarms)
# TODO meaningful assertions:
assert 'PRIORITY_HIGH' in alarms_as_str
assert 'PRIORITY_MEDIUM' in alarms_as_str
assert '182' in alarms_as_str
assert 'LOW_VTI' in alarms_as_str
print(alarms)
finally:
if background_procs is not None:
for proc in background_procs:
kill_process_group(proc.pid)
"""
Perform tests by creating NativeUI and using qtbot to read the
GUI directly and assert readings (assumes a known dump file).
Make sure your PYTHONPATH var is set to the full path of:
'/<your_hev_root_dir>/hev/NativeUI'.
"""
import os
import tempfile
import statistics
import pytest
import hevclient
from tests.integration.integration_utils import start_background_arduinosim_hevserver, \
kill_process_group, assert_posix, assert_pythonpath
from NativeUI import set_window_size, interpret_resolution
from NativeUI import NativeUI
# Overwrite the mm file for OS agnostic testing
hevclient.mmFileName = tempfile.gettempdir() + os.path.sep + "HEVCLIENT_last_Data.mmap"
@pytest.fixture(scope="class")
def background_procs():
"""
Fixutre to start the Arduino emulator and hevserver as background
processes and kill the processes after the test.
"""
background_processes = start_background_arduinosim_hevserver()
yield background_processes
if background_processes is not None:
for proc in background_processes:
kill_process_group(proc.pid)
@pytest.fixture
def native_ui(qtbot):
"""
Fixture to inject the NativeUI. Fixture waits until GUI is shown.
"""
# if not QtWidgets.QApplication.instance():
# app = QtWidgets.QApplication(sys.argv)
# else:
# app = QtWidgets.QApplication.instance()
res = interpret_resolution(input_string='1024*768')
native_gui = NativeUI(resolution=res)
set_window_size(
native_gui,
resolution='1024*768',
windowed=False,
)
qtbot.addWidget(native_gui)
native_gui.show()
qtbot.waitForWindowShown(native_gui)
return native_gui
def test_steady_state_measurements_from_gui(background_procs, native_ui, qtbot):
"""
Create the NativeUI and use qtbot to read the measurement
GUI items directly from the UI and assert the steady-state
of those readings (assumes a known dump file).
"""
assert_posix()
assert_pythonpath()
# wait for approx 1 min because it takes that
# long for the real dump data to start reading meaningful values.
qtbot.wait(60000)
# To store measurement readings read straight from the GUI
plateau_pressures = []
respiratory_rates = []
fio2_percentages = []
exhaled_tidal_volumes = []
exhaled_minute_volumes = []
peeps = []
sample_size = 5
time_step_ms = 4000
widget_measurement_list = native_ui.widgets.normal_measurements.widget_list
for _ in range(sample_size):
plateau_pressure_sample = float(widget_measurement_list[0].value_display.text())
plateau_pressures.append(plateau_pressure_sample)
assert plateau_pressure_sample < 20 # expected: 19.5
assert plateau_pressure_sample > 19
respiratory_rate_sample = float(widget_measurement_list[1].value_display.text())
respiratory_rates.append(respiratory_rate_sample)
assert respiratory_rate_sample < 16.0 # expected: 15.0
assert respiratory_rate_sample > 14.0
fio2_percentage_sample = float(widget_measurement_list[2].value_display.text())
fio2_percentages.append(fio2_percentage_sample)
assert fio2_percentage_sample < 105 # expected: 103.8
assert fio2_percentage_sample > 102
exhaled_tidal_sample = float(widget_measurement_list[3].value_display.text())
exhaled_tidal_volumes.append(exhaled_tidal_sample)
assert exhaled_tidal_sample < 276 # expected: 275.7
assert exhaled_tidal_sample > 275
exhaled_minute_sample = float(widget_measurement_list[4].value_display.text())
exhaled_minute_volumes.append(exhaled_minute_sample)
assert exhaled_minute_sample < 4.5 # expected: 4.2
assert exhaled_minute_sample > 3.5
peeps_sample = float(widget_measurement_list[5].value_display.text())
peeps.append(peeps_sample)
assert peeps_sample < 8.5 # expected: 8.2
assert peeps_sample > 8.0
qtbot.wait(time_step_ms)
# Calculate standard deviation
plateau_pressures_stdev = statistics.stdev(plateau_pressures)
respiratory_rates_stdev = statistics.stdev(respiratory_rates)
fio2_percent_stdev = statistics.stdev(fio2_percentages)
exhaled_tidal_volumes_stdev = statistics.stdev(exhaled_tidal_volumes)
exhaled_minute_volumes_stdev = statistics.stdev(exhaled_minute_volumes)
peeps_stdev = statistics.stdev(peeps)
print("plateau_pressures_stdev: " + str(plateau_pressures_stdev))
print("respiratory_rates_stdev: " + str(respiratory_rates_stdev))
print("fio2_percent_stdev: " + str(fio2_percent_stdev))
print("exhaled_tidal_volumes_stdev: " + str(exhaled_tidal_volumes_stdev))
print("exhaled_minute_volumes_stdev: " + str(exhaled_minute_volumes_stdev))
print("peeps_stdev: " + str(peeps_stdev))
# Assert variance is below a sensible threshold - this is a
# 'steady-state' dump file we're asserting so we assume quite a low variance
coefficient_of_variation = 0.1
assert plateau_pressures_stdev <= coefficient_of_variation
assert respiratory_rates_stdev <= coefficient_of_variation
assert fio2_percent_stdev <= coefficient_of_variation
assert exhaled_tidal_volumes_stdev <= coefficient_of_variation
assert exhaled_minute_volumes_stdev <= coefficient_of_variation
assert peeps_stdev <= coefficient_of_variation
"""
Integration test for payload handlers.
"""
import json
import sys
import os
import pytest
import logging
from unittest.mock import MagicMock
from PySide2.QtWidgets import QApplication
current_path = os.path.abspath(os.getcwd())
root_path = os.path.abspath(current_path + "/NativeUI")
sys.path.append(root_path)
from NativeUI import NativeUI
# remove once all other jsons have been created and imported
TEST_JSON = json.load(
open(
os.environ["PYTHONPATH"].split(os.pathsep)[0]
+ "/tests/unittests/fixtures/testSample.json"
)
)
ALARM_JSON = json.load(
open(
os.environ["PYTHONPATH"].split(os.pathsep)[0]
+ "/tests/unittests/fixtures/alarmSample.json"
)
)
BATTERY_JSON = json.load(
open(
os.environ["PYTHONPATH"].split(os.pathsep)[0]
+ "/tests/unittests/fixtures/batterySample.json"
)
)
CYCLE_JSON = json.load(
open(
os.environ["PYTHONPATH"].split(os.pathsep)[0]
+ "/tests/unittests/fixtures/cycleSample.json"
)
)
DATA_JSON = json.load(
open(
os.environ["PYTHONPATH"].split(os.pathsep)[0]
+ "/tests/unittests/fixtures/dataSample.json"
)
)
PERSONAL_JSON = json.load(
open(
os.environ["PYTHONPATH"].split(os.pathsep)[0]
+ "/tests/unittests/fixtures/personalSample.json"
)
)
READBACK_JSON = json.load(
open(
os.environ["PYTHONPATH"].split(os.pathsep)[0]
+ "/tests/unittests/fixtures/readbackSample.json"
)
)
TARGETS_JSON = json.load(
open(
os.environ["PYTHONPATH"].split(os.pathsep)[0]
+ "/tests/unittests/fixtures/targetSample.json"
)
)
payloads = [ALARM_JSON, BATTERY_JSON, CYCLE_JSON, DATA_JSON, PERSONAL_JSON, READBACK_JSON, TARGETS_JSON]
@pytest.fixture(scope="session", autouse=True)
def myNativeUI():
if not QApplication.instance():
app = QApplication(sys.argv)
else:
app = QApplication.instance()
myNativeUI = NativeUI()
return myNativeUI
def test_battery_payload_registered_battery_handler(myNativeUI, caplog):
"""
Tests that giving NatviUI.get_updates a battery payload launches the battery handler.
"""
# Set logging level to debug
caplog.set_level(logging.DEBUG)
# give NativeUI a battery payload
myNativeUI.get_updates(BATTERY_JSON)
# Run test
assert "Battery handler" in caplog.text, "The battery payload did not activate the battery handler."
# Clear logs
caplog.clear()
# Test that the other payloads don't trigger battery handler
payload_subset = [payload for payload in payloads]
payload_subset.remove(BATTERY_JSON)
for payload in payload_subset:
myNativeUI.get_updates(payload)
assert "Battery handler" not in caplog.text, ("The battery handler was activated by payload:", payload["type"])
def test_cycle_payload_registered_measurement_handler(myNativeUI, caplog):
"""
Tests that giving NatviUI.get_updates a cycle payload launches the measurement handler.
"""
# Set logging level to debug
caplog.set_level(logging.DEBUG)
# give NativeUI a cycle payload
myNativeUI.get_updates(CYCLE_JSON)
# Run test
assert "Measurement handler" in caplog.text, "The cycle payload did not activate the measurement handler."
# Clear logs
caplog.clear()
# Test that the other payloads don't trigger measurement handler
payload_subset = [payload for payload in payloads]
payload_subset.remove(READBACK_JSON)
payload_subset.remove(CYCLE_JSON)
for payload in payload_subset:
myNativeUI.get_updates(payload)
assert "Measurement handler" not in caplog.text, ("The measurement handler was activated by payload:", payload["type"])
def test_personal_payload_registered_personal_handler(myNativeUI, caplog):
"""
Tests that giving NatviUI.get_updates a personal payload launches the personal handler.
"""
# Set logging level to debug
caplog.set_level(logging.DEBUG)
# give NativeUI a personal payload
myNativeUI.get_updates(PERSONAL_JSON)
# Run test
assert "Personal handler" in caplog.text, "The personal payload did not activate the personal handler."
# Clear logs
caplog.clear()
# Test that the other payloads don't trigger personal handler
payload_subset = [payload for payload in payloads]
payload_subset.remove(PERSONAL_JSON)
for payload in payload_subset:
myNativeUI.get_updates(payload)
assert "Personal handler" not in caplog.text, ("The personal handler was activated by payload:", payload["type"])
def test_data_payload_registered_data_and_alarm_handlers(myNativeUI, caplog):
"""
Tests that giving NatviUI.get_updates a data payload launches the data and alarm handlers.
"""
# Set logging level to debug
caplog.set_level(logging.DEBUG)
# give NativeUI a personal payload
myNativeUI.get_updates(DATA_JSON)
# Run test
assert "Data handler" and "Alarm handler" in caplog.text, "The data payload did not activate the data and alarm handler(s)."
# Clear logs
caplog.clear()
# Test that the other payloads don't trigger battery handler
payload_subset = [payload for payload in payloads]
payload_subset.remove(DATA_JSON)
payload_subset.remove(ALARM_JSON)
for payload in payload_subset:
myNativeUI.get_updates(payload)
assert "Data handler" and "Alarm handler" not in caplog.text, ("The data handler was activated by payload:", payload["type"])
def test_alarm_payload_registered_alarm_handler(myNativeUI, caplog):
"""
TTests that giving NatviUI.get_updates a alarm payload launches the alarm handler.
"""
# Set logging level to debug
caplog.set_level(logging.DEBUG)
# give NativeUI a personal payload
myNativeUI.get_updates(ALARM_JSON)
# Run test
assert "Alarm handler" in caplog.text, "The alarm payload did not activate the alarm handler."
# Clear logs
caplog.clear()
# Test that the other payloads don't trigger battery handler
payload_subset = [payload for payload in payloads]
payload_subset.remove(ALARM_JSON)
payload_subset.remove(DATA_JSON)
for payload in payload_subset:
myNativeUI.get_updates(payload)
assert "Alarm handler" not in caplog.text, ("The alarm handler was activated by payload:", payload["type"])
def test_readback_payload_registered_expert_mode_and_measurement_handlers(
myNativeUI, caplog
):
"""
Tests that giving NatviUI.get_updates a readback payload launches the expert, mode, and measurement handlers.
"""
# Set logging level to debug
caplog.set_level(logging.DEBUG)
# give NativeUI a personal payload
myNativeUI.get_updates(READBACK_JSON)
# Run test
assert "Expert handler" and "Measurement handler" in caplog.text, "The readback payload did not activate the expert, or measurement, or mode handler(s)."
# Clear logs
caplog.clear()
# Test that the other payloads don't trigger battery handler
payload_subset = [payload for payload in payloads]
payload_subset.remove(READBACK_JSON)
payload_subset.remove(DATA_JSON)
payload_subset.remove(TARGETS_JSON)
payload_subset.remove(CYCLE_JSON)
for payload in payload_subset:
myNativeUI.get_updates(payload)
assert "Expert handler" and "Measurement handler" not in caplog.text, ("The expert, or measurement, or mode handler(s) was activated by payload:", payload["type"])
def test_targets_payload_registered_mode_handler(myNativeUI, caplog):
"""
Tests that giving NatviUI.get_updates a targets payload launches the mode handler.
"""
# Set logging level to debug
caplog.set_level(logging.DEBUG)
# give NativeUI a personal payload
myNativeUI.get_updates(TARGETS_JSON)
# Run test
assert "Mode handler" in caplog.text, "The targets payload did not activate the mode handler."
# Clear logs
caplog.clear()
# Test that the other payloads don't trigger battery handler
payload_subset = [payload for payload in payloads]
payload_subset.remove(TARGETS_JSON)
payload_subset.remove(READBACK_JSON)
for payload in payload_subset:
myNativeUI.get_updates(payload)
assert "Mode handler" not in caplog.text, ("The mode handler was activated by payload:", payload["type"])
......@@ -26,7 +26,7 @@ def test_hev_client_state(caplog):
HEVClient(True)
time.sleep(1) # give enough time for het to log
# confirm message 'is the microcontroller running?' is NOT logged using err code
assert "[Errno" not in caplog.text
assert "[Errno 111]" in caplog.text
# TODO assert hevclient.method tests
# myhevclient.send_cmd("CMD", "blah")
......
import pytest
import sys
from PySide2.QtWidgets import QApplication
sys.path.append("/home/pi/hev/NativeUI")
from NativeUI import NativeUI
from widget_library.battery_display_widget import BatteryDisplayWidget
RESOLUTION_NativeUI = [1620, 910]
@pytest.fixture(scope="session", autouse=True)
def nat_widget():
app = QApplication(sys.argv)
nat_widget = NativeUI(RESOLUTION_NativeUI)
return nat_widget
@pytest.fixture(scope="session", autouse=True)
def bat_widget(nat_widget):
return BatteryDisplayWidget(nat_widget)
{
"type": "ALARM",
"ALARM":{
"version": 182,
"timestamp": 816562,
"type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "HIGH_VTE",
"param": 24868.025390625
},
"alarms": [
{
"version": 182,
"timestamp": 1926874,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "HIGH_FIO2",
"param": 103.29692840576172
}, {
"version": 182,
"timestamp": 1928269,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_PEEP",
"param": -500.0
}, {
"version": 182,
"timestamp": 1927516,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTE",
"param": 276.7962951660156
}, {
"version": 182,
"timestamp": 1928607,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_FIO2",
"param": 103.82078552246094
}, {
"version": 182,
"timestamp": 1928153,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTI",
"param": 268.6437683105469
}, {
"version": 182,
"timestamp": 1927939,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "O2_FAIL",
"param": 65.677734375
}
]
}
{
"version": 182,
"timestamp": 0,
"type": "BATTERY",
"bat": 0,
"ok": 1,
"alarm": 0,
"rdy2buf": 0,
"bat85": 0,
"prob_elec": 0,
"dummy": false
}
\ No newline at end of file
"type":"BATTERY",
"BATTERY":{
"version": 182,
"timestamp": 0,
"type": "BATTERY",
"bat": 0,
"ok": 0,
"alarm": 0,
"rdy2buf": 0,
"bat85": 1,
"prob_elec": 0,
"dummy": false
},
"alarms": [
{
"version": 182,
"timestamp": 1926874,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "HIGH_FIO2",
"param": 103.29692840576172
}, {
"version": 182,
"timestamp": 1928269,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_PEEP",
"param": -500.0
}, {
"version": 182,
"timestamp": 1927516,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTE",
"param": 276.7962951660156
}, {
"version": 182,
"timestamp": 1928607,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_FIO2",
"param": 103.82078552246094
}, {
"version": 182,
"timestamp": 1928153,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTI",
"param": 268.6437683105469
}, {
"version": 182,
"timestamp": 1927939,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "O2_FAIL",
"param": 65.677734375
}
]
}
{
"label":[
"1: Mains power, no fault",
"2: battery power, 0% charge, no fault",
"3: battery power, 85% charge, no fault",
"4: mains power, fault",
"5: battery power, fault",
"6: conflicting status (0,0), no fault",
"7: conflicting status (1,1), no fault",
"8: conflicting status (1,1), fault"
],
"payload":[
{
"type": "BATTERY",
"BATTERY":{
"version": 182,
"timestamp": 0,
"type": "BATTERY",
"bat": 0,
"ok": 1,
"alarm": 0,
"rdy2buf": 0,
"bat85": 0,
"prob_elec": 0,
"dummy": false
}
},
{
"type": "BATTERY",
"BATTERY":{
"version": 182,
"timestamp": 0,
"type": "BATTERY",
"bat": 1,
"ok": 0,
"alarm": 0,
"rdy2buf": 0,
"bat85": 0,
"prob_elec": 0,
"dummy": false
}
},
{
"type": "BATTERY",
"BATTERY":{
"version": 182,
"timestamp": 0,
"type": "BATTERY",
"bat": 1,
"ok": 0,
"alarm": 0,
"rdy2buf": 0,
"bat85": 1,
"prob_elec": 0,
"dummy": false
}
},
{
"type": "BATTERY",
"BATTERY":{
"version": 182,
"timestamp": 0,
"type": "BATTERY",
"bat": 0,
"ok": 1,
"alarm": 0,
"rdy2buf": 0,
"bat85": 0,
"prob_elec": 1,
"dummy": false
}
},
{
"type": "BATTERY",
"BATTERY":{
"version": 182,
"timestamp": 0,
"type": "BATTERY",
"bat": 1,
"ok": 0,
"alarm": 0,
"rdy2buf": 0,
"bat85": 0,
"prob_elec": 1,
"dummy": false
}
},
{
"type": "BATTERY",
"BATTERY":{
"version": 182,
"timestamp": 0,
"type": "BATTERY",
"bat": 0,
"ok": 0,
"alarm": 0,
"rdy2buf": 0,
"bat85": 0,
"prob_elec": 0,
"dummy": false
}
},
{
"type": "BATTERY",
"BATTERY":{
"version": 182,
"timestamp": 0,
"type": "BATTERY",
"bat": 1,
"ok": 1,
"alarm": 0,
"rdy2buf": 0,
"bat85": 0,
"prob_elec": 0,
"dummy": false
}
},
{
"type": "BATTERY",
"BATTERY":{
"version": 182,
"timestamp": 0,
"type": "BATTERY",
"bat": 1,
"ok": 1,
"alarm": 0,
"rdy2buf": 0,
"bat85": 0,
"prob_elec": 1,
"dummy": false
}
}
],
"status":[
{
"on_mains_power": true,
"on_battery_power": false,
"battery_percent": 0.0,
"electrical_problem": null
},
{
"on_mains_power": false,
"on_battery_power": true,
"battery_percent": 0.0,
"electrical_problem": null
},
{
"on_mains_power": false,
"on_battery_power": true,
"battery_percent": 85.0,
"electrical_problem": null
},
{
"on_mains_power": true,
"on_battery_power": false,
"battery_percent": 0.0,
"electrical_problem": "ERROR ELEC."
},
{
"on_mains_power": false,
"on_battery_power": true,
"battery_percent": 0.0,
"electrical_problem": "ERROR ELEC."
},
{
"on_mains_power": false,
"on_battery_power": false,
"battery_percent": 0.0,
"electrical_problem": "ERROR ELEC."
},
{
"on_mains_power": false,
"on_battery_power": false,
"battery_percent": 0.0,
"electrical_problem": "ERROR ELEC."
},
{
"on_mains_power": false,
"on_battery_power": false,
"battery_percent": 0.0,
"electrical_problem": "ERROR ELEC."
}
]
}
{
"on_mains_power": true,
"on_battery_power": false,
"battery_percent": 0.0,
"electrical_problem": null
}
\ No newline at end of file
{"respiratory_rate": 0, "tidal_volume": 0, "exhaled_minute_volume": 0, "inhaled_minute_volume": 0, "minute_volume": 0, "exhaled_tidal_volume": 0, "inhaled_tidal_volume": 0, "lung_compliance": 0, "static_compliance": 0, "inhalation_pressure": 0, "peak_inspiratory_pressure": 0, "plateau_pressure": 0, "mean_airway_pressure": 0, "fiO2_percent": 0, "apnea_index": 0, "apnea_time": 0, "mandatory_breath": 0}
\ No newline at end of file
{
"type": "CYCLE",
"CYCLE": {
"version": 182,
"timestamp": 1928010,
"payload_type": "CYCLE",
"respiratory_rate": 15.0,
"tidal_volume": 545.4400024414062,
"exhaled_tidal_volume": 276.7962951660156,
"inhaled_tidal_volume": 268.6437683105469,
"minute_volume": 8.181600570678711,
"exhaled_minute_volume": 4.151944160461426,
"inhaled_minute_volume": 4.029656410217285,
"lung_compliance": 25.938955307006836,
"static_compliance": 49.488800048828125,
"inhalation_pressure": 14.09315299987793,
"peak_inspiratory_pressure": 29.47732925415039,
"plateau_pressure": 19.47098159790039,
"mean_airway_pressure": 14.09315299987793,
"fiO2_percent": 103.29692840576172,
"apnea_index": 0,
"apnea_time": 12010,
"mandatory_breath": 1
},
"alarms": [
{
"version": 182,
"timestamp": 1926874,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "HIGH_FIO2",
"param": 103.29692840576172
}, {
"version": 182,
"timestamp": 1928269,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_PEEP",
"param": -500.0
}, {
"version": 182,
"timestamp": 1927516,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTE",
"param": 276.7962951660156
}, {
"version": 182,
"timestamp": 1928607,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_FIO2",
"param": 103.82078552246094
}, {
"version": 182,
"timestamp": 1928153,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTI",
"param": 268.6437683105469
}, {
"version": 182,
"timestamp": 1927939,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "O2_FAIL",
"param": 65.677734375
}
]
}
{
"type": "DATA",
"DATA": {
"version": 182,
"timestamp": 816103,
"payload_type": "DATA",
"pressure_patient": 16.122142791748047,
"flow": 34.1150016784668,
"volume": 46869.6953125,
"target_pressure": 17.0,
"fsm_state": "INHALE",
"pressure_air_supply": 13,
"pressure_air_regulated": 365.1913757324219,
"pressure_o2_supply": 45,
"pressure_o2_regulated": 0.0322265625,
"pressure_buffer": 239.28102111816406,
"pressure_inhale": 17.676271438598633,
"temperature_buffer": 659,
"pressure_diff_patient": 0.5741281509399414,
"ambient_pressure": 0,
"ambient_temperature": 0,
"airway_pressure": 7.061350345611572,
"flow_calc": 0.9805641174316406,
"process_pressure": 17.676271438598633,
"valve_duty_cycle": 0.5721527934074402,
"proportional": -0.0013525428948923945,
"integral": 0.04374659061431885,
"derivative": -0.12062221765518188
},
"alarms": [
{
"version": 182,
"timestamp": 1926874,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "HIGH_FIO2",
"param": 103.29692840576172
}, {
"version": 182,
"timestamp": 1928269,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_PEEP",
"param": -500.0
}, {
"version": 182,
"timestamp": 1927516,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTE",
"param": 276.7962951660156
}, {
"version": 182,
"timestamp": 1928607,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_FIO2",
"param": 103.82078552246094
}, {
"version": 182,
"timestamp": 1928153,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTI",
"param": 268.6437683105469
}, {
"version": 182,
"timestamp": 1927939,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "O2_FAIL",
"param": 65.677734375
}
]
}
{"name": "Justin Atest", "patient_id": "15243", "age": "22", "sex": "Testgender", "height": 1.77, "weight": 80}
\ No newline at end of file
{
"type": "PERSONAL",
"PERSONAL":{
"name": "Justin Atest",
"patient_id": "15243",
"age": "22",
"sex": "Testgender",
"height": 1.77,
"weight": 80
},
"alarms": [
{
"version": 182,
"timestamp": 1926874,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "HIGH_FIO2",
"param": 103.29692840576172
}, {
"version": 182,
"timestamp": 1928269,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_PEEP",
"param": -500.0
}, {
"version": 182,
"timestamp": 1927516,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTE",
"param": 276.7962951660156
}, {
"version": 182,
"timestamp": 1928607,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_FIO2",
"param": 103.82078552246094
}, {
"version": 182,
"timestamp": 1928153,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTI",
"param": 268.6437683105469
}, {
"version": 182,
"timestamp": 1927939,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "O2_FAIL",
"param": 65.677734375
}
]
}
{
"type": "READBACK",
"READBACK":{
"version": 182,
"timestamp": 815263,
"type": "READBACK",
"duration_pre_calibration": 6000,
"duration_calibration": 4000,
"duration_buff_purge": 600,
"duration_buff_flush": 600,
"duration_buff_prefill": 100,
"duration_buff_fill": 600,
"duration_buff_pre_inhale": 0,
"duration_inhale": 1000,
"duration_pause": 10,
"duration_exhale": 2990,
"valve_air_in": 0.0,
"valve_o2_in": 0.0,
"valve_inhale": 0,
"valve_exhale": 1,
"valve_purge": 0,
"ventilation_mode": "PC_AC",
"valve_inhale_percent": 0,
"valve_exhale_percent": 0,
"valve_air_in_enable": 1,
"valve_o2_in_enable": 1,
"valve_purge_enable": 1,
"inhale_trigger_enable": 1,
"exhale_trigger_enable": 0,
"peep": 3.167065143585205,
"inhale_exhale_ratio": 0.33779263496398926,
"kp": 0.0010000000474974513,
"ki": 0.0005000000237487257,
"kd": 0.0010000000474974513,
"pid_gain": 2.0,
"max_patient_pressure": 45
},
"alarms": [
{
"version": 182,
"timestamp": 1926874,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "HIGH_FIO2",
"param": 103.29692840576172
}, {
"version": 182,
"timestamp": 1928269,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_PEEP",
"param": -500.0
}, {
"version": 182,
"timestamp": 1927516,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTE",
"param": 276.7962951660156
}, {
"version": 182,
"timestamp": 1928607,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "LOW_FIO2",
"param": 103.82078552246094
}, {
"version": 182,
"timestamp": 1928153,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_MEDIUM",
"alarm_code": "LOW_VTI",
"param": 268.6437683105469
}, {
"version": 182,
"timestamp": 1927939,
"payload_type": "ALARM",
"alarm_type": "PRIORITY_HIGH",
"alarm_code": "O2_FAIL",
"param": 65.677734375
}
]
}
{
"type": "TARGET",
"TARGET":{
"version": 182,
"timestamp": 544077,
"type": "TARGET",
"mode": "CURRENT",
"inspiratory_pressure": 17.0,
"ie_ratio": 0.33779263496398926,
"volume": 400.0,
"respiratory_rate": 15.0,
"peep": 5.0,
"fiO2_percent": 21.0,
"inhale_time": 1.0,
"inhale_trigger_enable": 1,
"exhale_trigger_enable": 0,
"volume_trigger_enable": 0,
"inhale_trigger_threshold": 5.0,
"exhale_trigger_threshold": 25.0,
"buffer_upper_pressure": 300.0,
"buffer_lower_pressure": 285.0
}
}
......@@ -10,158 +10,24 @@ import sys
import hevclient
import numpy as np
import pytest
import sys
import json
from PySide2.QtWidgets import QApplication
import numpy as np
from NativeUI import NativeUI
sys.path.append("../..")
hevclient.mmFileName = (
"/home/pi/hev/NativeUI/tests/integration/fixtures/HEVClient_lastData.mmap"
)
@pytest.fixture(scope="session", autouse=True)
def widget():
app = QApplication(sys.argv)
return NativeUI()
# Test default values of databases(no set method involved)
# Superseeded by handlers.
# def test_must_return_if_raises_attribute_error_when_false_db_item_is_got_from_get_db(
# widget,
# ):
# with pytest.raises(AttributeError):
# widget.get_db("__false_db_item")
# def test_must_return_correct_db_item_from_get_db_data(widget):
# assert widget.get_db("__data") == {} and widget.get_db("data") == {}
# def test_must_return_correct_db_item_from_get_db_readback(widget):
# assert widget.get_db("__readback") == {} and widget.get_db("readback") == {}
# def test_must_return_correct_db_item_from_get_db_cycle(widget):
# assert widget.get_db("__cycle") == {} and widget.get_db("cycle") == {}
# def test_must_return_correct_db_item_from_get_db_battery(widget):
# assert widget.get_db("__battery") == {} and widget.get_db("battery") == {}
# def test_must_return_correct_db_item_from_get_db_plots(widget):
# plot_history_length = 1000
# plot_dict = {
# "data": np.zeros((plot_history_length, 5)),
# "timestamp": list(el * (-1) for el in range(plot_history_length))[::-1],
# "pressure": list(0 for _ in range(plot_history_length)),
# "flow": list(0 for _ in range(plot_history_length)),
# "volume": list(0 for _ in range(plot_history_length)),
# "pressure_axis_range": [0, 20],
# "flow_axis_range": [-40, 80],
# "volume_axis_range": [0, 80],
# }
# assert (
# widget.get_db("__plots").keys() == plot_dict.keys()
# and widget.get_db("plots").keys() == plot_dict.keys()
# )
# def test_must_return_correct_db_item_from_get_db_alarms(widget):
# assert widget.get_db("__alarms") == {} and widget.get_db("alarms") == {}
# def test_must_return_correct_db_item_from_get_db_targets(widget):
# assert widget.get_db("__targets") == {} and widget.get_db("targets") == {}
# def test_must_return_correct_db_item_from_get_db_personal(widget):
# assert widget.get_db("__personal") == {} and widget.get_db("personal") == {}
# # Test set methods with sample payloads
# def test_must_return_0_for_set_data_db(widget):
# with open("/home/pi/hev/samples/dataSample.json", "r") as f:
# data_payload = json.load(f)
# assert widget.__set_db("data", data_payload) == 0
# def test_must_return_0_for_set_targets_db(widget):
# with open("/home/pi/hev/samples/targetSample.json", "r") as g:
# target_payload = json.load(g)
# assert widget.__set_db("targets", target_payload) == 0
# def test_must_return_0_for_set_readback_db(widget):
# with open("/home/pi/hev/samples/readbackSample.json", "r") as f:
# readback_payload = json.load(f)
# assert widget.__set_db("readback", readback_payload) == 0
# def test_must_return_0_for_set_cycle_db(widget):
# with open(
# "/home/pi/hev/NativeUI/tests/unittests/fixtures/cycleSample.json", "r"
# ) as f:
# cycle_payload = json.load(f)
# assert widget.__set_db("cycle", cycle_payload) == 0
# def test_must_return_0_for_set_battery_db(widget):
# with open("/home/pi/hev/samples/batterySample.json", "r") as f:
# battery_payload = json.load(f)
# assert widget.__set_db("battery", battery_payload) == 0
# def test_must_return_0_for_set_plots_db(widget):
# with open("/home/pi/hev/samples/dataSample.json", "r") as f:
# data_payload = json.load(f)
# assert widget.__set_plots_db(data_payload) == 0
# def test_must_return_error_if_not_data_is_sent_as_payload_for_set_plots_db(widget):
# with open("/home/pi/hev/samples/batterySample.json", "r") as f:
# battery_payload = json.load(f)
# with pytest.raises(KeyError):
# widget.__set_plots_db(battery_payload)
# def test_must_return_0_when__update_plot_ranges_correctly(widget):
# assert widget.__update_plot_ranges() == 0
# def test_must_return_0_for_set_alarms_db(widget):
# with open("/home/pi/hev/samples/alarmSample.json", "r") as f:
# alarm_payload = json.load(f)
# assert widget.__set_db("alarms", alarm_payload) == 0
# def test_must_return_0_for_set_personal_db(widget):
# with open(
# "/home/pi/hev/NativeUI/tests/unittests/fixtures/personalSample.json", "r"
# ) as f:
# personal_payload = json.load(f)
# assert widget.__set_db("personal", personal_payload) == 0
# Asyncio can handle event loops, but we need to add more interaction i think
# @pytest.mark.asyncio
# async def test_start_client(widget):
# with pytest.raises(RuntimeError):
# widget.start_client()
mmFileName = "/home/pi/hev/NativeUI/tests/integration/fixtures/HEVClient_lastData.mmap"
def test_get_updates_data_payload(widget):
def test_get_updates_data_payload(nat_widget):
"""
Currently fails because the dataSample.json is only part of the data payload.
"""
with open("/home/pi/hev/samples/dataSample.json", "r") as f:
data_payload = json.load(f)
widget.get_updates(data_payload)
nat_widget.get_updates(data_payload)
def test_get_updates_wrong_payload(widget):
def test_get_updates_wrong_payload(nat_widget):
fake_payload = {
"types": "Fake",
"pressure": 1200000.0,
......@@ -169,31 +35,31 @@ def test_get_updates_wrong_payload(widget):
"volume": 1.0,
}
with pytest.raises(KeyError):
widget.get_updates(fake_payload)
nat_widget.get_updates(fake_payload)
def test_must_return_0_when_q_send_cmd(widget):
assert widget.q_send_cmd(str, str) == 0
def test_must_return_0_when_q_send_cmd(nat_widget):
assert nat_widget.q_send_cmd(str, str, float) == 0
def test_must_return_0_when_q_ack_alarm_when_out_conection(widget):
def test_must_return_0_when_q_ack_alarm_when_out_conection(nat_widget):
with pytest.raises(ConnectionError):
widget.q_ack_alarm(str)
nat_widget.q_ack_alarm(str)
def test_must_return_0_when_q_send_personal_when_out_conection(widget):
def test_must_return_0_when_q_send_personal_when_out_conection(nat_widget):
with pytest.raises(ConnectionError):
widget.q_send_personal(str)
nat_widget.q_send_personal(str)
def test_must_return_0_when__find_icons_png_directory(widget):
assert widget.__find_icons("png") == "/home/pi/hev/hev-display/assets/png"
def test_must_return_0_when__find_icons_png_directory(nat_widget):
assert nat_widget.__find_icons("png") == "/home/pi/hev/hev-display/assets/png"
def test_must_return_0_when__find_icons_svg_directory(widget):
assert widget.__find_icons("svg") == "/home/pi/hev/hev-display/assets/svg"
def test_must_return_0_when__find_icons_svg_directory(nat_widget):
assert nat_widget.__find_icons("svg") == "/home/pi/hev/hev-display/assets/svg"
def test_must_return_0_when_cannot__find_icons_directory(widget):
def test_must_return_0_when_cannot__find_icons_directory(nat_widget):
with pytest.raises(FileNotFoundError):
widget.__find_icons("images")
nat_widget.__find_icons("images")
This diff is collapsed.
......@@ -411,13 +411,16 @@ class Layout:
# Create the system info tab
sysinfo_widgets = [
self.widgets.version_display_widget,
self.widgets.maintenance_time_display_widget,
self.widgets.maintenance,
[self.widgets.maintenance, self.widgets.maintenance_time_display_widget],
self.widgets.update_time_display_widget,
]
tab_info = self.layout_tab_info(sysinfo_widgets)
for widget in sysinfo_widgets:
widget.setFont(self.NativeUI.text_font)
for entry in sysinfo_widgets:
if isinstance(entry, list):
for widget in entry:
widget.setFont(self.NativeUI.text_font)
else:
entry.setFont(self.NativeUI.text_font)
# Create the stack
page_settings = SwitchableStackWidget(
......@@ -527,8 +530,13 @@ class Layout:
"""
tab_info = LocalisedQWidget()
tab_info_layout = QtWidgets.QVBoxLayout(tab_info)
for widget in widgets:
tab_info_layout.addWidget(widget)
for entry in widgets:
if isinstance(entry, list):
for widget in entry:
tab_info_layout.addWidget(widget)
else:
tab_info_layout.addWidget(entry)
tab_info_layout.addStretch()
tab_info.setLayout(tab_info_layout)
return tab_info
......@@ -921,7 +929,7 @@ class Layout:
hlayout.addWidget(self.NativeUI.widgets.get_widget(attrName + "_lim"))
# items = (hlayout.itemAt(i) for i in range(hlayout.count()))
# for w in items:
# print(w)
# logging.debug(w)
# w.setAlignment(QtCore.Qt.AlignLeft)
hlayout.setAlignment(QtCore.Qt.AlignCenter)
vlayout.addLayout(hlayout)
......
......@@ -27,6 +27,7 @@ from global_widgets.global_spinbox import labelledSpin
from global_widgets.global_send_popup import SetConfirmPopup
from widget_library.localisation_button_widget import LocalisationButtonWidget
from widget_library.startup_handler import StartupHandler
from handler_library.maintenance_handler import MaintenanceHandler
from widget_library.startup_calibration_widget import calibrationWidget
from widget_library.ok_cancel_buttons_widget import (
OkButtonWidget,
......@@ -83,25 +84,31 @@ class Widgets:
# Start up procedure
self.startup_confirm_popup = SetConfirmPopup(NativeUI)
self.maintenance_confirm_popup = SetConfirmPopup(NativeUI)
self.startup_handler = StartupHandler(NativeUI, self.startup_confirm_popup)
self.maintenance_handler = MaintenanceHandler(
NativeUI, self.maintenance_confirm_popup
)
self.localisation_startup_button = LocalisedQPushButton(
NativeUI, "language_name"
)
self.widget_list = self.widget_list + [
self.startup_confirm_popup,
self.maintenance_confirm_popup,
self.startup_handler,
self.maintenance_handler,
self.localisation_startup_button,
]
if not os.path.isfile("NativeUI/configs/startup_config.json"):
logging.warning(
"startup_config not found. If this is the first time using this ventilator you can safely ignore this warning."
"startup_config not found. If this is the first time using this "
"ventilator you can safely ignore this warning."
)
shutil.copyfile(
"NativeUI/configs/startup_config_default.json",
"NativeUI/configs/startup_config.json",
)
with open("NativeUI/configs/startup_config.json") as json_file:
startupDict = json.load(json_file)
for key, procedureDict in startupDict.items():
......@@ -111,6 +118,24 @@ class Widgets:
self.startup_handler,
)
if not os.path.isfile("NativeUI/configs/maintenance_config.json"):
logging.warning(
"maintenance_config not found. If this is the first time using this "
"ventilator you can safely ignore this warning."
)
shutil.copyfile(
"NativeUI/configs/maintenance_config_default.json",
"NativeUI/configs/maintenance_config.json",
)
with open("NativeUI/configs/maintenance_config.json") as json_file:
maintenanceDict = json.load(json_file)
for key, procedureDict in maintenanceDict.items():
self.add_handled_widget(
calibrationWidget(NativeUI, key, procedureDict),
key,
self.maintenance_handler,
)
for mode in NativeUI.modeList:
self.add_handled_widget(
LocalisedQRadioButton(mode),
......
......@@ -154,7 +154,7 @@ class AlarmControlWidget(QtWidgets.QWidget):
try:
self.NativeUI.q_send_cmd("SET_VOLUME", self.volumeCodes[index])
except:
print('didnt work')
logging.debug('didnt work')
def set_size(self, x: int, y: int, spacing=10) -> int:
self.setFixedSize(x, y)
......
......@@ -13,7 +13,7 @@ class ExpertHandler(PayloadHandler): # chose QWidget over QDialog family becaus
OpenPopup = QtCore.Signal(PayloadHandler,list)
def __init__(self, NativeUI, *args, **kwargs):
super().__init__(['READBACK'],*args, **kwargs)
super().__init__(['READBACK'],*args, name="Expert handler", **kwargs)
self.NativeUI = NativeUI
self.spinDict = {}
self.buttonDict = {}
......
......@@ -146,6 +146,11 @@ class MaintenanceTimeDisplayWidget(QtWidgets.QLabel):
self.setStyleSheet("color:%s" % self.__normal_color)
return 0
@QtCore.Slot(dict)
def update_time(self, calib_dict):
last_maintenance = calib_dict["maintenance"]["last_performed"]
logging.debug(last_maintenance)
@QtCore.Slot(str, str, bool)
def set_time_values(
self,
......
......@@ -45,7 +45,7 @@ class SignallingLineEditWidget(QtWidgets.QLineEdit):
class LabelledLineEditWidget(QtWidgets.QWidget):
def __init__(self, NativeUI, popup, infoArray, *args, **kwargs):
super().__init__(*args, **kwargs)
# print(infoArray)
# logging.debug(infoArray)
self.NativeUI = NativeUI
self.cmd_type, self.cmd_code = "", ""
self.min, self.max, self.step = 0, 10000, 0.3
......@@ -145,7 +145,7 @@ class LabelledLineEditWidget(QtWidgets.QWidget):
Set the size of the widget.
Also rescale the elements within it to equally distribute the width
"""
print("line_edit", self.label)
logging.debug("line_edit", self.label)
if x is not None:
self.setFixedWidth(x)
......
......@@ -47,7 +47,7 @@ class styledButton(QtWidgets.QPushButton):
# self.setFixedSize(QtCore.QSize(150, 50))
def setColour(self, option):
# print('setting colour again again')
# logging.debug('setting colour again again')
self.setEnabled(bool(float(option)))
self.setProperty("bgColour", str(option))
self.style().polish(self)
......
......@@ -72,7 +72,7 @@ class PersonalDisplayWidget(QtWidgets.QWidget):
Update the display information.
"""
outtxt = "{name}, {height}m".format(**new_info)
self.info_label.set_text(outtxt)
self.info_label.setText(outtxt)
return 0
def localise_text(self, text: dict) -> int:
......
......@@ -39,8 +39,8 @@ class SpinButton(QtWidgets.QFrame):
self.label_text, self.units, self.tag, self.cmd_type, self.cmd_code, self.min, self.max, self.initVal, self.step, self.decPlaces = (
infoArray
)
# print('before')
# print(self.cmd_type)
# logging.debug('before')
# logging.debug(self.cmd_type)
self.cmd_type = self.cmd_type.replace("SET_TARGET_", "SET_TARGET_CURRENT")
# self.cmd_type = settings[3]
# self.cmd_code = settings[4]
......
......@@ -82,7 +82,7 @@ class StartupHandler(
elif 'Leak_Test' in calibrationWidget.infoDict['label']:
self.NativeUI.q_send_cmd()
else:
logging.debug('ERROR: label does not mach any calibration procedure')
logging.debug("ERROR: label does not mach any calibration procedure")
# self.NativeUI.display_stack.setCurrentWidget(self.NativeUI.startupWidget)
calibrationWidget.progBar.setValue(100)
......@@ -95,6 +95,7 @@ class StartupHandler(
)
with open("NativeUI/configs/startup_config.json", "w") as json_file:
json.dump(startupDict, json_file)
logging.debug("Writing to %s", json_file)
self.calibs_done_dict[calibrationWidget.key] = True
if self.all_calibs_done():
......@@ -110,9 +111,7 @@ class StartupHandler(
comparing the self.calibs_done_dict to the self.calibDict.
"""
for key in self.calibDict:
if "maintenance" in key:
pass
elif key not in self.calibs_done_dict:
if key not in self.calibs_done_dict:
return False
return True
......
......@@ -90,10 +90,10 @@
virtualenv_python: python3.7
- name: copy PySide2 install into virtualenv
copy:
synchronize:
src: "/usr/lib/python3/dist-packages/PySide2"
dest: "{{ playbook_dir }}/../../.hev_env/lib/python3.7/site-packages/"
remote_src: yes
#remote_src: yes
become: yes
# - name: copy libscrc script
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment