Commit 0658772c authored by Víctor Vázquez Rodríguez's avatar Víctor Vázquez Rodríguez

Merge branch 'devel' into 'master'

Update build system and dependecy management

See merge request !1
parents 145aaaf0 481e2d6e
Pipeline #1870 failed with stage
image: python:3.7
before_script:
- pip install poetry
- poetry config virtualenvs.create false
- poetry install
Publish to PyPI:
stage: deploy
only:
- tags
script:
- poetry build
- poetry publish
# respiros-system
\ No newline at end of file
# Respir-OS
A medical ventilator system for the Raspberry Pi! Detailed information about
the design, installation and usage can be found on [the
wiki](https://ohwr.org/project/respir-os/wikis/home).
from gevent.pywsgi import WSGIServer
from api import create_app
app = create_app()
WSGIServer(("", app.config["PORT"]), app).serve_forever()
from controls import create_app
app = create_app()
app.run()
from gui import create_app
app = create_app()
app.run()
This diff is collapsed.
[tool.poetry]
name = "respir-os"
version = "0.1.0"
description = "Fully-functional ventilator system for the Raspberry Pi."
authors = [
"José Sánchez <josesanchez.oncort@gmail.com>",
"Ismael Martel <ismael.martel@gmail.com>",
"Carlos García <carlos.garcia@iesppg.net>",
"Ladislao Martínez <ladislao.m.garcia@gmail.com>",
"Jaime Lozano <jaimelozano@ugr.es>",
"Elio Valenzuela <elio@ugr.es>",
"Abel Cano <abelcano@ugr.es>"
]
maintainers = [
"Víctor Vázquez <victorvazrod@ugr.es>",
"Carlos Megías <narg@ugr.es>"
]
license = "LGPL-3.0-or-later"
repository = "https://ohwr.org/project/respir-os"
documentation = "https://ohwr.org/project/respir-os/wikis/home"
readme = "README.md"
keywords = [
"Healthcare",
"Raspberry Pi",
"Ventilator"
]
classifiers = [
"Programming Language :: Python :: 3.7",
"License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)",
"Intended Audience :: Healthcare Industry",
"Natural Language :: Spanish",
"Operating System :: Unix"
]
packages = [
{ include = "respir_os" }
]
[tool.poetry.dependencies]
python = "^3.7"
Flask = "^2.0.1"
Flask-SocketIO = "^5.1.0"
pigpio = "^1.78"
PySimpleGUI = "^4.45.0"
matplotlib = "^3.4.2"
numpy = "^1.21.0"
gevent = "^21.1.2"
pyzmq = "^22.1.0"
[tool.poetry.dev-dependencies]
black = "^21.6b0"
[tool.poetry.scripts]
respir-os = "respir_os.__main__:main"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
-r requirements.txt
appdirs==1.4.4
black==20.8b1
click==7.1.2
flake8==3.9.0
mccabe==0.6.1
mypy-extensions==0.4.3
pathspec==0.8.1
pycodestyle==2.7.0
pyflakes==2.3.0
regex==2020.11.13
toml==0.10.2
typed-ast==1.4.2
typing-extensions==3.7.4.3
bidict==0.21.2
click==7.1.2
cycler==0.10.0
Flask==1.1.2
Flask-SocketIO==5.0.1
gevent==21.1.2
greenlet==1.0.0
itsdangerous==1.1.0
Jinja2==2.11.3
kiwisolver==1.3.1
MarkupSafe==1.1.1
matplotlib==3.3.4
numpy==1.20.1
pigpio==1.78
Pillow==8.1.2
pyparsing==2.4.7
PySimpleGUI==4.36.0
python-dateutil==2.8.1
python-engineio==4.0.1
python-socketio==5.1.0
pyzmq==22.0.3
six==1.15.0
Werkzeug==1.0.1
zope.event==4.5.0
zope.interface==5.2.0
import os
from multiprocessing import Process
import respir_os.api as api
import respir_os.controls as controls
import respir_os.gui as gui
def main():
os.environ["ENV"] = "development"
procs = [
Process(target=api.run),
Process(target=controls.run),
Process(target=gui.run),
]
try:
for p in procs:
p.start()
except KeyboardInterrupt:
for p in procs:
p.kill()
if __name__ == "__main__":
main()
......@@ -2,10 +2,10 @@ import logging
import os
from flask import Flask
from api.config import Config, DevelopmentConfig
from api.ipc import ipc
from api.sockets import socketio
from gevent.pywsgi import WSGIServer
from respir_os.api.config import Config, DevelopmentConfig
from respir_os.api.ipc import ipc
from respir_os.api.sockets import socketio
def create_app() -> Flask:
......@@ -28,11 +28,16 @@ def create_app() -> Flask:
logging_level = logging.INFO
else:
logging_level = logging.WARNING
logging.basicConfig(
filename="logs/api.log", filemode="w", level=logging_level
)
logging.basicConfig(filename="logs/api.log", filemode="w", level=logging_level)
socketio.init_app(app)
ipc.init_app(app)
return app
def run():
"""Run the app."""
app = create_app()
WSGIServer(("", app.config["PORT"]), app).serve_forever()
from common.config import BaseConfig
from respir_os.common.config import BaseConfig
class Config(BaseConfig):
......
import logging
import gevent
from common.ipc import IPCBuilder, Topic
from respir_os.common.ipc import IPCBuilder, Topic
class IPCDirector:
......@@ -27,9 +27,7 @@ class IPCDirector:
app.config["ZMQ_SYNC_API_ADDR"],
)
app.ipc = self.__builder.manager
app.extensions["socketio"].start_background_task(
self.__ipc_watchdog, app
)
app.extensions["socketio"].start_background_task(self.__ipc_watchdog, app)
def __ipc_watchdog(self, app):
while True:
......
......@@ -30,9 +30,7 @@ class IPCManager:
self.poller = None
def send(self, topic: Topic, body: Dict):
self.pub.send_multipart(
[topic.name.encode(), json.dumps(body).encode()]
)
self.pub.send_multipart([topic.name.encode(), json.dumps(body).encode()])
def recv(self, block=True) -> Tuple[Topic, Dict]:
if not self.subs:
......
import logging
import os
from controls.app import ControlApplication
from controls.config import Config, DevelopmentConfig, TestingConfig
from controls.context import ctx
from controls.ipc import ipc
from controls.pcb.controller import pcb
from controls.states.director import sd
from respir_os.controls.app import ControlApplication
from respir_os.controls.config import Config, DevelopmentConfig, TestingConfig
from respir_os.controls.context import ctx
from respir_os.controls.ipc import ipc
from respir_os.controls.pcb.controller import pcb
from respir_os.controls.states.director import sd
def create_app() -> ControlApplication:
......@@ -31,9 +31,7 @@ def create_app() -> ControlApplication:
logging_level = logging.INFO
else:
logging_level = logging.WARNING
logging.basicConfig(
filename="logs/controls.log", filemode="w", level=logging_level
)
logging.basicConfig(filename="logs/controls.log", filemode="w", level=logging_level)
ctx.init_app(app)
pcb.init_app(app)
......@@ -41,3 +39,10 @@ def create_app() -> ControlApplication:
ipc.init_app(app)
return app
def run():
"""Run the app."""
app = create_app()
app.run()
from common.config import BaseConfig
from respir_os.common.config import BaseConfig
class Config(BaseConfig):
......
......@@ -2,7 +2,7 @@ from dataclasses import dataclass
from datetime import datetime
from typing import Set
from common.alarms import Alarm, Type, Criticality
from respir_os.common.alarms import Alarm, Criticality, Type
@dataclass
......@@ -54,9 +54,7 @@ class Context:
self.freq_max = app.config["ALARM_FREQ_MAX"]
timestamp = datetime.now()
self.alarms = set(
[Alarm(t, Criticality.NONE, timestamp) for t in Type]
)
self.alarms = set([Alarm(t, Criticality.NONE, timestamp) for t in Type])
app.ctx = self
......
from common.ipc import IPCBuilder, Topic
from respir_os.common.ipc import IPCBuilder, Topic
class IPCDirector:
......
......@@ -14,9 +14,7 @@ class AnalogConverter:
An MCP3008 analog to digital converter.
"""
def __init__(
self, pi: pigpio.pi, channel: int, baud_rate: int, voltage: float
):
def __init__(self, pi: pigpio.pi, channel: int, baud_rate: int, voltage: float):
"""
Instantiate a new ADC connected to the Pi.
......
......@@ -9,9 +9,7 @@ class Servo:
A continuous servo that moves between 0 and 180 degrees.
"""
def __init__(
self, pi: pigpio.pi, gpio_pin: int, min_width: int, max_width: int
):
def __init__(self, pi: pigpio.pi, gpio_pin: int, min_width: int, max_width: int):
"""
Instantiate a new servo connected to the Pi on the given GPIO pin.
......@@ -42,7 +40,5 @@ class Servo:
if angle < 0 or angle > 180:
raise ValueError("The angle must be between 0 and 180 degrees.")
width = (angle / 180) * (
self.max_width - self.min_width
) + self.min_width
width = (angle / 180) * (self.max_width - self.min_width) + self.min_width
self.pi.set_servo_pulsewidth(self.gpio_pin, width)
import controls.states as states
import respir_os.controls.states as states
class StateDirector:
......
from datetime import datetime
import logging
import time
from datetime import datetime
from typing import Dict
import numpy as np
import zmq
from common.alarms import Alarm, Criticality, Type
from common.ipc import Topic
from controls.context import Context
from respir_os.common.alarms import Alarm, Criticality, Type
from respir_os.common.ipc import Topic
from respir_os.controls.context import Context
from .state import State
......@@ -137,17 +137,11 @@ class OperationState(State):
# operation
if not self.operation:
if (
not insp_exp
and time.time() >= self.time_saved + self.insp_duration
) or (
insp_exp
and time.time() >= self.time_saved + self.esp_duration
):
not insp_exp and time.time() >= self.time_saved + self.insp_duration
) or (insp_exp and time.time() >= self.time_saved + self.esp_duration):
insp_exp = not insp_exp
if (
insp_exp
): # Start espiration, save inspiration parameters
if insp_exp: # Start espiration, save inspiration parameters
self.__start_espiration()
logging.info("Espiration Controlled")
......@@ -158,24 +152,15 @@ class OperationState(State):
# Assisted Mode Operation: I, E transition. Triggered and airflow
# mode operation
else:
if (
insp_exp
): # Espiration. Detect trigger to change to inspiration.
if (
time.time()
> self.time_saved + self.esp_duration * 1.0 / 3.0
):
if insp_exp: # Espiration. Detect trigger to change to inspiration.
if time.time() > self.time_saved + self.esp_duration * 1.0 / 3.0:
if (
not self.airflow_trigger
or self.airflow_trigger
<= self.airflow_pressure_filtered
or self.airflow_trigger <= self.airflow_pressure_filtered
):
self.airflow_trigger = (
self.airflow_pressure_filtered
)
self.airflow_trigger = self.airflow_pressure_filtered
elif (
self.airflow_trigger
- self.airflow_pressure_filtered
self.airflow_trigger - self.airflow_pressure_filtered
>= self.app.ctx.trigger
and self.airflow_pressure_filtered < 0
):
......@@ -199,9 +184,7 @@ class OperationState(State):
):
# Start espiration, save inspiration parameters
logging.info("Airflow max: %f", self.airflow_max)
logging.info(
"Airflow: %f", self.airflow_pressure_filtered
)
logging.info("Airflow: %f", self.airflow_pressure_filtered)
insp_exp = not insp_exp
self.__start_espiration()
logging.info("Espiration Assisted")
......@@ -210,21 +193,17 @@ class OperationState(State):
if insp_exp: # Espiration
self.epap_read = (
(MEAN_IEPAP_LENGTH - 1) / MEAN_IEPAP_LENGTH
) * self.epap_read + (
1 / MEAN_IEPAP_LENGTH
) * self.gauge_pressure
) * self.epap_read + (1 / MEAN_IEPAP_LENGTH) * self.gauge_pressure
self.app.pcb.servo.set_angle(self.epap_angle)
# Check operation mode change
if (
not self.operation
and time.time()
>= self.time_saved + self.esp_duration * 1.0 / 3.0
and time.time() >= self.time_saved + self.esp_duration * 1.0 / 3.0
):
if (
not self.airflow_trigger
or self.airflow_trigger
<= self.airflow_pressure_filtered
or self.airflow_trigger <= self.airflow_pressure_filtered
):
self.airflow_trigger = self.airflow_pressure_filtered
elif (
......@@ -234,18 +213,14 @@ class OperationState(State):
):
logging.info(
"Trigger difference %f",
self.airflow_trigger
- self.airflow_pressure_filtered,
self.airflow_trigger - self.airflow_pressure_filtered,
)
self.operation = True
insp_exp = not insp_exp
self.__start_inspiration()
logging.info("Inspiration Assisted")
elif (
self.operation
and time.time() >= self.time_saved + APNEA_TIME
):
elif self.operation and time.time() >= self.time_saved + APNEA_TIME:
self.operation = False
insp_exp = not insp_exp
self.__start_inspiration()
......@@ -255,9 +230,7 @@ class OperationState(State):
else: # Inspiration
self.ipap_read = (
(MEAN_IEPAP_LENGTH - 1) / MEAN_IEPAP_LENGTH
) * self.ipap_read + (
1 / MEAN_IEPAP_LENGTH
) * self.gauge_pressure
) * self.ipap_read + (1 / MEAN_IEPAP_LENGTH) * self.gauge_pressure
self.app.pcb.servo.set_angle(self.ipap_angle)
......@@ -330,9 +303,7 @@ class OperationState(State):
"""Obtain respiration frequency and send cycle info."""
if self.mode:
self.freq_read = 60 / (
self.insp_duration_read + self.esp_duration_read
)
self.freq_read = 60 / (self.insp_duration_read + self.esp_duration_read)
else:
self.freq_read = 0
......@@ -357,9 +328,7 @@ class OperationState(State):
if self.app.ctx.ipap != self.ipap_old or not self.mode:
if not self.iepap_index[int(self.app.ctx.ipap - 1)]:
self.ipap_angle = (
20.009 * np.log(float(self.app.ctx.ipap)) + 37.997
)
self.ipap_angle = 20.009 * np.log(float(self.app.ctx.ipap)) + 37.997
else:
self.ipap_angle = self.iepap_table[int(self.app.ctx.ipap - 1)]
self.ipap_old = self.app.ctx.ipap
......@@ -379,9 +348,7 @@ class OperationState(State):
if self.app.ctx.epap != self.epap_old or not self.mode:
if not self.iepap_index[int(self.app.ctx.epap - 1)]:
self.epap_angle = (
20.009 * np.log(float(self.app.ctx.epap)) + 37.997
)
self.epap_angle = 20.009 * np.log(float(self.app.ctx.epap)) + 37.997
else:
self.epap_angle = self.iepap_table[int(self.app.ctx.epap - 1)]
self.epap_old = self.app.ctx.epap
......
......@@ -2,7 +2,7 @@ import logging
import time
from typing import Tuple
from common.ipc import Topic
from respir_os.common.ipc import Topic
from .failure import FailureState
from .stand_by import StandByState
......@@ -31,19 +31,13 @@ class SelfCheckState(State):
# TODO: Call the actual functions instead of using static values
dht_box_ok = True
logging.info(
"Box DHT sensor status: %s", "OK" if dht_box_ok else "FAIL"
)
logging.info("Box DHT sensor status: %s", "OK" if dht_box_ok else "FAIL")
dht_air_ok = True
logging.info(
"Air DHT sensor status: %s", "OK" if dht_air_ok else "FAIL"
)
logging.info("Air DHT sensor status: %s", "OK" if dht_air_ok else "FAIL")
gauge_ok, airflow_ok, servo_ok = (True, True, True)
logging.info(
"Gauge pressure sensor status: %s", "OK" if dht_air_ok else "FAIL"
)
logging.info("Gauge pressure sensor status: %s", "OK" if dht_air_ok else "FAIL")
logging.info(
"Airflow pressure sensor status: %s",
"OK" if dht_air_ok else "FAIL",
......@@ -51,9 +45,7 @@ class SelfCheckState(State):
logging.info("Servo status: %s", "OK" if dht_air_ok else "FAIL")
oxygen_ok = True
logging.info(
"Oxygen sensor status: %s", "OK" if dht_air_ok else "FAIL"
)
logging.info("Oxygen sensor status: %s", "OK" if dht_air_ok else "FAIL")
atm_ok = True
logging.info(
......@@ -101,10 +93,7 @@ class SelfCheckState(State):
self.app.pcb.dht_box.trigger()
tmp = self.app.pcb.dht_box.temperature()
hum = self.app.pcb.dht_box.humidity()
if (
BOX_TMP_MIN <= tmp <= BOX_TMP_MAX
or BOX_HUM_MIN <= hum <= BOX_HUM_MAX
):
if BOX_TMP_MIN <= tmp <= BOX_TMP_MAX or BOX_HUM_MIN <= hum <= BOX_HUM_MAX:
return True
time.sleep(2)
......@@ -145,16 +134,11 @@ class SelfCheckState(State):
gauge_pressure = self.app.pcb.gauge_ps.read()
airflow_pressure = self.app.pcb.airflow_ps.read()
if (
angle == 0
and GAUGE_MIN * 0.95 < gauge_pressure < GAUGE_MIN * 1.05
):
if angle == 0 and GAUGE_MIN * 0.95 < gauge_pressure < GAUGE_MIN * 1.05:
flag_gauge_0_ok = True
if (
angle == 0
and AIRFLOW_MAX * 0.9
< airflow_pressure
< AIRFLOW_MIN * 1.05
and AIRFLOW_MAX * 0.9 < airflow_pressure < AIRFLOW_MIN * 1.05
):
flag_flow_0_ok = True
......@@ -165,9 +149,7 @@ class SelfCheckState(State):
flag_gauge_180_ok = True
if (
angle == 180
and AIRFLOW_MAX * 0.95
< airflow_pressure
< AIRFLOW_MIN * 1.05
and AIRFLOW_MAX * 0.95 < airflow_pressure < AIRFLOW_MIN * 1.05
):
flag_flow_180_ok = True
......
import logging
import time
from common.ipc import Topic
from respir_os.common.ipc import Topic
from .operation import OperationState
from .state import State
......@@ -64,9 +64,7 @@ class StandByState(State):
time.sleep(0.0001)
airflow_voltage_offset = self.__calculate_voltage_airflow(
airflow_offset
)
airflow_voltage_offset = self.__calculate_voltage_airflow(airflow_offset)
gauge_voltage_offset = self.__calculate_voltage_gauge(gauge_offset)
self.app.pcb.airflow_ps.func = lambda v: (
......@@ -95,9 +93,7 @@ class StandByState(State):
def __calculate_voltage_airflow(self, airflow_offset):
return (
-172.15
+ (172.15 ** 2 - 4 * (-19.269) * (-276.2 - airflow_offset))
** (0.5)
-172.15 + (172.15 ** 2 - 4 * (-19.269) * (-276.2 - airflow_offset)) ** (0.5)
) / (2 * (-19.269))
def __calculate_voltage_gauge(self, gauge_offset):
......
import logging
import os
from gui.app import GUIApplication
from gui.config import Config, DevelopmentConfig
from gui.context import ctx
from gui.ipc import ipc
from gui.views.director import vd
from respir_os.gui.app import GUIApplication
from respir_os.gui.config import Config, DevelopmentConfig
from respir_os.gui.context import ctx
from respir_os.gui.ipc import ipc
from respir_os.gui.views.director import vd
def create_app() -> GUIApplication:
......@@ -28,12 +28,17 @@ def create_app() -> GUIApplication:
logging_level = logging.INFO
else:
logging_level = logging.WARNING
logging.basicConfig(
filename="logs/gui.log", filemode="w", level=logging_level
)
logging.basicConfig(filename="logs/gui.log", filemode="w", level=logging_level)
ctx.init_app(app)
vd.init_app(app)
ipc.init_app(app)
return app
def run():
"""Run the app."""
app = create_app()
app.run()
from common.config import BaseConfig
from respir_os.common.config import BaseConfig
class Config(BaseConfig):
......
from dataclasses import dataclass
from typing import List
from common.alarms import Alarm
from respir_os.common.alarms import Alarm
@dataclass
......
import threading
from enum import Enum, auto
from common.ipc import IPCBuilder, Topic
from respir_os.common.ipc import IPCBuilder, Topic
class ZMQEvent(Enum):
......@@ -43,9 +43,7 @@ class IPCDirector:
],
)
app.ipc = self.__builder.manager
threading.Thread(
target=self.__ipc_watchdog, args=[app], daemon=True
).start()
threading.Thread(target=self.__ipc_watchdog, args=[app], daemon=True).start()
def __ipc_watchdog(self, app):
while True:
......
import gui.views as views
import respir_os.gui.views as views
class ViewDirector:
......
from typing import Dict
import PySimpleGUI as sg
from gui.component import Component
from gui.ipc import ZMQEvent
from gui.views.parameters import ParametersView
from respir_os.gui.component import Component
from respir_os.gui.ipc import ZMQEvent
from respir_os.gui.views.parameters import ParametersView
class LoadingView(Component):
......@@ -24,9 +24,7 @@ class LoadingView(Component):
self.expander_top = sg.Text()
self.expander_bottom = sg.Text()
self.layout(
[[self.expander_top], [self.loading_label], [self.expander_bottom]]
)
self.layout([[self.expander_top], [self.loading_label], [self.expander_bottom]])
def handle_event(self, event: str, values: Dict):
if event == ZMQEvent.CHECK.name:
......@@ -36,7 +34,5 @@ class LoadingView(Component):
self.expand(expand_x=True, expand_y=True)
self.expander_top.expand(expand_x=True, expand_y=True)
self.expander_bottom.expand(expand_x=True, expand_y=True)
self.loading_label.expand(
expand_x=True, expand_y=True, expand_row=False
)
self.loading_label.expand(expand_x=True, expand_y=True, expand_row=False)
super().show()
from typing import Dict
from common.ipc import Topic
from gui.component import Component
from gui.widgets.control import ControlPane
from gui.widgets.monitor import MonitorBar
from gui.widgets.plots import PlotCanvas
from respir_os.common.ipc import Topic
from respir_os.gui.component import Component
from respir_os.gui.widgets.control import ControlPane
from respir_os.gui.widgets.monitor import MonitorBar
from respir_os.gui.widgets.plots import PlotCanvas
class OperationView(Component):
......
from typing import Dict
import PySimpleGUI as sg
from common.ipc import Topic
from gui.component import Component
from gui.views.operation import OperationView
from gui.widgets.sliders import IESlider, NumericSlider
from respir_os.common.ipc import Topic
from respir_os.gui.component import Component
from respir_os.gui.views.operation import OperationView
from respir_os.gui.widgets.sliders import IESlider, NumericSlider
class ParametersView(Component):
"""Parameter selection for operation start."""
def __init__(self, app):
super().__init__(
app, pad=(373, 0), visible=False, key="ParametersView"
)
super().__init__(app, pad=(373, 0), visible=False, key="ParametersView")
# Sliders
self.ipap = NumericSlider(
......
from typing import Dict
import PySimpleGUI as sg
from gui.component import Component
from gui.ipc import ZMQEvent
from respir_os.gui.component import Component
from respir_os.gui.ipc import ZMQEvent
from .tabs import AlarmsTab, HistoryTab, ParametersTab
......
from typing import Dict
import PySimpleGUI as sg
from common.ipc import Topic
from gui.widgets.sliders import NumericSlider
from respir_os.common.ipc import Topic
from respir_os.gui.widgets.sliders import NumericSlider
from .tab import ControlTab
......
from typing import Dict
import PySimpleGUI as sg
from common.alarms import Alarm, Criticality, Type
from common.ipc import Topic
from gui.ipc import ZMQEvent
from respir_os.common.alarms import Alarm, Criticality, Type
from respir_os.common.ipc import Topic
from respir_os.gui.ipc import ZMQEvent
from .tab import ControlTab
......@@ -104,9 +104,7 @@ class HistoryTab(ControlTab):
)
)
self.layout(
[[self.silence_btn]] + [[card] for card in self.alarm_cards]
)
self.layout([[self.silence_btn]] + [[card] for card in self.alarm_cards])
def handle_event(self, event: str, values: Dict):
if event == self.silence_btn.Key:
......
from typing import Dict
import PySimpleGUI as sg
from common.ipc import Topic
from gui.widgets.sliders import IESlider, NumericSlider
from respir_os.common.ipc import Topic
from respir_os.gui.widgets.sliders import IESlider, NumericSlider
from .tab import ControlTab
......@@ -134,18 +134,14 @@ class ParametersTab(ControlTab):
if self.app.ctx.mode.upper() == "VCP":
self.app.ctx.ipap = self.ipap_vps.value = self.ipap_vcp.value
self.app.ctx.epap = self.epap_vps.value = self.epap_vcp.value
self.app.ctx.trigger = (
self.trigger_vps.value
) = self.trigger_vcp.value
self.app.ctx.trigger = self.trigger_vps.value = self.trigger_vcp.value
self.app.ctx.freq = self.freq_vcp.value
self.app.ctx.inhale = self.ie_vcp.value[0]
self.app.ctx.exhale = self.ie_vcp.value[1]
else:
self.app.ctx.ipap = self.ipap_vcp.value = self.ipap_vps.value
self.app.ctx.epap = self.epap_vcp.value = self.epap_vps.value
self.app.ctx.trigger = (
self.trigger_vcp.value
) = self.trigger_vps.value
self.app.ctx.trigger = self.trigger_vcp.value = self.trigger_vps.value
if self.app.ctx.ipap <= self.app.ctx.epap and event in {
self.ipap_vcp.slider.Key,
......
from abc import ABCMeta, abstractmethod
from gui.component import Component
from respir_os.gui.component import Component
class ControlTab(Component, metaclass=ABCMeta):
......
from typing import Dict
import PySimpleGUI as sg
from common.alarms import Alarm, Criticality, Type
from gui.component import Component
from gui.ipc import ZMQEvent
from respir_os.common.alarms import Alarm, Criticality, Type
from respir_os.gui.component import Component
from respir_os.gui.ipc import ZMQEvent
class DisplayUnit(sg.Column):
......@@ -18,9 +18,7 @@ class DisplayUnit(sg.Column):
self.__value = 0.0
self.__metric = metric
self.title_label = sg.Text(
f"{title}:", font=(font_family, font_size, "bold")
)
self.title_label = sg.Text(f"{title}:", font=(font_family, font_size, "bold"))
self.value_label = sg.Text(
f"- {self.__metric}",
font=(font_family, font_size, "bold"),
......
......@@ -4,10 +4,10 @@ from typing import Dict
import matplotlib.pyplot as plt
import PySimpleGUI as sg
from common.ipc import Topic
from gui.component import Component
from gui.ipc import ZMQEvent
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from respir_os.common.ipc import Topic
from respir_os.gui.component import Component
from respir_os.gui.ipc import ZMQEvent
FIGURE_SECONDS = 20 # seconds
SAMPLE_PERIOD = 0.02 # samples
......
from typing import Dict, Tuple, Union
import PySimpleGUI as sg
from gui.component import Component
from respir_os.gui.component import Component
class NumericSlider(Component):
......
#!/bin/bash
taskset -c 0 python -m controls & CONTROLS_PID=$!
taskset -c 1 python -m gui & GUI_PID=$!
taskset -c 2 python -m api & API_PID=$!
trap finish SIGINT
finish() {
kill -9 $CONTROLS_PID
kill -9 $GUI_PID
kill -9 $API_PID
exit
}
sleep infinity
#!/bin/bash
export ENV=development
source run
#!/bin/bash
export ENV=test
source run
import pathlib
import setuptools
HERE = pathlib.Path(__file__).parent
AUTHORS = (HERE / "AUTHORS").read_text()
README = (HERE / "README.md").read_text()
setuptools.setup(
name="respir-os",
version="1.0.0",
author=", ".join(AUTHORS.splitlines()),
license="LGPL-3",
description="""
Open design and implementation of a low-cost ventilator for COVID-19
patients.
""",
long_description=README,
long_description_content_type="text/markdown",
classifiers=[
"Programming Language :: Python :: 3.8",
"License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)",
"Intended Audience :: Healthcare Industry",
"Natural Language :: Spanish",
"Operating System :: Unix",
],
packages=["controls", "gui", "api"],
python_requires=">=3.8.6",
install_requires=[
"flask",
"flask-socketio",
"pigpio",
"pysimplegui",
"matplotlib",
"numpy",
"gevent",
"pyzmq",
],
scripts=["scripts/run", "scripts/run-dev", "scripts/run-test"],
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment