Skip to content
Snippets Groups Projects
Commit fac137f3 authored by Dónal Murray's avatar Dónal Murray
Browse files

Make queues into asyncio queues and set timeouts for sending - fully working with CommsDebug

parent 7488b6be
Branches
Tags
No related merge requests found
......@@ -123,9 +123,9 @@ class PAYLOAD_TYPE(IntEnum):
@dataclass
class PayloadFormat():
# class variables excluded from init args and output dict
_RPI_VERSION: ClassVar[int] = field(default=0xA2, init=False, repr=False)
_dataStruct: ClassVar[Any] = field(default=Struct("<BIB"), init=False, repr=False)
_byteArray: bytearray = field(default=None, init=False, repr=False)
_RPI_VERSION: ClassVar[int] = field(default=0xA2, init=False, repr=False)
_dataStruct: ClassVar[Any] = field(default=Struct("<BIB"), init=False, repr=False)
_byteArray: ClassVar[bytearray] = field(default=None, init=False, repr=False)
# Meta information
version: int = 0
......@@ -168,10 +168,10 @@ class PayloadFormat():
return self._RPI_VERSION == self.version
def getSize(self) -> int:
return len(self._byteArray)
return len(self.byteArray)
def getType(self) -> Any:
return self.payload_type.name if isinstance(self.payload_type, IntEnum) else self.payload_type
return self.payload_type
def getDict(self) -> Dict:
return {k: v.name if isinstance(v, IntEnum) or isinstance(v, Enum) else v for k, v in asdict(self).items()}
......@@ -184,6 +184,7 @@ class PayloadFormat():
class DataFormat(PayloadFormat):
# subclass dataformat
_dataStruct = Struct("<BIBBHHHHHHHHHHHfff")
payload_type: PAYLOAD_TYPE = PAYLOAD_TYPE.DATA
# subclass member variables
fsm_state: BL_STATES = BL_STATES.IDLE
......@@ -237,6 +238,7 @@ class DataFormat(PayloadFormat):
@dataclass
class ReadbackFormat(PayloadFormat):
_dataStruct = Struct("<BIBHHHHHHHHHHHBBBBBBBBBBBBBBf")
payload_type: PAYLOAD_TYPE = PAYLOAD_TYPE.READBACK
duration_calibration: int = 0
duration_buff_purge: int = 0
......@@ -311,6 +313,7 @@ class ReadbackFormat(PayloadFormat):
class CycleFormat(PayloadFormat):
# subclass dataformat
_dataStruct = Struct("<BIBfffffffffHHHHBHHB")
payload_type: PAYLOAD_TYPE = PAYLOAD_TYPE.CYCLE
respiratory_rate: float = 0.0
tidal_volume: float = 0.0
......@@ -369,6 +372,7 @@ class CycleFormat(PayloadFormat):
@dataclass
class CommandFormat(PayloadFormat):
_dataStruct = Struct("<BIBBBI")
payload_type: PAYLOAD_TYPE = PAYLOAD_TYPE.CMD
cmd_type: int = 0
cmd_code: int = 0
......@@ -390,6 +394,7 @@ class CommandFormat(PayloadFormat):
@dataclass
class AlarmFormat(PayloadFormat):
_dataStruct = Struct("<BIBBBI")
payload_type: PAYLOAD_TYPE = PAYLOAD_TYPE.ALARM
alarm_type: int = 0
alarm_code: ALARM_CODES = ALARM_CODES.UNKNOWN
......
#!/usr/bin/env python3
from CommsControl import CommsControl
from CommsLLI import CommsLLI
from CommsCommon import *
import asyncio
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
import sys
import time
comms = CommsControl(port = sys.argv[1])
class Dependant(object):
def __init__(self, lli):
......@@ -17,24 +16,42 @@ class Dependant(object):
def update_llipacket(self, payload):
logging.info(f"payload received: {payload}")
#logging.info(f"payload received: {payload.fsm_state}")
if payload.getType() == 1:
logging.info(f"Fsm state: {payload.fsm_state}")
#logging.info(f"payload received: {payload.timestamp}")
#logging.info(f"payload received: {payload.readback_valve_o2_in} {payload.readback_valve_inhale} {payload.readback_valve_exhale} {payload.readback_valve_purge} {payload.fsm_state}")
self._llipacket = payload.getDict() # returns a dict
# pop from queue - protects against Dependant going down and not receiving packets
self._lli.pop_payloadrecv()
dep = Dependant(comms)
# initialise as start command, automatically executes toByteArray()
cmd = CommandFormat(cmd_type=CMD_TYPE.GENERAL.value, cmd_code=CMD_GENERAL.START.value, param=0)
time.sleep(4)
comms.writePayload(cmd)
print('sent cmd start')
while True:
time.sleep(20)
cmd.cmd_code = CMD_GENERAL.STOP.value # automatically executes toByteArray()
async def commsDebug():
cmd = CommandFormat(cmd_type=CMD_TYPE.GENERAL.value, cmd_code=CMD_GENERAL.START.value, param=0)
await asyncio.sleep(4)
comms.writePayload(cmd)
print('sent cmd stop')
pass
print('sent cmd start')
while True:
await asyncio.sleep(20)
cmd.cmd_code = CMD_GENERAL.STOP.value # automatically executes toByteArray()
comms.writePayload(cmd)
print('sent cmd stop')
try:
# setup serial device and init server
loop = asyncio.get_event_loop()
comms = CommsLLI(loop)
dep = Dependant(comms)
# create tasks
lli = comms.main(sys.argv[1], 115200)
debug = commsDebug()
tasks = [lli, debug]
# run tasks
asyncio.gather(*tasks, return_exceptions=True)
loop.run_forever()
except asyncio.CancelledError:
pass
except KeyboardInterrupt:
logging.info("Closing LLI")
finally:
loop.close()
......@@ -21,17 +21,20 @@ def commsFromBytes(byteArray):
def generateAlarm(payload):
comms = CommsFormat(info_size = payload.getSize(), address = 0xC0)
comms.setInformation(payload.byteArray)
bytelist = bytes([byte for byte in bytearray(payload.byteArray)])
comms.setInformation(bytelist)
return comms
def generateCmd(payload):
comms = CommsFormat(info_size = payload.getSize(), address = 0x80)
comms.setInformation(payload.byteArray)
bytelist = bytes([byte for byte in bytearray(payload.byteArray)])
comms.setInformation(bytelist)
return comms
def generateData(payload):
comms = CommsFormat(info_size = payload.getSize(), address = 0x40)
comms.setInformation(payload.byteArray)
bytelist = bytes([byte for byte in bytearray(payload.byteArray)])
comms.setInformation(bytelist)
return comms
class CommsChecksumError(Exception):
......
......@@ -12,10 +12,10 @@ from CommsFormat import CommsPacket, CommsACK, CommsNACK, CommsChecksumError, ge
# Set up logging
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger().setLevel(logging.INFO)
class CommsLLI:
def __init__(self, loop, queue):
def __init__(self, loop):
super().__init__()
# IO
self._loop = loop
......@@ -25,13 +25,18 @@ class CommsLLI:
# send
self._queue_size = 16
self._alarms = deque(maxlen = self._queue_size)
self._commands = deque(maxlen = self._queue_size)
self._data = deque(maxlen = self._queue_size)
# map of {timeout: queue}
self._queues = {10: self._alarms, 50: self._commands, 200: self._data}
self._datavalid = asyncio.Event()
self._timeLastTransmission = int(round(time.time() * 1000)) # in seconds
self._alarms = asyncio.Queue(maxsize=self._queue_size, loop=self._loop)
self._commands = asyncio.Queue(maxsize=self._queue_size, loop=self._loop)
self._data = asyncio.Queue(maxsize=self._queue_size, loop=self._loop)
# acks from arduino
self._dvAlarms = asyncio.Event(loop=self._loop)
self._dvCommands = asyncio.Event(loop=self._loop)
self._dvData = asyncio.Event(loop=self._loop)
# maps between address and queues/events/timeouts
self._queues = {0xC0: self._alarms, 0x80: self._commands, 0x40: self._data}
self._timeouts = {0xC0: 10, 0x80: 50, 0x40: 200}
self._acklist = {0xC0: self._dvAlarms, 0x80: self._dvCommands, 0x40: self._dvData}
# receive
#self._payloadrecv = asyncio.Queue(maxsize=self._queue_size, loop=self._loop)
......@@ -46,52 +51,70 @@ class CommsLLI:
self._reader, self._writer = await serial_asyncio.open_serial_connection(url=device, baudrate=baudrate, timeout = 2, dsrdtr = True)
self._connected = True
while self._connected:
sender = self.send()
#sendAlarm = self.send(0xC0)
sendCmd = self.send(0x80)
#sendData = self.send(0x40)
receiver = self.recv()
await asyncio.gather(*[receiver, sender], return_exceptions=True)
await asyncio.gather(*[receiver, sendCmd], return_exceptions=True)
async def send(self):
async def send(self, address):
queue = self._queues[address]
while self._connected:
await self._datavalid.wait()
for timeout, queue in self._queues:
if len(queue) > 0:
logging.debug(f'Queue length: {len(queue)}')
current_time = int(round(time.time() * 1000))
if current_time > (self._timeLastTransmission + timeout):
self._timeLastTransmission = current_time
queue[0].setSequenceSend(self._sequence_send)
self.sendPacket(queue[0])
if len([packet for queue in self._queues.values() for packet in queue]) == 0:
self._datavalid.clear()
logging.debug("Waiting for Command")
packet = await queue.get()
packet.setSequenceSend(self._sequence_send)
for send_attempt in range(5):
# try to send the packet 5 times
try:
await self.sendPacket(packet)
await asyncio.wait_for(self._acklist[address].wait(), timeout=self._timeouts[address] / 1000)
except asyncio.TimeoutError:
pass
except Exception:
# catch everything else and propagate it
raise
else:
# acknowledged
break
def writePayload(self, payload):
PAYLOAD_TYPE_TO_GEN = {
1: generateData,
2: generateData,
3: generateData,
4: generateData,
5: generateCmd,
6: generateAlarm,
}
generatePacket = PAYLOAD_TYPE_TO_GEN[payload.getType()]
tmp_comms = generatePacket(payload)
PAYLOAD_TYPE_TO_QUEUE = {
1: self._queues[2],
2: self._queues[2],
3: self._queues[2],
4: self._queues[2],
5: self._queues[1],
6: self._queues[0],
}
queue = PAYLOAD_TYPE_TO_QUEUE[payload.getType()]
queue.append(tmp_comms)
self._datavalid.set()
return True
try:
# TODO replace these two dicts with address mappings like everywhere else
PAYLOAD_TYPE_TO_GEN = {
1: generateData,
2: generateData,
3: generateData,
4: generateData,
5: generateCmd,
6: generateAlarm,
}
generatePacket = PAYLOAD_TYPE_TO_GEN[payload.getType()]
tmp_comms = generatePacket(payload)
qlist = [v for v in self._queues.values()]
PAYLOAD_TYPE_TO_QUEUE = {
1: qlist[2],
2: qlist[2],
3: qlist[2],
4: qlist[2],
5: qlist[1],
6: qlist[0],
}
queue = PAYLOAD_TYPE_TO_QUEUE[payload.getType()]
queue.put_nowait(tmp_comms)
return True
except KeyError:
return False
async def sendPacket(self, packet):
logging.debug(f"Sending {binascii.hexlify(packet.encode())}")
if isinstance(packet, CommsACK):
# don't log acks
pass
elif isinstance(packet, CommsNACK):
logging.warning(f"Sending NACK: {binascii.hexlify(packet.encode())}")
else:
logging.info(f"Sending {binascii.hexlify(packet.encode())}")
self._writer.write(packet.encode())
async def readPacket(self):
......@@ -105,17 +128,9 @@ class CommsLLI:
return CommsPacket(bytearray(rawdata))
def finishPacket(self, address):
ADDRESS_TO_QUEUE = {
0x40: self._queues[2],
0x80: self._queues[1],
0xC0: self._queues[0],
}
queue = ADDRESS_TO_QUEUE[address]
if len(queue) > 0:
# 0x7F to deal with possible overflows (0 should follow after 127)
if ((queue[0].getSequenceSend() + 1) & 0x7F) == self._sequence_receive:
self._sequence_send = (self._sequence_send + 1) % 128
queue.popleft()
self._sequence_send = (self._sequence_send + 1) % 128
self._queues[address].task_done()
self._acklist[address].set()
async def recv(self):
while self._connected:
......@@ -131,8 +146,8 @@ class CommsLLI:
if data is None:
# packet is an ack/nack from previously sent data
if packet.acked:
logging.debug("Received ACK")
#pop packet from sending queue
logging.info("Received ACK")
# increase packet counter
self.finishPacket(packet.address)
else:
logging.debug("Received NACK")
......@@ -141,7 +156,7 @@ class CommsLLI:
try:
payload = PayloadFormat.fromByteArray(packet.byteArray)
self.payloadrecv = payload
logging.info(f"Received payload type {payload.getType()} for timestamp {payload.timestamp}")
logging.debug(f"Received payload type {payload.getType()} for timestamp {payload.timestamp}")
comms_response = CommsACK(packet.address)
except (StructError, ValueError):
# invalid payload, but valid checksum - this is bad!
......@@ -159,36 +174,21 @@ class CommsLLI:
@payloadrecv.setter
def payloadrecv(self, payload):
self._payloadrecv.append(payload)
logging.debug(f"Pushed {payload} to FIFO")
for callback in self._observers:
# peek at the leftmost item, don't pop until receipt confirmed
callback(self._payloadrecv[0])
callback(payload)
def bind_to(self, callback):
self._observers.append(callback)
def pop_payloadrecv(self):
# from callback. confirmed receipt, pop value
poppedval = self._payloadrecv.popleft()
logging.debug(f"Popped {poppedval} from FIFO")
if len(self._payloadrecv) > 0:
# purge full queue if Dependant goes down when it comes back up
for callback in self._observers:
callback(self._payloadrecv[0])
if __name__ == "__main__":
try:
# setup sender and receiver
read_queue = asyncio.Queue()
write_queue = asyncio.Queue()
# schedule async tasks
loop = asyncio.get_event_loop()
# setup serial devices
comms = CommsLLI(loop, read_queue)
comms = CommsLLI(loop)
asyncio.gather(comms.main("/dev/ttyUSB0", 115200), return_exceptions=True)
loop.run_forever()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment