Skip to content
Snippets Groups Projects
Commit e433ca56 authored by Chris Burr's avatar Chris Burr
Browse files

Add more tests

parent 73bdb051
Branches
No related merge requests found
......@@ -228,10 +228,14 @@ if __name__ == "__main__":
#parser to allow us to pass arguments to hevserver
parser = argparse.ArgumentParser(description='Arguments to run hevserver')
parser.add_argument('--inputFile', type=str, default = '', help='a test file to load data')
parser.add_argument('-d', '--debug', action='store_true', help='Show debug output')
parser.add_argument('-d', '--debug', action='count', default=0, help='Show debug output')
parser.add_argument('--use-test-data', action='store_true', help='Use test data source')
args = parser.parse_args()
if args.debug:
if args.debug == 0:
logging.getLogger().setLevel(logging.WARNING)
elif args.debug == 1:
logging.getLogger().setLevel(logging.INFO)
else:
logging.getLogger().setLevel(logging.DEBUG)
if args.use_test_data:
......
......@@ -21,29 +21,71 @@ class HEVTestData:
# received queue and observers to be notified on update
self._payloadrecv = deque(maxlen = 16)
self._observers = []
self.current_timestamp = int(time.time() * 1000)
sendingWorker = threading.Thread(target=self.generate, daemon=True)
sendingWorker.start()
@property
def current_timestamp(self):
return self._current_timestamp
@current_timestamp.setter
def current_timestamp(self, timestamp):
self._current_timestamp = timestamp
def generate(self) -> None:
current_timestamp = int(time.time() * 1000) % 2**30
logging.info("Running tests with unstable step intervals for 10 seconds")
for i in range(100):
current_timestamp += random.randint(-20, 100)
self._send_message(current_timestamp)
time.sleep(0.1)
logging.info("Running tests with large jumps forward in time")
logging.critical("Running tests with unstable step intervals for 10 seconds")
self.current_timestamp = 0
self._send_message(self.current_timestamp)
time.sleep(2)
while self.current_timestamp < 5_000 or self.current_timestamp > 2**30:
interval = random.randint(-20, 100)
self._send_message(self.current_timestamp + interval)
if interval > 0:
self.current_timestamp += interval
time.sleep(interval / 1_000)
logging.critical("Running tests with large jumps forward in time")
time.sleep(2)
for interval in [1_000, 10_000, 100_000, 1_000_000]:
current_timestamp += interval
self._send_message(current_timestamp)
self.current_timestamp += interval
self._send_message(self.current_timestamp)
time.sleep(2)
logging.info("Looping forever at a high rate")
logging.critical("Running tests for overflow with stable steps for 10 seconds")
time.sleep(2)
self.current_timestamp = 2**32 - 5_000
self._send_message(self.current_timestamp)
interval = 50
for i in range(0, 10_000, interval):
self.current_timestamp += interval
self._send_message(self.current_timestamp)
time.sleep(interval / 1_000)
logging.critical("Rerunning tests with large jumps forward in time")
time.sleep(2)
for interval in [1_000, 10_000, 100_000, 1_000_000]:
self.current_timestamp += interval
self._send_message(self.current_timestamp)
time.sleep(2)
logging.critical("Running tests for overflow with unstable steps for 10 seconds")
time.sleep(2)
self.current_timestamp = 2**32 - 5_000
self._send_message(self.current_timestamp)
while self.current_timestamp < 2**32 + 5_000:
interval = random.randint(-100, 100)
self._send_message(self.current_timestamp + interval)
if interval > 0:
self.current_timestamp += interval
time.sleep(interval / 1_000)
logging.critical("Looping forever at a high rate")
time.sleep(2)
current_tick = 0
while True:
interval = 1
self._send_message(current_timestamp + current_tick * interval)
self._send_message(self.current_timestamp + current_tick * interval)
time.sleep(interval / 1000)
current_tick += 1
......@@ -51,23 +93,23 @@ class HEVTestData:
# directly setting private member variables in this edge case
payload = CommsCommon.DataFormat()
payload._version = payload._RPI_VERSION
payload._timestamp = timestamp
payload._timestamp = timestamp % 2**32
payload._fsm_state = "IDLE"
payload._pressure_air_supply = abs(math.sin((payload._timestamp/1000) * (math.pi)))* 0
payload._pressure_air_regulated = abs(math.sin((payload._timestamp/1000) * (math.pi))) * 0
payload._pressure_o2_supply = abs(math.sin((0.5 + payload._timestamp/1000) * (math.pi))) * 657
payload._pressure_o2_regulated = abs(math.sin((1.0 + payload._timestamp/1000) * (math.pi))) * 653
payload._pressure_buffer = abs(math.sin((1.5 + payload._timestamp/1000) * (math.pi))) * 496
payload._pressure_inhale = abs(math.sin((2.0 + payload._timestamp/1000) * (math.pi))) * 481
payload._pressure_patient = abs(math.sin((2.5 + payload._timestamp/1000) * (math.pi))) * 772
payload._temperature_buffer = math.sin((3.0 + payload._timestamp/1000) * (math.pi)) * 1000
payload._pressure_diff_patient = abs(math.sin((3.5 + payload._timestamp/1000) * (math.pi))) * 61
payload._readback_valve_air_in = abs(math.sin((payload._timestamp/1000) * (math.pi))) * 0
payload._readback_valve_o2_in = abs(math.sin((payload._timestamp/1000) * (math.pi))) * 0
payload._readback_valve_inhale = abs(math.sin((payload._timestamp/1000) * (math.pi))) * 0
payload._readback_valve_exhale = abs(math.sin((payload._timestamp/1000) * (math.pi))) * 0
payload._readback_valve_purge = abs(math.sin((payload._timestamp/1000) * (math.pi))) * 0
payload._readback_mode = abs(math.sin((payload._timestamp/1000) * (math.pi))) * 0
payload._pressure_air_supply = abs(math.sin((timestamp/1000) * (math.pi)))* 0
payload._pressure_air_regulated = abs(math.sin((timestamp/1000) * (math.pi))) * 0
payload._pressure_o2_supply = abs(math.sin((0.5 + timestamp/1000) * (math.pi))) * 657
payload._pressure_o2_regulated = abs(math.sin((1.0 + timestamp/1000) * (math.pi))) * 653
payload._pressure_buffer = abs(math.sin((1.5 + timestamp/1000) * (math.pi))) * 496
payload._pressure_inhale = abs(math.sin((2.0 + timestamp/1000) * (math.pi))) * 481
payload._pressure_patient = abs(math.sin((2.5 + timestamp/1000) * (math.pi))) * 772
payload._temperature_buffer = math.sin((3.0 + timestamp/1000) * (math.pi)) * 1000
payload._pressure_diff_patient = abs(math.sin((3.5 + timestamp/1000) * (math.pi))) * 61
payload._readback_valve_air_in = abs(math.sin((timestamp/1000) * (math.pi))) * 0
payload._readback_valve_o2_in = abs(math.sin((timestamp/1000) * (math.pi))) * 0
payload._readback_valve_inhale = abs(math.sin((timestamp/1000) * (math.pi))) * 0
payload._readback_valve_exhale = abs(math.sin((timestamp/1000) * (math.pi))) * 0
payload._readback_valve_purge = abs(math.sin((timestamp/1000) * (math.pi))) * 0
payload._readback_mode = abs(math.sin((timestamp/1000) * (math.pi))) * 0
self.payloadrecv = payload
# callback to dependants to read the received payload
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment