Initial commit

parents
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
pytestdebug.log
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
doc/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
pythonenv*
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# profiling data
.prof
# Logs folder
logs/
# VSCode config files
.vscode/
José Sánchez Segovia <josesanchez.oncort@gmail.com>
Ismael Martel Bravo <ismael.martel@gmail.com>
Carlos García Ramos <carlos.garcia@iesppg.net>
Ladislao Martínez García <ladislao.m.garcia@gmail.com>
Elio Valenzuela Segura <elio@ugr.es>
Carlos Megías Núñez <narg@ugr.es>
Abel Miguel Cano Delgado <abelcano@ugr.es>
Jaime Lozano Tortosa <jaimelozano@ugr.es>
Víctor Vázquez Rodríguez <victorvazrod@ugr.es>
This diff is collapsed.
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.
# respiros-system
\ No newline at end of file
import logging
import os
from flask import Flask
from api.config import Config, DevelopmentConfig
from api.ipc import ipc
from api.sockets import socketio
def create_app() -> Flask:
"""Create the app and initialize all its extensions.
The appropiate config is loaded depending on the selected environment.
Returns:
Flask: The app.
"""
app = Flask(__name__)
if os.environ.get("ENV") == "development":
app.config.from_object(DevelopmentConfig)
else:
app.config.from_object(Config)
if app.config["DEBUG"]:
logging_level = logging.INFO
else:
logging_level = logging.WARNING
logging.basicConfig(
filename="logs/api.log", filemode="w", level=logging_level
)
socketio.init_app(app)
ipc.init_app(app)
return app
from gevent.pywsgi import WSGIServer
from api import create_app
app = create_app()
WSGIServer(("", app.config["PORT"]), app).serve_forever()
from common.config import BaseConfig
class Config(BaseConfig):
"""Configuration for the Flask server."""
PORT = 8000
class DevelopmentConfig(Config):
"""Configuration for the development environment."""
DEBUG = True
import logging
import gevent
from common.ipc import IPCBuilder, Topic
class IPCDirector:
"""Director used for building the correct manager.
Makes use of the manager builder.
"""
def __init__(self, app=None):
self.__builder = IPCBuilder()
if app is not None:
self.init_app(app)
def init_app(self, app):
self.__builder.build_subscriber(
app.config["ZMQ_MONITOR_ADDR"],
[Topic.READING, Topic.CYCLE, Topic.ALARM],
app.config["ZMQ_SYNC_API_ADDR"],
)
self.__builder.build_subscriber(
app.config["ZMQ_OPERATION_ADDR"],
[Topic.OPERATION_PARAMS],
app.config["ZMQ_SYNC_API_ADDR"],
)
app.ipc = self.__builder.manager
app.extensions["socketio"].start_background_task(
self.__ipc_watchdog, app
)
def __ipc_watchdog(self, app):
while True:
topic, body = app.ipc.recv(block=False)
if topic == Topic.OPERATION_PARAMS:
logging.info("emitting params")
app.extensions["socketio"].emit(
"parameters",
{
"mode": "VPS",
"ipap": body["ipap"],
"epap": body["epap"],
"breathing_freq": body["freq"],
"trigger": body["trigger"],
"ie_relation": f"{body['inhale']}:{body['exhale']}",
},
)
elif topic == Topic.READING:
app.extensions["socketio"].emit(
"readings",
{
"pressure": body["pressure"],
"flow": body["airflow"],
"volume": body["volume"],
"timestamp": body["timestamp"],
},
)
elif topic == Topic.CYCLE:
# TODO: Send cycle event.
pass
elif topic == Topic.ALARM:
# TODO: Send alarm event.
pass
gevent.sleep(0.1)
ipc = IPCDirector()
import logging
from flask_socketio import SocketIO
socketio = SocketIO()
@socketio.on("connect")
def connect():
"""Callback executed when a new client connects."""
logging.info("New client connected")
@socketio.on("disconnect")
def disconnect():
"""Callback executed when a client disconnects."""
logging.info("Client disconnected")
from datetime import datetime
from enum import Enum, auto
PRESSURE_MIN = "pressure-min"
PRESSURE_MAX = "pressure-max"
VOLUME_MIN = "volume-min"
VOLUME_MAX = "volume-max"
OXYGEN_MIN = "oxygen-min"
OXYGEN_MAX = "oxygen-max"
FREQ_MAX = "freq-max"
APNEA = "apnea"
DISCONNECTION = "disconnection"
class Type(Enum):
PRESSURE_MIN = auto()
PRESSURE_MAX = auto()
VOLUME_MIN = auto()
VOLUME_MAX = auto()
OXYGEN_MIN = auto()
OXYGEN_MAX = auto()
FREQ_MAX = auto()
APNEA = auto()
DISCONNECTION = auto()
class Criticality(Enum):
NONE = auto()
MEDIUM = auto()
HIGH = auto()
class Alarm:
def __init__(
self,
type: Type,
criticality: Criticality,
timestamp: datetime = datetime.now(),
):
self.type = type
self.criticality = criticality
self.timestamp = timestamp
class BaseConfig:
"""Common configuration for all processes."""
DEBUG = False
TESTING = False
ZMQ_MONITOR_ADDR = "ipc:///tmp/monitor"
ZMQ_OPERATION_ADDR = "ipc:///tmp/operation"
ZMQ_SYNC_CONTROLS_ADDR = "ipc:///tmp/sync-controls"
ZMQ_SYNC_GUI_ADDR = "ipc:///tmp/sync-gui"
ZMQ_SYNC_API_ADDR = "ipc:///tmp/sync-api"
IPAP_DEFAULT = 15
EPAP_DEFAULT = 5
FREQ_DEFAULT = 15
TRIGGER_DEFAULT = 5
INHALE_DEFAULT = 1
EXHALE_DEFAULT = 2
IPAP_MIN = 3
EPAP_MIN = 2
FREQ_MIN = 2
TRIGGER_MIN = 2
IPAP_MAX = 35
EPAP_MAX = 34
FREQ_MAX = 30
TRIGGER_MAX = 12
INHALE_MAX = 5
EXHALE_MAX = 5
ALARM_PRESSURE_MIN = 2
ALARM_VOLUME_MIN = 2
ALARM_OXYGEN_MIN = 21
ALARM_FREQ_MIN = 2
ALARM_PRESSURE_MAX = 35
ALARM_VOLUME_MAX = 12
ALARM_OXYGEN_MAX = 100
ALARM_FREQ_MAX = 40
import json
import logging
from enum import Enum, auto
from typing import Dict, List, Tuple
import zmq
class Topic(Enum):
SYNC = auto()
CHECK = auto()
OPERATION_MODE = auto()
OPERATION_PARAMS = auto()
OPERATION_ALARMS = auto()
ALARM = auto()
SILENCE_ALARMS = auto()
REQUEST_READING = auto()
READING = auto()
CYCLE = auto()
class PublisherError(Exception):
pass
class IPCManager:
def __init__(self):
self.pub = None
self.subs = []
self.poller = None
def send(self, topic: Topic, body: Dict):
self.pub.send_multipart(
[topic.name.encode(), json.dumps(body).encode()]
)
def recv(self, block=True) -> Tuple[Topic, Dict]:
if not self.subs:
return None, None
elif not self.poller:
sub = self.subs[0]
else:
sockets = dict(self.poller.poll(timeout=0 if not block else None))
for s in self.subs:
if s in sockets:
sub = s
break
try:
topic, body = sub.recv_multipart(zmq.NOBLOCK if not block else 0)
return Topic[topic.decode()], json.loads(body)
except Exception:
return None, None
class IPCBuilder:
def __init__(self):
self.__ctx = zmq.Context()
self.__ipc = IPCManager()
@property
def manager(self):
product = self.__ipc
self.__ipc = IPCManager()
return product
def build_publisher(self, addr: str, subscribers: List[str]):
if self.__ipc.pub:
raise PublisherError()
pub = self.__ctx.socket(zmq.PUB)
pub.bind(addr)
# Sync with subscribers
sync_socket = self.__ctx.socket(zmq.REP)
for sync_addr in subscribers:
sync_socket.bind(sync_addr)
while True:
pub.send_string(Topic.SYNC.name)
try:
sync_socket.recv_string(zmq.NOBLOCK)
sync_socket.send_string(Topic.SYNC.name)
break
except zmq.Again:
continue
logging.info("Succesfully synced publisher on %s", addr)
sync_socket.close()
self.__ipc.pub = pub
def build_subscriber(self, addr: str, topics: List[Topic], sync_addr: str):
logging.info("Building subscriber on %s", addr)
sub = self.__ctx.socket(zmq.SUB)
sub.connect(addr)
# Sync with publisher
sub.setsockopt_string(zmq.SUBSCRIBE, Topic.SYNC.name)
sync = self.__ctx.socket(zmq.REQ)
sync.connect(sync_addr)
while True:
msg = sub.recv_string()
if msg == Topic.SYNC.name:
logging.info("%s received sync msg", addr)
try:
sync.send_string(Topic.SYNC.name)
sync.recv_string()
break
except zmq.Again:
continue
logging.info("Successfully synced subscriber on %s", addr)
sync.close()
sub.setsockopt_string(zmq.UNSUBSCRIBE, Topic.SYNC.name)
for t in topics:
sub.setsockopt_string(zmq.SUBSCRIBE, t.name)
self.__ipc.subs.append(sub)
# Create poller if there is more than one subscriber
if len(self.__ipc.subs) > 1:
self.__ipc.poller = zmq.Poller()
for sub in self.__ipc.subs:
self.__ipc.poller.register(sub, zmq.POLLIN)
import logging
import os
from controls.app import ControlApplication
from controls.config import Config, DevelopmentConfig, TestingConfig
from controls.context import ctx
from controls.ipc import ipc
from controls.pcb.controller import pcb
from controls.states.director import sd
def create_app() -> ControlApplication:
"""Create the app and initialize all its extensions.
The appropiate config is loaded depending on the selected environment.
Returns:
ControlApplication: The app.
"""
app = ControlApplication()
if os.environ.get("ENV") == "development":
app.config.from_object(DevelopmentConfig)
elif os.environ.get("ENV") == "test":
app.config.from_object(TestingConfig)
else:
app.config.from_object(Config)
if app.config["DEBUG"]:
logging_level = logging.INFO
else:
logging_level = logging.WARNING
logging.basicConfig(
filename="logs/controls.log", filemode="w", level=logging_level
)
ctx.init_app(app)
pcb.init_app(app)
sd.init_app(app)
ipc.init_app(app)
return app
from controls import create_app
app = create_app()
app.run()
import logging
from flask.config import Config
class ControlApplication:
"""Main control process.
Behaves like a state machine.
"""
def __init__(self):
self.config = Config({})
def run(self):
"""Main loop of the application."""
while True:
self.current_state.run()
def transition_to(self, state):
"""Change the current state of the application.
Args:
state: The state to transition to.
"""
logging.info("Transitioning to state %s", state.__name__)
self.current_state = self.states[state.__name__]
from common.config import BaseConfig
class Config(BaseConfig):
"""Base configuration for the control application."""
MOCK = False
DHT_BOX_GPIO = 16
DHT_AIR_GPIO = 16
SERVO_GPIO = 15
SERVO_MIN_WIDTH = 500
SERVO_MAX_WIDTH = 2500
ADC_SPI_CHANNEL = 0
ADC_BAUD_RATE = 96000
ADC_REF_VOLTAGE = 5
ADC_GAUGE_CHANNEL = 4
ADC_AIRFLOW_CHANNEL = 1
ADC_ATM_CHANNEL = 5
ADC_OXYGEN_CHANNEL = 2
class DevelopmentConfig(Config):
"""Configuration for the development environment."""
DEBUG = True
MOCK = True
class TestingConfig(Config):
"""Configuration for the test environment."""
DEBUG = True
from dataclasses import dataclass
from datetime import datetime
from typing import Set
from common.alarms import Alarm, Type, Criticality
@dataclass
class Context:
"""System data.
Most of the initial values are taken from a config file.
"""
ipap: int
epap: int
freq: int
trigger: int
inhale: int
exhale: int
mode: str
pressure_min: int
pressure_max: int
volume_min: int
volume_max: int
oxygen_min: int
oxygen_max: int
freq_max: int
alarms: Set[Alarm]
def __init__(self, app=None):
if app is not None:
self.init_app(app)
def init_app(self, app):
self.ipap = app.config["IPAP_DEFAULT"]
self.epap = app.config["EPAP_DEFAULT"]
self.freq = app.config["FREQ_DEFAULT"]
self.trigger = app.config["TRIGGER_DEFAULT"]
self.inhale = app.config["INHALE_DEFAULT"]
self.exhale = app.config["EXHALE_DEFAULT"]
self.mode = "vcp"
self.pressure_min = app.config["ALARM_PRESSURE_MIN"]
self.pressure_max = app.config["ALARM_PRESSURE_MAX"]
self.volume_min = app.config["ALARM_VOLUME_MIN"]
self.volume_max = app.config["ALARM_VOLUME_MAX"]
self.oxygen_min = app.config["ALARM_OXYGEN_MIN"]
self.oxygen_max = app.config["ALARM_OXYGEN_MAX"]
self.freq_max = app.config["ALARM_FREQ_MAX"]
timestamp = datetime.now()
self.alarms = set(
[Alarm(t, Criticality.NONE, timestamp) for t in Type]
)
app.ctx = self
ctx = Context()
from common.ipc import IPCBuilder, Topic
class IPCDirector:
"""Director used for building the correct manager.
Makes use of the manager builder.
"""
def __init__(self, app=None):
self.__builder = IPCBuilder()
if app is not None:
self.init_app(app)
def init_app(self, app):
self.__builder.build_publisher(
app.config["ZMQ_MONITOR_ADDR"],
[app.config["ZMQ_SYNC_GUI_ADDR"], app.config["ZMQ_SYNC_API_ADDR"]],
)
self.__builder.build_subscriber(
app.config["ZMQ_OPERATION_ADDR"],
[
Topic.OPERATION_PARAMS,
Topic.OPERATION_ALARMS,
Topic.REQUEST_READING,
],
app.config["ZMQ_SYNC_CONTROLS_ADDR"],
)
app.ipc = self.__builder.manager
ipc = IPCDirector()
from .analog import AnalogConverter, AnalogSensor
from .digital import DHTSensor
from .servo import Servo
__all__ = ("AnalogConverter", "AnalogSensor", "DHTSensor", "Servo")
"""
This module contains utilites for working with analog devices, including a
basic analog-to-digital converter.
"""
import atexit
from typing import Callable
import pigpio
class AnalogConverter:
"""
An MCP3008 analog to digital converter.
"""
def __init__(
self, pi: pigpio.pi, channel: int, baud_rate: int, voltage: float
):
"""
Instantiate a new ADC connected to the Pi.
The device is connected by SPI on the given channel. Data transfer
depends on the baud rate.
The ADC's supply voltage is also necessary for the conversion
calculations.
"""
self.pi = pi
self.channel = channel
self.baud_rate = baud_rate
self.voltage = voltage
atexit.register(self.__cleanup)
self.spi = pi.spi_open(channel, baud_rate)
def __cleanup(self):
"""
Clean things up before dumping.
"""
self.pi.spi_close(self.spi)
def read(self, channel: int) -> float:
"""
Return the voltage value on the given channel.
"""
_, data = self.pi.spi_xfer(self.spi, [1, (8 + channel) << 4, 0])
raw_value = ((data[1] & 3) << 8) + data[2]
value = (self.voltage / (2 ** 10 - 1)) * raw_value
return value
class AnalogSensor:
"""
A sensor connected to an analog-to-digital converter.
"""
def __init__(
self,
adc: AnalogConverter,
channel: int,
func: Callable[[float], float],
):
"""
Create a new instance of an analog sensor.
The ADC it is connected to and the channel must be given, along with
the function used to convert the voltage to the actual reading.
"""
self.__adc = adc
self.__channel = channel
self.__func = func
@property
def func(self) -> Callable[[float], float]:
return self.__func
@func.setter
def func(self, f):
self.__func = f
def read(self) -> float:
"""
Trigger a new reading of the sensor.
"""
voltage = self.__adc.read(self.__channel)
return self.__func(voltage)
import atexit
import logging
import sys
import pigpio
from .analog import AnalogConverter, AnalogSensor
from .digital import DHTSensor
from .servo import Servo
from .mock import (
MockServo,
MockDHTSensor,
MockAnalogSensorRandom,
MockAnalogSensorSignal,
)
from .mock.signals import flux, pressure
class PCBController:
"""Helper class that wraps all the PCB components for easy use.
This class also handles device initialization.
"""
def __init__(self, app=None):
self.dht_box = None
self.dht_air = None
self.servo = None
self.airflow_ps = None
self.atm_ps = None
self.gauge_ps = None
self.oxygen_sensor = None
if app is not None:
self.init_app(app)
def init_app(self, app):
if app.config["MOCK"]:
# Create mock components
self.dht_box = MockDHTSensor((10, 50), (0, 100))
self.dht_air = MockDHTSensor((10, 50), (0, 100))
self.servo = MockServo()
self.oxygen_sensor = MockAnalogSensorRandom((16, 95))
self.atm_ps = MockAnalogSensorRandom((34, 105))
self.gauge_ps = MockAnalogSensorSignal(pressure)
self.airflow_ps = MockAnalogSensorSignal(flux)
else:
# Create normal components
pi = pigpio.pi()
atexit.register(pi.stop)
if not pi.connected:
logging.fatal("Can't find pigpiod")
sys.exit()
self.dht_box = DHTSensor(pi, app.config["DHT_BOX_GPIO"])
self.dht_air = DHTSensor(pi, app.config["DHT_AIR_GPIO"])
self.servo = Servo(
pi,
app.config["SERVO_GPIO"],
app.config["SERVO_MIN_WIDTH"],
app.config["SERVO_MAX_WIDTH"],
)
adc = AnalogConverter(
pi,
app.config["ADC_SPI_CHANNEL"],
app.config["ADC_BAUD_RATE"],
app.config["ADC_REF_VOLTAGE"],
)
self.atm_ps = AnalogSensor(
adc,
app.config["ADC_ATM_CHANNEL"],
lambda v: (v - 0.21) / (4.6 / 100) + 15,
)
self.airflow_ps = AnalogSensor(
adc,
app.config["ADC_AIRFLOW_CHANNEL"],
lambda v: (-19.269 * v ** 2 + 172.15 * v - 276.2),
)
self.gauge_ps = AnalogSensor(
adc,
app.config["ADC_GAUGE_CHANNEL"],
lambda v: (10.971 * v - 5.9539),
)
self.oxygen_sensor = AnalogSensor(
adc,
app.config["ADC_OXYGEN_CHANNEL"],
lambda v: (74 - 21) / (0.0425 - 0.01) * v,
)
app.pcb = self
pcb = PCBController()
"""
This module contains classes for the digital devices connected to the PCB.
"""
import atexit
import logging
import time
import pigpio
# TODO: Simplify implementation.
class DHTSensor:
"""
A DHT22 sensor for temperature and humidity reading.
"""
def __init__(self, pi: pigpio.pi, gpio_pin: int):
"""
Instantiate with the Pi and gpio to which the DHT22 output
pin is connected.
"""
self.pi = pi
self.gpio_pin = gpio_pin
self.cb = None
atexit.register(self.__cleanup)
self.bad_CS = 0 # Bad checksum count.
self.bad_SM = 0 # Short message count.
self.bad_MM = 0 # Missing message count.
self.bad_SR = 0 # Sensor reset count.
# Power cycle if timeout > MAX_TIMEOUTS.
self.no_response = 0
self.MAX_NO_RESPONSE = 2
self.rhum = -999
self.temp = -999
self.tov = None
self.high_tick = 0
self.bit = 40
pi.set_pull_up_down(gpio_pin, pigpio.PUD_OFF)
pi.set_watchdog(gpio_pin, 0) # Kill any watchdogs.
self.cb = pi.callback(gpio_pin, pigpio.EITHER_EDGE, self.__cb)
def __cleanup(self):
"""Clean up before dumping"""
self.pi.set_watchdog(self.gpio_pin, 0) # Kill all watchdogs.
if self.cb is not None:
self.cb.cancel()
self.cb = None
logging.info("Destroyed DHT sensor")
def __cb(self, gpio_pin, level, tick):
"""
Accumulate the 40 data bits. Format into 5 bytes, humidity high,
humidity low, temperature high, temperature low, checksum.
"""
diff = pigpio.tickDiff(self.high_tick, tick)
if level == 0:
# Edge length determines if bit is 1 or 0.
if diff >= 50:
val = 1
if diff >= 200: # Bad bit?
self.CS = 256 # Force bad checksum.
else:
val = 0
if self.bit >= 40: # Message complete.
self.bit = 40
elif self.bit >= 32: # In checksum byte.
self.CS = (self.CS << 1) + val
if self.bit == 39:
# 40th bit received.
self.pi.set_watchdog(self.gpio_pin, 0)
self.no_response = 0
total = self.hH + self.hL + self.tH + self.tL
if (total & 255) == self.CS: # Is checksum ok?
self.rhum = ((self.hH << 8) + self.hL) * 0.1
if self.tH & 128: # Negative temperature.
mult = -0.1
self.tH = self.tH & 127
else:
mult = 0.1
self.temp = ((self.tH << 8) + self.tL) * mult
self.tov = time.time()
else:
self.bad_CS += 1
elif self.bit >= 24: # in temp low byte
self.tL = (self.tL << 1) + val
elif self.bit >= 16: # in temp high byte
self.tH = (self.tH << 1) + val
elif self.bit >= 8: # in humidity low byte
self.hL = (self.hL << 1) + val
elif self.bit >= 0: # in humidity high byte
self.hH = (self.hH << 1) + val
else: # header bits
pass
self.bit += 1
elif level == 1:
self.high_tick = tick
if diff > 250000:
self.bit = -2
self.hH = self.hL = self.tH = self.tL = self.CS = 0
else: # level == pigpio.TIMEOUT:
self.pi.set_watchdog(self.gpio_pin, 0)
if self.bit < 8: # Too few data bits received.
self.bad_MM += 1 # Bump missing message count.
self.no_response += 1
if self.no_response > self.MAX_NO_RESPONSE:
self.no_response = 0
self.bad_SR += 1 # Bump sensor reset count.
elif self.bit < 39: # Short message receieved.
self.bad_SM += 1 # Bump short message count.
self.no_response = 0
else: # Full message received.
self.no_response = 0
def temperature(self):
"""Return current temperature."""
return self.temp
def humidity(self):
"""Return current relative humidity."""
return self.rhum
def staleness(self):
"""Return time since measurement made."""
if self.tov is not None:
return time.time() - self.tov
else:
return -999
def bad_checksum(self):
"""Return count of messages received with bad checksums."""
return self.bad_CS
def short_message(self):
"""Return count of short messages."""
return self.bad_SM
def missing_message(self):
"""Return count of missing messages."""
return self.bad_MM
def sensor_resets(self):
"""Return count of power cycles because of sensor hangs."""
return self.bad_SR
def trigger(self):
"""Trigger a new relative humidity and temperature reading."""
self.pi.write(self.gpio_pin, pigpio.LOW)
time.sleep(0.017) # 17 ms
self.pi.set_mode(self.gpio_pin, pigpio.INPUT)
self.pi.set_watchdog(self.gpio_pin, 200)
import random
from typing import List, Tuple
class MockServo:
"""Mock implementation for the servo."""
def set_angle(self, angle: int):
return
class MockDHTSensor:
"""Mock implementation for the DHT sensors."""
def __init__(self, tmp_range: Tuple[int, int], hum_range: Tuple[int, int]):
self.tmp_range = tmp_range
self.hum_range = hum_range
def trigger(self):
return
def temperature(self):
return random.randint(self.tmp_range[0], self.tmp_range[1])
def humidity(self):
return random.randint(self.hum_range[0], self.hum_range[1])
class MockAnalogSensorRandom:
"""Mock implementation for an analog sensor that returns a random
integer.
"""
def __init__(self, range: Tuple[int, int]):
self.range = range
self.func = None
def read(self):
return random.randint(self.range[0], self.range[1])
class MockAnalogSensorSignal:
"""Mock implementation for an analog sensor that simulates a real
signal."""
def __init__(self, signal: List[float]):
self.signal = signal
self.counter = 0
self.func = None
def read(self):
value = self.signal[self.counter]
self.counter += 1
if self.counter == len(self.signal):
self.counter = 0
return value
import numpy as np
# Fuente de datos 1
interval = 0.01
maxtime1 = 0.5
time1 = np.arange(0, maxtime1, interval)
m1 = 40
flux1 = time1 * m1
maxtime2 = 1.7
time2 = np.arange(0, maxtime2, interval)
flux2 = max(flux1) * np.e ** (-2 * time2)
maxtime3 = 0
maxTime = (maxtime1 * 2 + maxtime2 * 2 + maxtime3 * 1) * 5
flux = np.concatenate([flux1, flux2, -flux1, -flux2])
flux = np.concatenate([flux, flux, flux, flux, flux])
flux = np.concatenate([flux, flux, flux, flux, flux])
flux = flux + (np.random.rand(len(flux)) - 0.5) * 2.5
time = np.arange(0, maxTime * 5, interval)
# Fuente de datos 2
pressure1 = np.ones(len(time1) + len(time2)) * 2
pressure2 = pressure1 + 18
pressure = np.concatenate([pressure2, pressure1])
pressure = np.concatenate([pressure, pressure, pressure, pressure, pressure])
pressure = np.concatenate([pressure, pressure, pressure, pressure, pressure])
pressure = pressure + (np.random.rand(len(pressure)) - 0.5) * 0.75
import atexit
import logging
import pigpio
class Servo:
"""
A continuous servo that moves between 0 and 180 degrees.
"""
def __init__(
self, pi: pigpio.pi, gpio_pin: int, min_width: int, max_width: int
):
"""
Instantiate a new servo connected to the Pi on the given GPIO pin.
The minimum and maximum pulse widths must be specified.
"""
self.pi = pi
self.gpio_pin = gpio_pin
self.min_width = min_width
self.max_width = max_width
atexit.register(self.__cleanup)
def __cleanup(self):
"""
Clean things up before dumping.
"""
self.pi.set_servo_pulsewidth(self.gpio_pin, self.min_width)
logging.info("Destroyed servo")
def set_angle(self, angle: int):
"""
Move the servo to a given angle between 0 and 180 degrees.
"""
if angle < 0 or angle > 180:
raise ValueError("The angle must be between 0 and 180 degrees.")
width = (angle / 180) * (
self.max_width - self.min_width
) + self.min_width
self.pi.set_servo_pulsewidth(self.gpio_pin, width)
from .failure import FailureState
from .operation import OperationState
from .self_check import SelfCheckState
from .stand_by import StandByState
__all__ = (
"FailureState",
"OperationState",
"StandByState",
"SelfCheckState",
)
import controls.states as states
class StateDirector:
"""Director used for initializing the app's states."""
def __init__(self, app=None):
if app is not None:
self.init_app(app)
def init_app(self, app):
app.states = {}
for s in map(states.__dict__.get, states.__all__):
app.states[s.__name__] = s(app)
app.current_state = app.states[states.SelfCheckState.__name__]
sd = StateDirector()
from .state import State
class FailureState(State):
"""
System failure.
"""
def run(self):
return super().run()
This diff is collapsed.
import logging
import time
from typing import Tuple
from common.ipc import Topic
from .failure import FailureState
from .stand_by import StandByState
from .state import State
BOX_TMP_MIN = 10 # ºC
BOX_TMP_MAX = 50 # ºC
BOX_HUM_MIN = 0 # %
BOX_HUM_MAX = 100 # %
AIR_TMP_MIN = 10 # ºC
AIR_TMP_MAX = 50 # ºC
ATM_MIN = 33.3 # KPa
ATM_MAX = 106.0 # KPa
OXYGEN_MIN = 16 # %
OXYGEN_MAX = 95 # %
GAUGE_MIN = 407 # cmH2O
GAUGE_MAX = 510 # cmH2O
AIRFLOW_MIN = 0 # l/min
AIRFLOW_MAX = 40 # l/min
class SelfCheckState(State):
"""Initial state where the device's status is checked."""
def run(self):
# TODO: Call the actual functions instead of using static values
dht_box_ok = True
logging.info(
"Box DHT sensor status: %s", "OK" if dht_box_ok else "FAIL"
)
dht_air_ok = True
logging.info(
"Air DHT sensor status: %s", "OK" if dht_air_ok else "FAIL"
)
gauge_ok, airflow_ok, servo_ok = (True, True, True)
logging.info(
"Gauge pressure sensor status: %s", "OK" if dht_air_ok else "FAIL"
)
logging.info(
"Airflow pressure sensor status: %s",
"OK" if dht_air_ok else "FAIL",
)
logging.info("Servo status: %s", "OK" if dht_air_ok else "FAIL")
oxygen_ok = True
logging.info(
"Oxygen sensor status: %s", "OK" if dht_air_ok else "FAIL"
)
atm_ok = True
logging.info(
"Atmospheric pressure sensor status: %s",
"OK" if dht_air_ok else "FAIL",
)
# We need to wait a bit so the GUI application can catch up
time.sleep(2)
self.app.ipc.send(
Topic.CHECK,
{
"dht_box": dht_box_ok,
"dht_air": dht_air_ok,
"gauge": gauge_ok,
"airflow": airflow_ok,
"servo": servo_ok,
"oxygen": oxygen_ok,
"atmospheric": atm_ok,
},
)
if all(
[
dht_box_ok,
dht_air_ok,
gauge_ok,
airflow_ok,
servo_ok,
oxygen_ok,
atm_ok,
]
):
self.app.transition_to(StandByState)
else:
self.app.transition_to(FailureState)
def __check_dht_box(self) -> bool:
"""
Check temperature and humidity sensor.
"""
for _ in range(3):
self.app.pcb.dht_box.trigger()
tmp = self.app.pcb.dht_box.temperature()
hum = self.app.pcb.dht_box.humidity()
if (
BOX_TMP_MIN <= tmp <= BOX_TMP_MAX
or BOX_HUM_MIN <= hum <= BOX_HUM_MAX
):
return True
time.sleep(2)
return False
def __check_dht_air(self) -> bool:
"""
Check temperature and humidity sensor.
"""
for _ in range(3):
self.app.pcb.dht_air.trigger()
tmp = self.app.pcb.dht_air.temperature()
if AIR_TMP_MIN <= tmp <= AIR_TMP_MAX:
return True
time.sleep(2)
return False
# TODO: Simplify this function's implementation
def __check_operation(self) -> Tuple[bool, bool, bool]:
"""
Check pressure and airflow sensors, along with servo operation.
"""
flag_check_gauge = False
flag_check_airflow = False
flag_check_servo = False
flag_gauge_0_ok = False
flag_flow_0_ok = False
flag_gauge_180_ok = False
flag_flow_180_ok = False
angles = [0, 180]
for _ in range(3):
for angle in angles:
self.app.pcb.servo.set_angle(angle)
time.sleep(0.5)
gauge_pressure = self.app.pcb.gauge_ps.read()
airflow_pressure = self.app.pcb.airflow_ps.read()
if (
angle == 0
and GAUGE_MIN * 0.95 < gauge_pressure < GAUGE_MIN * 1.05
):
flag_gauge_0_ok = True
if (
angle == 0
and AIRFLOW_MAX * 0.9
< airflow_pressure
< AIRFLOW_MIN * 1.05
):
flag_flow_0_ok = True
if (
angle == 180
and GAUGE_MAX * 0.95 < gauge_pressure < GAUGE_MAX * 1.05
):
flag_gauge_180_ok = True
if (
angle == 180
and AIRFLOW_MAX * 0.95
< airflow_pressure
< AIRFLOW_MIN * 1.05
):
flag_flow_180_ok = True
if flag_gauge_0_ok and flag_gauge_180_ok:
flag_check_gauge = True
if flag_flow_0_ok and flag_flow_180_ok:
flag_check_airflow = True
# If check GAUGE and AIRFLOW are OK, SERVO OK and stop attempting
if flag_check_gauge and flag_check_airflow:
flag_check_servo = True
break
return flag_check_gauge, flag_check_airflow, flag_check_servo
def __check_oxygen(self) -> bool:
"""
Check oxygen sensor.
"""
oxygen_percentage = self.app.pcb.oxygen_sensor.read()
if OXYGEN_MIN <= oxygen_percentage <= OXYGEN_MAX:
return True
return False
def __check_atmospheric(self) -> bool:
"""
Check ambient pressure sensor.
"""
atm_pressure = self.app.pcb.atm_ps.read()
if ATM_MIN <= atm_pressure <= ATM_MAX:
return True
return False
import logging
import time
from common.ipc import Topic
from .operation import OperationState
from .state import State
MEAN_CALIBRATION_LENGTH = 200
class StandByState(State):
"""In this state, the system is waiting for the user to introduce the
initial operation parameters.
"""
def run(self):
airflow_offset = 0.0
gauge_offset = 0.0
self.app.pcb.servo.set_angle(60)
time_saved = time.time()
time_start = 10.0
while True:
# Airflow and gauge calibration
airflow_offset = (
(MEAN_CALIBRATION_LENGTH - 1) / MEAN_CALIBRATION_LENGTH
) * airflow_offset + (
1 / MEAN_CALIBRATION_LENGTH
) * self.app.pcb.airflow_ps.read()
gauge_offset = (
(MEAN_CALIBRATION_LENGTH - 1) / MEAN_CALIBRATION_LENGTH
) * gauge_offset + (
1 / MEAN_CALIBRATION_LENGTH
) * self.app.pcb.gauge_ps.read()
topic, body = self.app.ipc.recv(block=False)
if topic == Topic.OPERATION_PARAMS:
self.app.ctx.ipap = body["ipap"]
self.app.ctx.epap = body["epap"]
self.app.ctx.freq = body["freq"]
self.app.ctx.trigger = body["trigger"]
self.app.ctx.inhale = body["inhale"]
self.app.ctx.exhale = body["exhale"]
logging.info("Time: %f", time.time() - time_saved)
# Airflow and gauge continue calibration
while time.time() < time_saved + time_start:
airflow_offset = (
(MEAN_CALIBRATION_LENGTH - 1) / MEAN_CALIBRATION_LENGTH
) * airflow_offset + (
1 / MEAN_CALIBRATION_LENGTH
) * self.app.pcb.airflow_ps.read()
gauge_offset = (
(MEAN_CALIBRATION_LENGTH - 1) / MEAN_CALIBRATION_LENGTH
) * gauge_offset + (
1 / MEAN_CALIBRATION_LENGTH
) * self.app.pcb.gauge_ps.read()
time.sleep(0.0001)
break
time.sleep(0.0001)
airflow_voltage_offset = self.__calculate_voltage_airflow(
airflow_offset
)
gauge_voltage_offset = self.__calculate_voltage_gauge(gauge_offset)
self.app.pcb.airflow_ps.func = lambda v: (
-19.269 * (v - (airflow_voltage_offset - 2.0963)) ** 2
+ 172.15 * (v - (airflow_voltage_offset - 2.0963))
- 276.2
)
self.app.pcb.gauge_ps.func = lambda v: (
10.971 * (v - (gauge_voltage_offset - 0.54269)) - 5.9539
)
logging.info("Airflow Voltage offset: %f", airflow_voltage_offset)
logging.info("Airflow offset: %f", airflow_offset)
logging.info("Gauge Voltage offset: %f", airflow_voltage_offset)
logging.info("Gauge offset: %f", gauge_offset)
logging.info("Starting operation in controlled mode")
logging.info("IPAP -> %d", body["ipap"])
logging.info("EPAP -> %d", body["epap"])
logging.info("Frequency -> %d", body["freq"])
logging.info("Trigger -> %d", body["trigger"])
logging.info("I:E -> %d:%d", body["inhale"], body["exhale"])
self.app.transition_to(OperationState)
def __calculate_voltage_airflow(self, airflow_offset):
return (
-172.15
+ (172.15 ** 2 - 4 * (-19.269) * (-276.2 - airflow_offset))
** (0.5)
) / (2 * (-19.269))
def __calculate_voltage_gauge(self, gauge_offset):
return (gauge_offset + 5.9539) / 10.971
from abc import ABCMeta, abstractmethod
class State(metaclass=ABCMeta):
"""
Base state class.
"""
def __init__(self, app):
self.app = app
@abstractmethod
def run(self):
"""Execute the state's logic.
This method can return when a transition to another state needs to be
made.
"""
return
import logging
import os
from gui.app import GUIApplication
from gui.config import Config, DevelopmentConfig
from gui.context import ctx
from gui.ipc import ipc
from gui.views.director import vd
def create_app() -> GUIApplication:
"""Create the app and initialize all its extensions.
The appropiate config is loaded depending on the selected environment.
Returns:
GUIApplication: The app.
"""
app = GUIApplication()
if os.environ.get("ENV") == "development":
app.config.from_object(DevelopmentConfig)
else:
app.config.from_object(Config)
if app.config["DEBUG"]:
logging_level = logging.INFO
else:
logging_level = logging.WARNING
logging.basicConfig(
filename="logs/gui.log", filemode="w", level=logging_level
)
ctx.init_app(app)
vd.init_app(app)
ipc.init_app(app)
return app
from gui import create_app
app = create_app()
app.run()
import logging
import PySimpleGUI as sg
from flask.config import Config
sg.theme("Black") # Use the dark theme
class GUIApplication(sg.Window):
"""The ventilator's GUI application.
Monitoring data is shown to users, who can also control the system with
their inputs.
"""
def __init__(self):
super().__init__("RespirOS", size=(1366, 768), margins=(10, 10))
self.config = Config({})
def run(self):
"""Main loop of the application."""
self.current_view.show()
while True:
event, values = super().read()
self.current_view.handle_event(event, values)
def show_view(self, view):
"""Show the specified view.
As PySimpleGUI doesn't support navigation as we know it, the app is
created with all the views as columns. We can achieve a behaviour akin
to navigation by making the views visible and invisible.
Args:
view: The class of the view to show.
"""
logging.info("Showing view %s", view.__name__)
self.current_view.hide()
self.current_view = self.views[view.__name__]
self.current_view.show()
from abc import ABCMeta, abstractmethod
from typing import Dict
import PySimpleGUI as sg
class Component(sg.Column, metaclass=ABCMeta):
"""Base class for GUI components.
All views and other smaller components extend this class.
"""
def __init__(self, app, *args, **kwargs):
super().__init__([], *args, **kwargs)
self.app = app
self.children = []
@abstractmethod
def handle_event(self, event: str, values: Dict):
"""Perform the appropiate logic in response to an event.
Args:
event (str): The event key.
values (Dict): Map of values associated to the window.
"""
for c in self.children:
c.handle_event(event, values)
def show(self):
"""Make the component visible."""
self.update(visible=True)
for c in self.children:
c.show()
def hide(self):
"""Make the component invisible."""
self.update(visible=False)
from common.config import BaseConfig
class Config(BaseConfig):
"""Base configuration for the GUI application."""
FONT_FAMILY = "Helvetica"
FONT_SIZE_BIG = 20
FONT_SIZE_MEDIUM = 15
FONT_SIZE_SMALL = 12
class DevelopmentConfig(Config):
"""Configuration for the development environment."""
DEBUG = True
from dataclasses import dataclass
from typing import List
from common.alarms import Alarm
@dataclass
class Context:
"""System data shown in the GUI.
Most of the initial values are taken from a config file.
"""
locked: bool
ipap: int
epap: int
freq: int
trigger: int
inhale: int
exhale: int
mode: str
pressure_min: int
pressure_max: int
volume_min: int
volume_max: int
oxygen_min: int
oxygen_max: int
freq_max: int
alarms: List[Alarm]
def __init__(self, app=None):
if app is not None:
self.init_app(app)
def init_app(self, app):
self.locked = False
self.ipap = app.config["IPAP_DEFAULT"]
self.epap = app.config["EPAP_DEFAULT"]
self.freq = app.config["FREQ_DEFAULT"]
self.trigger = app.config["TRIGGER_DEFAULT"]
self.inhale = app.config["INHALE_DEFAULT"]
self.exhale = app.config["EXHALE_DEFAULT"]
self.mode = "vcp"
self.pressure_min = app.config["ALARM_PRESSURE_MIN"]
self.pressure_max = app.config["ALARM_PRESSURE_MAX"]
self.volume_min = app.config["ALARM_VOLUME_MIN"]
self.volume_max = app.config["ALARM_VOLUME_MAX"]
self.oxygen_min = app.config["ALARM_OXYGEN_MIN"]
self.oxygen_max = app.config["ALARM_OXYGEN_MAX"]
self.freq_max = app.config["ALARM_FREQ_MAX"]
self.alarms = []
app.ctx = self
ctx = Context()
import threading
from enum import Enum, auto
from common.ipc import IPCBuilder, Topic
class ZMQEvent(Enum):
CHECK = auto()
READING = auto()
CYCLE = auto()
ALARM = auto()
OPER_MODE = auto()
class IPCDirector:
"""Director used for building the correct manager.
Makes use of the manager builder.
"""
def __init__(self, app=None):
self.__builder = IPCBuilder()
if app is not None:
self.init_app(app)
def init_app(self, app):
self.__builder.build_subscriber(
app.config["ZMQ_MONITOR_ADDR"],
[
Topic.CHECK,
Topic.READING,
Topic.CYCLE,
Topic.ALARM,
Topic.OPERATION_MODE,
],
app.config["ZMQ_SYNC_GUI_ADDR"],
)
self.__builder.build_publisher(
app.config["ZMQ_OPERATION_ADDR"],
[
app.config["ZMQ_SYNC_CONTROLS_ADDR"],
app.config["ZMQ_SYNC_API_ADDR"],
],
)
app.ipc = self.__builder.manager
threading.Thread(
target=self.__ipc_watchdog, args=[app], daemon=True
).start()
def __ipc_watchdog(self, app):
while True:
topic, body = app.ipc.recv()
if topic == Topic.CHECK:
app.write_event_value(ZMQEvent.CHECK.name, body)
elif topic == Topic.READING:
app.write_event_value(ZMQEvent.READING.name, body)
elif topic == Topic.CYCLE:
app.write_event_value(ZMQEvent.CYCLE.name, body)
elif topic == Topic.OPERATION_MODE:
app.write_event_value(ZMQEvent.OPER_MODE.name, body)
"""
elif topic == Topic.ALARM:
app.write_event_value(ZMQEvent.ALARM.name, body)
"""
ipc = IPCDirector()
from .loading import LoadingView
from .operation import OperationView
from .parameters import ParametersView
__all__ = ("LoadingView", "ParametersView", "OperationView")
import gui.views as views
class ViewDirector:
"""Director used for initializing the app's views."""
def __init__(self, app=None):
if app is not None:
self.init_app(app)
def init_app(self, app):
app.views = {}
for v in map(views.__dict__.get, views.__all__):
app.views[v.__name__] = v(app)
app.layout([list(app.views.values())]).finalize()
app.current_view = app.views[views.LoadingView.__name__]
vd = ViewDirector()
from typing import Dict
import PySimpleGUI as sg
from gui.component import Component
from gui.ipc import ZMQEvent
from gui.views.parameters import ParametersView
class LoadingView(Component):
"""Shows progress and information about system checks."""
def __init__(self, app):
super().__init__(app, visible=False, key="LoadingView")
self.loading_label = sg.Text(
"Cargando",
justification="center",
font=(
app.config["FONT_FAMILY"],
app.config["FONT_SIZE_BIG"],
"bold",
),
)
self.expander_top = sg.Text()
self.expander_bottom = sg.Text()
self.layout(
[[self.expander_top], [self.loading_label], [self.expander_bottom]]
)
def handle_event(self, event: str, values: Dict):
if event == ZMQEvent.CHECK.name:
self.app.show_view(ParametersView)
def show(self):
self.expand(expand_x=True, expand_y=True)
self.expander_top.expand(expand_x=True, expand_y=True)
self.expander_bottom.expand(expand_x=True, expand_y=True)
self.loading_label.expand(
expand_x=True, expand_y=True, expand_row=False
)
super().show()
from typing import Dict
from common.ipc import Topic
from gui.component import Component
from gui.widgets.control import ControlPane
from gui.widgets.monitor import MonitorBar
from gui.widgets.plots import PlotCanvas
class OperationView(Component):
"""Main monitorization and control view."""
def __init__(self, app):
super().__init__(app, pad=(0, 0), visible=False, key="OperationView")
self.__first = True
self.monitor_bar = MonitorBar(app)
self.control_pane = ControlPane(app)
self.canvas = PlotCanvas(app, size=(650, 750))
self.children = [self.monitor_bar, self.canvas, self.control_pane]
self.layout([[self.monitor_bar], [self.canvas, self.control_pane]])
def handle_event(self, event: str, values: Dict):
# On first execution, request the first reading
if self.__first:
self.app.ipc.send(Topic.REQUEST_READING, {})
self.__first = False
super().handle_event(event, values)
def show(self):
self.expand(expand_x=True, expand_y=True)
self.control_pane.show_tab(self.control_pane.parameters)
super().show()
from typing import Dict
import PySimpleGUI as sg
from common.ipc import Topic
from gui.component import Component
from gui.views.operation import OperationView
from gui.widgets.sliders import IESlider, NumericSlider
class ParametersView(Component):
"""Parameter selection for operation start."""
def __init__(self, app):
super().__init__(
app, pad=(373, 0), visible=False, key="ParametersView"
)
# Sliders
self.ipap = NumericSlider(
app,
label="Presión IPAP",
metric="cmH\N{SUBSCRIPT TWO}O",
values=(app.config["IPAP_MIN"], app.config["IPAP_MAX"]),
default_value=app.config["IPAP_DEFAULT"],
)
self.epap = NumericSlider(
app,
label="Presión EPAP",
metric="cmH\N{SUBSCRIPT TWO}O",
values=(app.config["EPAP_MIN"], app.config["EPAP_MAX"]),
default_value=app.config["EPAP_DEFAULT"],
)
self.freq = NumericSlider(
app,
label="Frecuencia",
metric="rpm",
values=(app.config["FREQ_MIN"], app.config["FREQ_MAX"]),
default_value=app.config["FREQ_DEFAULT"],
)
self.trigger = NumericSlider(
app,
label="Trigger de flujo",
metric="ml",
values=(app.config["TRIGGER_MIN"], app.config["TRIGGER_MAX"]),
default_value=app.config["TRIGGER_DEFAULT"],
)
self.ie = IESlider(
app,
inhale_max=app.config["INHALE_MAX"],
exhale_max=app.config["EXHALE_MAX"],
default_value=(
app.config["INHALE_DEFAULT"],
app.config["EXHALE_DEFAULT"],
),
)
# Buttons
self.start_btn = sg.Button(
"Comenzar",
size=(10, 2),
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_SMALL"]),
)
self.children = [
self.ipap,
self.epap,
self.freq,
self.trigger,
self.ie,
]
self.layout(
[
[
sg.Text(
"Seleccione los parámetros de operación",
font=(
app.config["FONT_FAMILY"],
app.config["FONT_SIZE_BIG"],
"bold",
),
)
],
[self.ipap],
[self.epap],
[self.freq],
[self.trigger],
[self.ie],
[self.start_btn],
]
)
def handle_event(self, event: str, values: Dict):
super().handle_event(event, values)
if event == self.start_btn.Key:
self.app.ipc.send(
Topic.OPERATION_PARAMS,
{
"ipap": self.app.ctx.ipap,
"epap": self.app.ctx.epap,
"freq": self.app.ctx.freq,
"trigger": self.app.ctx.trigger,
"inhale": self.app.ctx.inhale,
"exhale": self.app.ctx.exhale,
},
)
self.app.show_view(OperationView)
self.app.ctx.ipap = self.ipap.value
self.app.ctx.epap = self.epap.value
self.app.ctx.freq = self.freq.value
self.app.ctx.trigger = self.trigger.value
self.app.ctx.inhale = self.ie.value[0]
self.app.ctx.exhale = self.ie.value[1]
if self.ipap.value <= self.epap.value:
if event == self.ipap.slider.Key:
self.app.ctx.epap = self.epap.value = self.ipap.value - 1
else:
self.app.ctx.ipap = self.ipap.value = self.epap.value + 1
def show(self):
self.expand(expand_x=True, expand_y=True)
super().show()
from .pane import ControlPane
__all__ = ("ControlPane",)
from typing import Dict
import PySimpleGUI as sg
from gui.component import Component
from gui.ipc import ZMQEvent
from .tabs import AlarmsTab, HistoryTab, ParametersTab
class ControlPane(Component):
"""Pane with tabs for system control."""
def __init__(self, app):
super().__init__(app)
self.__locked = False
# Tabs
self.parameters = ParametersTab(app)
self.alarms = AlarmsTab(app)
self.history = HistoryTab(app)
self.__current_tab = None
# Labels
self.tab_label = sg.Text(
size=(10, 1),
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_BIG"]),
)
self.mode_label = sg.Text(
size=(5, 1),
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_BIG"]),
justification="right",
)
# Buttons
self.parameters_btn = sg.Button(
self.parameters.title,
size=(10, 2),
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_SMALL"]),
)
self.alarms_btn = sg.Button(
self.alarms.title,
size=(10, 2),
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_SMALL"]),
)
self.history_btn = sg.Button(
self.history.title,
size=(10, 2),
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_SMALL"]),
)
# Misc
self.expander = sg.Text()
self.layout(
[
[self.tab_label, self.expander, self.mode_label],
[self.parameters, self.alarms, self.history],
[self.parameters_btn, self.alarms_btn, self.history_btn],
]
)
def handle_event(self, event: str, values: Dict):
self.__current_tab.handle_event(event, values)
if event == self.parameters_btn.Key:
self.show_tab(self.parameters)
elif event == self.alarms_btn.Key:
self.show_tab(self.alarms)
elif event == self.history_btn.Key:
self.show_tab(self.history)
elif event == ZMQEvent.OPER_MODE.name:
self.app.ctx.mode = values[event]["mode"]
self.mode_label.update(self.app.ctx.mode.upper())
self.parameters.switch_mode()
if self.__locked != self.app.ctx.locked:
self.__locked = self.app.ctx.locked
if self.__locked:
self.parameters.lock()
self.alarms.lock()
self.history.lock()
else:
self.parameters.unlock()
self.alarms.unlock()
self.history.unlock()
def show(self):
self.expand(expand_x=True, expand_y=True)
self.expander.expand(expand_x=True, expand_row=False)
self.parameters_btn.expand(expand_x=True, expand_row=False)
self.alarms_btn.expand(expand_x=True, expand_row=False)
self.history_btn.expand(expand_x=True, expand_row=False)
self.parameters.ipap_vcp.value = (
self.parameters.ipap_vps.value
) = self.app.ctx.ipap
self.parameters.epap_vcp.value = (
self.parameters.epap_vps.value
) = self.app.ctx.epap
self.parameters.freq_vcp.value = self.app.ctx.freq
self.parameters.trigger_vcp.value = (
self.parameters.trigger_vps.value
) = self.app.ctx.trigger
self.parameters.ie_vcp.value = (
self.app.ctx.inhale,
self.app.ctx.exhale,
)
self.mode_label.update(self.app.ctx.mode.upper())
super().show()
def show_tab(self, tab):
"""Show the specified tab
Args:
tab (ControlTab): The tab itself.
"""
if self.__current_tab:
self.__current_tab.hide()
if isinstance(self.__current_tab, ParametersTab):
self.parameters_btn.update(disabled=False)
elif isinstance(self.__current_tab, AlarmsTab):
self.alarms_btn.update(disabled=False)
elif isinstance(self.__current_tab, HistoryTab):
self.history_btn.update(disabled=False)
self.__current_tab = tab
self.__current_tab.show()
if isinstance(self.__current_tab, ParametersTab):
self.parameters_btn.update(disabled=True)
elif isinstance(self.__current_tab, AlarmsTab):
self.alarms_btn.update(disabled=True)
elif isinstance(self.__current_tab, HistoryTab):
self.history_btn.update(disabled=True)
self.tab_label.update(self.__current_tab.title)
from .alarms import AlarmsTab
from .history import HistoryTab
from .parameters import ParametersTab
__all__ = ("AlarmsTab", "HistoryTab", "ParametersTab")
from typing import Dict
import PySimpleGUI as sg
from common.ipc import Topic
from gui.widgets.sliders import NumericSlider
from .tab import ControlTab
class AlarmsTab(ControlTab):
"""Tab for alarm range control."""
def __init__(self, app):
super().__init__(app, "Alarmas")
# Sliders
self.pressure_min = NumericSlider(
app,
label="Min",
metric="cmH\N{SUBSCRIPT TWO}O",
values=(
app.config["ALARM_PRESSURE_MIN"],
app.config["ALARM_PRESSURE_MAX"],
),
default_value=app.config["ALARM_PRESSURE_MIN"],
)
self.pressure_max = NumericSlider(
app,
label="Max",
metric="cmH\N{SUBSCRIPT TWO}O",
values=(
app.config["ALARM_PRESSURE_MIN"],
app.config["ALARM_PRESSURE_MAX"],
),
default_value=app.config["ALARM_PRESSURE_MAX"],
)
self.volume_min = NumericSlider(
app,
label="Min",
metric="l/min",
values=(
app.config["ALARM_VOLUME_MIN"],
app.config["ALARM_VOLUME_MAX"],
),
default_value=app.config["ALARM_VOLUME_MIN"],
)
self.volume_max = NumericSlider(
app,
label="Max",
metric="l/min",
values=(
app.config["ALARM_VOLUME_MIN"],
app.config["ALARM_VOLUME_MAX"],
),
default_value=app.config["ALARM_VOLUME_MAX"],
)
self.oxygen_min = NumericSlider(
app,
label="Min",
metric="%",
values=(
app.config["ALARM_OXYGEN_MIN"],
app.config["ALARM_OXYGEN_MAX"],
),
default_value=app.config["ALARM_OXYGEN_MIN"],
)
self.oxygen_max = NumericSlider(
app,
label="Max",
metric="%",
values=(
app.config["ALARM_OXYGEN_MIN"],
app.config["ALARM_OXYGEN_MAX"],
),
default_value=app.config["ALARM_OXYGEN_MAX"],
)
self.freq_max = NumericSlider(
app,
label="Max",
metric="rpm",
values=(
app.config["ALARM_FREQ_MIN"],
app.config["ALARM_FREQ_MAX"],
),
default_value=app.config["ALARM_FREQ_MAX"],
)
# Frames
self.pressure_frame = sg.Frame(
"Presión",
[[self.pressure_min, self.pressure_max]],
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_MEDIUM"]),
)
self.volume_frame = sg.Frame(
"Volumen",
[[self.volume_min, self.volume_max]],
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_MEDIUM"]),
)
self.oxygen_frame = sg.Frame(
"Oxígeno",
[[self.oxygen_min, self.oxygen_max]],
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_MEDIUM"]),
)
self.freq_frame = sg.Frame(
"Frecuencia",
[[self.freq_max]],
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_MEDIUM"]),
)
# Buttons
self.commit_btn = sg.Button(
"Aplicar",
size=(10, 2),
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_SMALL"]),
)
self.children = [
self.pressure_min,
self.pressure_max,
self.volume_min,
self.volume_max,
self.oxygen_min,
self.oxygen_max,
self.freq_max,
]
self.layout(
[
[self.commit_btn],
[self.pressure_frame],
[self.volume_frame],
[self.oxygen_frame],
[self.freq_frame],
]
)
def handle_event(self, event: str, values: Dict):
super().handle_event(event, values)
if event == self.commit_btn.Key:
self.app.ipc.send(
Topic.OPERATION_ALARMS,
{
"pressure_min": self.app.ctx.pressure_min,
"pressure_max": self.app.ctx.pressure_max,
"volume_min": self.app.ctx.volume_min,
"volume_max": self.app.ctx.volume_max,
"oxygen_min": self.app.ctx.oxygen_min,
"oxygen_max": self.app.ctx.oxygen_max,
"freq_max": self.app.ctx.freq_max,
},
)
self.app.ctx.pressure_min = self.pressure_min.value
self.app.ctx.pressure_max = self.pressure_max.value
self.app.ctx.volume_min = self.volume_min.value
self.app.ctx.volume_max = self.volume_max.value
self.app.ctx.oxygen_min = self.oxygen_min.value
self.app.ctx.oxygen_max = self.oxygen_max.value
self.app.ctx.freq_max = self.freq_max.value
if self.pressure_max.value <= self.pressure_min.value:
if event == self.pressure_min.slider.Key:
self.app.ctx.pressure_max = self.pressure_max.value = (
self.pressure_min.value + 1
)
else:
self.app.ctx.pressure_min = self.pressure_min.value = (
self.pressure_max.value - 1
)
elif self.volume_max.value <= self.volume_min.value:
if event == self.volume_min.slider.Key:
self.app.ctx.volume_max = self.volume_max.value = (
self.volume_min.value + 1
)
else:
self.app.ctx.volume_min = self.volume_min.value = (
self.volume_max.value - 1
)
elif self.oxygen_max.value <= self.oxygen_min.value:
if event == self.oxygen_min.slider.Key:
self.app.ctx.oxygen_max = self.oxygen_max.value = (
self.oxygen_min.value + 1
)
else:
self.app.ctx.oxygen_min = self.oxygen_min.value = (
self.oxygen_max.value - 1
)
def show(self):
self.expand(expand_x=True, expand_y=True)
self.pressure_frame.expand(expand_x=True)
self.volume_frame.expand(expand_x=True)
self.oxygen_frame.expand(expand_x=True)
self.freq_frame.expand(expand_x=True)
self.commit_btn.expand(expand_x=True)
super().show()
def lock(self):
for c in self.children:
c.slider.update(disabled=True)
self.commit_btn.update(disabled=True)
def unlock(self):
for c in self.children:
c.slider.update(disabled=False)
self.commit_btn.update(disabled=False)
from typing import Dict
import PySimpleGUI as sg
from common.alarms import Alarm, Criticality, Type
from common.ipc import Topic
from gui.ipc import ZMQEvent
from .tab import ControlTab
class AlarmCard(sg.Column):
"""Simple card that shows information about an alarm."""
def __init__(self, font_family: str, font_size: int, last: bool = False):
self.timestamp_label = sg.Text(
"",
size=(13, 1),
font=(font_family, font_size),
)
self.type_label = sg.Text(
"",
size=(15, 1),
font=(font_family, font_size),
)
self.priority_label = sg.Text(
"",
size=(10, 1),
font=(font_family, font_size, "bold"),
justification="right",
)
layout = [[self.timestamp_label, self.type_label, self.priority_label]]
if not last:
layout += [[sg.HorizontalSeparator()]]
super().__init__(
layout,
visible=False,
vertical_alignment="center",
)
def expand(self):
super().expand(expand_x=True)
self.type_label.expand(expand_x=True)
def show_alarm(self, alarm: Alarm):
"""Update the card to show information about the given alarm.
Args:
alarm (Alarm): The alarm to show.
"""
# Update type
if alarm.type == Type.PRESSURE_MIN:
self.type_label.update("Presión baja")
elif alarm.type == Type.PRESSURE_MAX:
self.type_label.update("Presión alta")
elif alarm.type == Type.VOLUME_MIN:
self.type_label.update("Volumen bajo")
elif alarm.type == Type.VOLUME_MAX:
self.type_label.update("Volumen alto")
elif alarm.type == Type.OXYGEN_MIN:
self.type_label.update("Oxígeno bajo")
elif alarm.type == Type.OXYGEN_MAX:
self.type_label.update("Oxígeno alto")
elif alarm.type == Type.FREQ_MAX:
self.type_label.update("Frecuencia alta")
elif alarm.type == Type.APNEA:
self.type_label.update("Apnea")
elif alarm.type == Type.DISCONNECTION:
self.type_label.update("Desconexión")
# Update criticality
if alarm.criticality == Criticality.HIGH:
self.priority_label.update("Crítico", text_color="IndianRed1")
else:
self.priority_label.update("Normal", text_color="orange")
# Update timestamp
self.timestamp_label.update(alarm.timestamp.strftime("%d/%m %H:%M:%S"))
class HistoryTab(ControlTab):
"""Tab that shows the history of triggered alarms."""
def __init__(self, app):
super().__init__(app, "Histórico")
# Buttons
self.silence_btn = sg.Button(
"Silenciar alarmas",
size=(10, 2),
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_SMALL"]),
)
# Alarm cards
self.alarm_cards = []
for i in range(10):
self.alarm_cards.append(
AlarmCard(
font_family=app.config["FONT_FAMILY"],
font_size=app.config["FONT_SIZE_MEDIUM"],
last=True if i == 9 else False,
)
)
self.layout(
[[self.silence_btn]] + [[card] for card in self.alarm_cards]
)
def handle_event(self, event: str, values: Dict):
if event == self.silence_btn.Key:
self.app.ipc.send(Topic.SILENCE_ALARMS, {})
elif event == ZMQEvent.ALARM.name:
alarm = Alarm(
Type[values[event]["type"]],
Criticality[values[event]["criticality"]],
float(values[event]["timestamp"]),
)
if alarm.criticality != Criticality.NONE:
if len(self.app.ctx.alarms) == 10:
self.app.ctx.alarms = self.app.ctx.alarms[1:]
self.app.ctx.alarms.append(alarm)
for a, c in zip(self.app.ctx.alarms, self.alarm_cards):
c.show_alarm(a)
def show(self):
self.expand(expand_x=True, expand_y=True)
self.silence_btn.expand(expand_x=True)
for card in self.alarm_cards:
card.expand()
super().show()
def lock(self):
self.silence_btn.update(disabled=True)
def unlock(self):
self.silence_btn.update(disabled=False)
from typing import Dict
import PySimpleGUI as sg
from common.ipc import Topic
from gui.widgets.sliders import IESlider, NumericSlider
from .tab import ControlTab
class ParametersTab(ControlTab):
"""Tab for operation control."""
def __init__(self, app):
super().__init__(app, "Parámetros")
# VCP
self.ipap_vcp = NumericSlider(
app,
label="Presión IPAP",
metric="cmH\N{SUBSCRIPT TWO}O",
values=(
app.config["IPAP_MIN"],
app.config["IPAP_MAX"],
),
default_value=app.config["IPAP_DEFAULT"],
)
self.epap_vcp = NumericSlider(
app,
label="Presión EPAP",
metric="cmH\N{SUBSCRIPT TWO}O",
values=(
app.config["EPAP_MIN"],
app.config["EPAP_MAX"],
),
default_value=app.config["EPAP_DEFAULT"],
)
self.freq_vcp = NumericSlider(
app,
label="Frecuencia",
metric="rpm",
values=(
app.config["FREQ_MIN"],
app.config["FREQ_MAX"],
),
default_value=app.config["FREQ_DEFAULT"],
)
self.trigger_vcp = NumericSlider(
app,
label="Trigger de flujo",
metric="ml",
values=(
app.config["TRIGGER_MIN"],
app.config["TRIGGER_MAX"],
),
default_value=app.config["TRIGGER_DEFAULT"],
)
self.ie_vcp = IESlider(
app,
inhale_max=app.config["INHALE_MAX"],
exhale_max=app.config["EXHALE_MAX"],
default_value=(
app.config["INHALE_DEFAULT"],
app.config["EXHALE_DEFAULT"],
),
)
# VPS
self.ipap_vps = NumericSlider(
app,
label="Presión IPAP",
metric="cmH\N{SUBSCRIPT TWO}O",
values=(
app.config["IPAP_MIN"],
app.config["IPAP_MAX"],
),
default_value=app.config["IPAP_DEFAULT"],
)
self.epap_vps = NumericSlider(
app,
label="Presión EPAP",
metric="cmH\N{SUBSCRIPT TWO}O",
values=(
app.config["EPAP_MIN"],
app.config["EPAP_MAX"],
),
default_value=app.config["EPAP_DEFAULT"],
)
self.trigger_vps = NumericSlider(
app,
label="Trigger de flujo",
metric="ml",
values=(
app.config["TRIGGER_MIN"],
app.config["TRIGGER_MAX"],
),
default_value=app.config["TRIGGER_DEFAULT"],
)
# Tabs
self.vcp_tab = sg.Column(
[
[self.ipap_vcp],
[self.epap_vcp],
[self.freq_vcp],
[self.trigger_vcp],
[self.ie_vcp],
],
visible=False,
)
self.vps_tab = sg.Column(
[
[self.ipap_vps],
[self.epap_vps],
[self.trigger_vps],
],
visible=False,
)
self.children = [
self.ipap_vcp,
self.epap_vcp,
self.freq_vcp,
self.trigger_vcp,
self.ie_vcp,
self.ipap_vps,
self.epap_vps,
self.trigger_vps,
]
self.layout([[self.vcp_tab, self.vps_tab]])
def handle_event(self, event: str, values: Dict):
super().handle_event(event, values)
if self.app.ctx.mode.upper() == "VCP":
self.app.ctx.ipap = self.ipap_vps.value = self.ipap_vcp.value
self.app.ctx.epap = self.epap_vps.value = self.epap_vcp.value
self.app.ctx.trigger = (
self.trigger_vps.value
) = self.trigger_vcp.value
self.app.ctx.freq = self.freq_vcp.value
self.app.ctx.inhale = self.ie_vcp.value[0]
self.app.ctx.exhale = self.ie_vcp.value[1]
else:
self.app.ctx.ipap = self.ipap_vcp.value = self.ipap_vps.value
self.app.ctx.epap = self.epap_vcp.value = self.epap_vps.value
self.app.ctx.trigger = (
self.trigger_vcp.value
) = self.trigger_vps.value
if self.app.ctx.ipap <= self.app.ctx.epap and event in {
self.ipap_vcp.slider.Key,
self.ipap_vps.slider.Key,
}:
self.app.ctx.epap = self.epap_vcp.value = self.epap_vps.value = (
self.app.ctx.ipap - 1
)
elif self.app.ctx.epap >= self.app.ctx.ipap and event in {
self.epap_vcp.slider.Key,
self.epap_vps.slider.Key,
}:
self.app.ctx.ipap = self.ipap_vcp.value = self.ipap_vps.value = (
self.app.ctx.epap + 1
)
for c in self.children:
if event == c.slider.Key:
self.app.ipc.send(
Topic.OPERATION_PARAMS,
{
"ipap": self.app.ctx.ipap,
"epap": self.app.ctx.epap,
"freq": self.app.ctx.freq,
"trigger": self.app.ctx.trigger,
"inhale": self.app.ctx.inhale,
"exhale": self.app.ctx.exhale,
},
)
break
def switch_mode(self):
if self.app.ctx.mode.upper() == "VCP":
self.vps_tab.update(visible=False)
self.vcp_tab.expand(expand_x=True, expand_y=True)
self.ipap_vcp.expand()
self.epap_vcp.expand()
self.freq_vcp.expand()
self.trigger_vcp.expand()
self.ie_vcp.expand()
else:
self.vcp_tab.update(visible=False)
self.vps_tab.expand(expand_x=True, expand_y=True)
self.ipap_vps.expand()
self.epap_vps.expand()
self.trigger_vps.expand()
def show(self):
self.expand(expand_x=True, expand_y=True)
self.switch_mode()
super().show()
def lock(self):
for c in self.children:
c.slider.update(disabled=True)
def unlock(self):
for c in self.children:
c.slider.update(disabled=False)
from abc import ABCMeta, abstractmethod
from gui.component import Component
class ControlTab(Component, metaclass=ABCMeta):
"""Base class for tabs in the control pane."""
def __init__(self, app, title: str):
super().__init__(app, visible=False)
self.title = title
@abstractmethod
def lock(self):
return
@abstractmethod
def unlock(self):
return
from typing import Dict
import PySimpleGUI as sg
from common.alarms import Alarm, Criticality, Type
from gui.component import Component
from gui.ipc import ZMQEvent
class DisplayUnit(sg.Column):
def __init__(
self,
title: str,
metric: str,
size: int,
font_family: str,
font_size: int,
):
self.__value = 0.0
self.__metric = metric
self.title_label = sg.Text(
f"{title}:", font=(font_family, font_size, "bold")
)
self.value_label = sg.Text(
f"- {self.__metric}",
font=(font_family, font_size, "bold"),
size=(size, 1),
justification="right",
)
super().__init__([[self.title_label, self.value_label]])
@property
def value(self) -> float:
return self.__value
@value.setter
def value(self, new_value: float):
self.__value = round(new_value, 1)
self.value_label.update(f"{self.__value} {self.__metric}")
def show_alarm(self, criticality: Criticality):
"""Change the color of the unit to reflect an alarm.
Args:
criticality (str): Level of criticality.
"""
if criticality == Criticality.NONE:
self.title_label.update(text_color="white")
self.value_label.update(text_color="white")
elif criticality == Criticality.MEDIUM:
self.title_label.update(text_color="orange")
self.value_label.update(text_color="orange")
elif criticality == Criticality.HIGH:
self.title_label.update(text_color="IndianRed1")
self.value_label.update(text_color="IndianRed1")
class MonitorBar(Component):
"""System bar for parameter monitoring."""
def __init__(self, app):
super().__init__(app)
# Displays
self.ipap = DisplayUnit(
title="IPAP",
metric="cmH\N{SUBSCRIPT TWO}O",
size=10,
font_family=app.config["FONT_FAMILY"],
font_size=app.config["FONT_SIZE_BIG"],
)
self.epap = DisplayUnit(
title="EPAP",
metric="cmH\N{SUBSCRIPT TWO}O",
size=10,
font_family=app.config["FONT_FAMILY"],
font_size=app.config["FONT_SIZE_BIG"],
)
self.freq = DisplayUnit(
title="Frecuencia",
metric="rpm",
size=8,
font_family=app.config["FONT_FAMILY"],
font_size=app.config["FONT_SIZE_BIG"],
)
self.vc_in = DisplayUnit(
title="V (in)",
metric="ml",
size=5,
font_family=app.config["FONT_FAMILY"],
font_size=app.config["FONT_SIZE_SMALL"],
)
self.vc_out = DisplayUnit(
title="V (out)",
metric="ml",
size=5,
font_family=app.config["FONT_FAMILY"],
font_size=app.config["FONT_SIZE_SMALL"],
)
self.oxygen = DisplayUnit(
title="O\N{SUBSCRIPT TWO}",
metric="%",
size=4,
font_family=app.config["FONT_FAMILY"],
font_size=app.config["FONT_SIZE_SMALL"],
)
# Buttons
self.lock_btn = sg.Button(
"Bloquear",
size=(9, 2),
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_SMALL"]),
)
# Misc
self.expander = sg.Text()
self.layout(
[
[
self.ipap,
self.epap,
self.freq,
self.vc_in,
self.vc_out,
self.oxygen,
self.expander,
self.lock_btn,
]
]
)
def handle_event(self, event: str, values: Dict):
if event == self.lock_btn.Key:
if not self.app.ctx.locked:
self.app.ctx.locked = True
self.lock_btn.update("Desbloquear")
else:
self.app.ctx.locked = False
self.lock_btn.update("Bloquear")
elif event == ZMQEvent.CYCLE.name:
self.ipap.value = values[event]["ipap"]
self.epap.value = values[event]["epap"]
self.freq.value = values[event]["freq"]
self.vc_in.value = values[event]["vc_in"]
self.vc_out.value = values[event]["vc_out"]
self.oxygen.value = values[event]["oxygen"]
elif event == ZMQEvent.ALARM.name:
alarm = Alarm(
Type[values[event]["type"]],
Criticality[values[event]["criticality"]],
float(values[event]["timestamp"]),
)
if alarm.type == Type.PRESSURE_MIN:
self.epap.show_alarm(alarm.criticality)
elif alarm.type == Type.PRESSURE_MAX:
self.ipap.show_alarm(alarm.criticality)
elif alarm.type == Type.VOLUME_MIN:
pass
elif alarm.type == Type.VOLUME_MAX:
pass
elif alarm.type in {Type.OXYGEN_MIN, Type.OXYGEN_MAX}:
self.oxygen.show_alarm(alarm.criticality)
elif alarm.type == Type.FREQ_MAX:
self.freq.show_alarm(alarm.criticality)
def show(self):
self.expand(expand_x=True, expand_row=False)
self.expander.expand(expand_x=True)
import tkinter
from concurrent.futures import ThreadPoolExecutor
from typing import Dict
import matplotlib.pyplot as plt
import PySimpleGUI as sg
from common.ipc import Topic
from gui.component import Component
from gui.ipc import ZMQEvent
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
FIGURE_SECONDS = 20 # seconds
SAMPLE_PERIOD = 0.02 # samples
class PlotCanvas(Component):
"""Custom canvas with three plots."""
def __init__(self, app, size):
super().__init__(app, pad=(0, 0))
self.nsamples = FIGURE_SECONDS / SAMPLE_PERIOD
self.timestamp_old = 0.0
self.pressure_old = 0.0
self.airflow_old = 0.0
self.volume_old = 0.0
self.canvas = sg.Canvas(size=size, pad=(0, 0))
self.layout([[self.canvas]])
def handle_event(self, event: str, values: Dict):
if event == ZMQEvent.READING.name:
self.__add_reading(values[event])
self.app.ipc.send(Topic.REQUEST_READING, {})
def show(self):
with plt.rc_context(
{
"figure.facecolor": "black",
"axes.facecolor": "black",
"axes.edgecolor": "white",
"axes.titlecolor": "white",
"axes.labelcolor": "white",
"xtick.color": "white",
"ytick.color": "white",
}
):
self.fig, self.axes = plt.subplots(3, 1, constrained_layout=True)
self.fig.set_figheight(6.5)
self.fig.set_figwidth(8)
# Configure axes and create lines
self.lines = []
for ax, meta in zip(
self.axes,
[
("PRESIÓN", "cmH\N{SUBSCRIPT TWO}O", (-2, 35), "violet"),
("FLUJO", "l/min", (-40, 55), "greenyellow"),
("VOLUMEN", "ml", (-2, 60), "cyan"),
],
):
ax.set_title(meta[0])
ax.set_ylabel(meta[1])
ax.set_xlim(0, self.nsamples)
ax.set_ylim(meta[2][0], meta[2][1])
ax.grid()
ax.tick_params(
axis="x", # changes apply to the x-axis
which="both", # both major and minor ticks are affected
bottom=False, # ticks along the bottom edge are off
top=False, # ticks along the top edge are off
labelbottom=False, # no label on the bottom edge
)
(line,) = ax.plot(0, animated=True, color=meta[3])
self.lines.append(line)
# Create GUI element
self.graph = FigureCanvasTkAgg(self.fig, self.canvas.TKCanvas)
self.graph.draw()
self.graph.get_tk_widget().pack(
side=tkinter.TOP, fill=tkinter.BOTH, expand=1
)
self.background = self.graph.copy_from_bbox(self.fig.bbox)
def __add_reading(self, reading: Dict):
pressure = float(reading["pressure"])
airflow = float(reading["airflow"])
volume = float(reading["volume"])
timestamp = float(reading["timestamp"])
period = timestamp - self.timestamp_old
nsamples = int(period / SAMPLE_PERIOD)
# Clear graph area
self.graph.restore_region(self.background)
# Update lines' data concurrently
with ThreadPoolExecutor(max_workers=3) as executor:
for ax, ln, value, value_old in zip(
self.axes,
self.lines,
[pressure, airflow, volume],
[self.pressure_old, self.airflow_old, self.volume_old],
):
executor.submit(
self.__interpolate,
ax,
ln,
value,
value_old,
nsamples,
period,
)
# Draw new lines
self.graph.blit(self.fig.bbox)
self.graph.flush_events()
# Update previous values
self.pressure_old = pressure
self.airflow_old = airflow
self.volume_old = volume
self.timestamp_old = timestamp
def __interpolate(self, ax, ln, value, value_old, nsamples, period):
data = ln.get_ydata()
if len(data) == 1:
data = data.tolist()
data.append(value)
elif nsamples >= 1:
m = (value - value_old) / period
b = value_old
h = 1
while nsamples > 0:
if len(data) == self.nsamples:
data = data[1:]
data.append(m * (SAMPLE_PERIOD * h) + b)
nsamples -= 1
h += 1
ln.set_xdata(range(len(data)))
ln.set_ydata(data)
ax.draw_artist(ln)
from typing import Dict, Tuple, Union
import PySimpleGUI as sg
from gui.component import Component
class NumericSlider(Component):
"""Slider for numerical parameters."""
def __init__(
self,
app,
label: str,
metric: str,
values: Tuple[int, int],
default_value: int,
):
super().__init__(app)
self.__value = default_value
self.title_label = sg.Text(
label,
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_MEDIUM"]),
)
self.metric_label = sg.Text(
metric,
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_MEDIUM"]),
)
self.value_label = sg.Text(
default_value,
size=(5, 1),
justification="right",
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_MEDIUM"]),
)
self.slider = sg.Slider(
range=values,
default_value=self.__value,
disable_number_display=True,
orientation="h",
size=(0, 50),
enable_events=True,
)
self.layout(
[
[self.title_label, self.value_label, self.metric_label],
[self.slider],
]
)
@property
def value(self) -> int:
return self.__value
@value.setter
def value(self, value: int):
self.__value = value
self.value_label.update(value)
self.slider.update(value)
def handle_event(self, event: str, values: Dict):
if event == self.slider.Key:
self.value = int(values[event])
def show(self):
self.expand(expand_x=True)
self.title_label.expand(expand_x=True)
self.slider.expand(expand_x=True)
super().show()
class IESlider(Component):
"""Special slider for the inhale-exhale relation."""
def __init__(
self,
app,
inhale_max: int,
exhale_max: int,
default_value: Tuple[int, int],
):
super().__init__(app)
self.__value = self.__ie_to_int(default_value)
self.title_label = sg.Text(
"Relación I:E",
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_MEDIUM"]),
)
self.value_label = sg.Text(
f"{default_value[0]}:{default_value[1]}",
size=(5, 1),
justification="right",
font=(app.config["FONT_FAMILY"], app.config["FONT_SIZE_MEDIUM"]),
)
self.slider = sg.Slider(
range=((inhale_max - 1) * -1, exhale_max - 1),
default_value=self.__value,
disable_number_display=True,
orientation="h",
size=(0, 50),
enable_events=True,
)
self.layout([[self.title_label, self.value_label], [self.slider]])
@property
def value(self) -> Tuple[int, int]:
inhale = exhale = 1
if self.__value > 0:
exhale = self.__value + 1
elif self.__value < 0:
inhale = abs(self.__value) + 1
return inhale, exhale
@value.setter
def value(self, value: Union[Tuple[int, int], float]):
if not isinstance(value, tuple):
self.__value = value
self.value_label.update(
":".join(str(i) for i in self.__int_to_ie(self.__value))
)
self.slider.update(self.__value)
else:
self.__value = self.__ie_to_int(value)
self.value_label.update(f"{value[0]}:{value[1]}")
self.slider.update(self.__value)
def handle_event(self, event: str, values: Dict):
if event == self.slider.Key:
self.value = int(values[event])
def show(self):
self.expand(expand_x=True)
self.title_label.expand(expand_x=True)
self.slider.expand(expand_x=True)
super().show()
def __ie_to_int(self, values: Tuple[int, int]) -> int:
"""Obtain the single integer version of the inhale-exhale relation.
Args:
values (Tuple[int, int]): Value as a tuple.
Returns:
int: The single integer representation.
"""
int_value = 0
if values[0] > 1:
int_value = (values[0] - 1) * -1
elif values[1] > 1:
int_value = values[1] - 1
return int_value
def __int_to_ie(self, value: int) -> Tuple[int, int]:
"""Obtain the tuple representation of the inhale-exhale relation.
Args:
value (int): Single integer representation.
Returns:
Tuple[int, int]: Value as a tuple.
"""
if value > 0:
return 1, value + 1
elif value < 0:
return abs(value) + 1, 1
else:
return 1, 1
-r requirements.txt
appdirs==1.4.4
black==20.8b1
click==7.1.2
flake8==3.9.0
mccabe==0.6.1
mypy-extensions==0.4.3
pathspec==0.8.1
pycodestyle==2.7.0
pyflakes==2.3.0
regex==2020.11.13
toml==0.10.2
typed-ast==1.4.2
typing-extensions==3.7.4.3
bidict==0.21.2
click==7.1.2
cycler==0.10.0
Flask==1.1.2
Flask-SocketIO==5.0.1
gevent==21.1.2
greenlet==1.0.0
itsdangerous==1.1.0
Jinja2==2.11.3
kiwisolver==1.3.1
MarkupSafe==1.1.1
matplotlib==3.3.4
numpy==1.20.1
pigpio==1.78
Pillow==8.1.2
pyparsing==2.4.7
PySimpleGUI==4.36.0
python-dateutil==2.8.1
python-engineio==4.0.1
python-socketio==5.1.0
pyzmq==22.0.3
six==1.15.0
Werkzeug==1.0.1
zope.event==4.5.0
zope.interface==5.2.0
#!/bin/bash
taskset -c 0 python -m controls & CONTROLS_PID=$!
taskset -c 1 python -m gui & GUI_PID=$!
taskset -c 2 python -m api & API_PID=$!
trap finish SIGINT
finish() {
kill -9 $CONTROLS_PID
kill -9 $GUI_PID
kill -9 $API_PID
exit
}
sleep infinity
#!/bin/bash
export ENV=development
source run
#!/bin/bash
export ENV=test
source run
import pathlib
import setuptools
HERE = pathlib.Path(__file__).parent
AUTHORS = (HERE / "AUTHORS").read_text()
README = (HERE / "README.md").read_text()
setuptools.setup(
name="respir-os",
version="1.0.0",
author=", ".join(AUTHORS.splitlines()),
license="LGPL-3",
description="""
Open design and implementation of a low-cost ventilator for COVID-19
patients.
""",
long_description=README,
long_description_content_type="text/markdown",
classifiers=[
"Programming Language :: Python :: 3.8",
"License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)",
"Intended Audience :: Healthcare Industry",
"Natural Language :: Spanish",
"Operating System :: Unix",
],
packages=["controls", "gui", "api"],
python_requires=">=3.8.6",
install_requires=[
"flask",
"flask-socketio",
"pigpio",
"pysimplegui",
"matplotlib",
"numpy",
"gevent",
"pyzmq",
],
scripts=["scripts/run", "scripts/run-dev", "scripts/run-test"],
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment