Skip to content
Snippets Groups Projects
Commit 869288b9 authored by Peter Švihra's avatar Peter Švihra
Browse files

protocol refactor with proper sequence send/receive counting

- safety checks on continuous packet mismatch
- lowered all sending timeouts to 5 ms
- better ACK/NACK propagation
parent ede4d99b
Branches
No related merge requests found
...@@ -3,9 +3,9 @@ ...@@ -3,9 +3,9 @@
#include <Arduino.h> #include <Arduino.h>
#define CONST_TIMEOUT_ALARM 5 #define COSNT_TIMEOUT_TRANSFER 5
#define CONST_TIMEOUT_DATA 10 #define CONST_PACKET_RETRIES 10
#define CONST_TIMEOUT_CMD 50 #define CONST_MISMATCH_COUNTER 20
#define PAYLOAD_MAX_SIZE_BUFFER 128 #define PAYLOAD_MAX_SIZE_BUFFER 128
...@@ -30,8 +30,8 @@ ...@@ -30,8 +30,8 @@
#define COMMS_CONTROL_SUPERVISORY 0x01 #define COMMS_CONTROL_SUPERVISORY 0x01
#define COMMS_CONTROL_TYPES 0x0F #define COMMS_CONTROL_TYPES 0x0F
#define COMMS_CONTROL_ACK 0x00 | COMMS_CONTROL_SUPERVISORY #define COMMS_CONTROL_ACK (0x00 | COMMS_CONTROL_SUPERVISORY)
#define COMMS_CONTROL_NACK 0x04 | COMMS_CONTROL_SUPERVISORY #define COMMS_CONTROL_NACK (0x04 | COMMS_CONTROL_SUPERVISORY)
#define PACKET_TYPE 0xC0 #define PACKET_TYPE 0xC0
#define PACKET_ALARM 0xC0 #define PACKET_ALARM 0xC0
......
...@@ -15,14 +15,6 @@ CommsControl::CommsControl(uint32_t baudrate) { ...@@ -15,14 +15,6 @@ CommsControl::CommsControl(uint32_t baudrate) {
memset(_comms_received, 0, sizeof(_comms_received)); memset(_comms_received, 0, sizeof(_comms_received));
memset(_comms_send , 0, sizeof(_comms_send )); memset(_comms_send , 0, sizeof(_comms_send ));
// _ring_buff_alarm = RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING>();
// _ring_buff_data = RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING>();
// _ring_buff_cmd = RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING>();
// _ring_buff_received = RingBuf<Payload, COMMS_MAX_SIZE_RB_RECEIVING>();
// _comms_tmp = CommsFormat(COMMS_MAX_SIZE_PACKET - COMMS_MIN_SIZE_PACKET );
CommsFormat::generateACK(_comms_ack); CommsFormat::generateACK(_comms_ack);
CommsFormat::generateNACK(_comms_nck); CommsFormat::generateNACK(_comms_nck);
...@@ -41,16 +33,14 @@ void CommsControl::beginSerial() { ...@@ -41,16 +33,14 @@ void CommsControl::beginSerial() {
// main function to always call and try and send data // main function to always call and try and send data
// _last_trans_time is changed when transmission occurs in sendQueue // _last_trans_time is changed when transmission occurs in sendQueue
void CommsControl::sender() { void CommsControl::sender() {
if (static_cast<uint32_t>(millis()) - _last_trans_time > CONST_TIMEOUT_ALARM) { if (static_cast<uint32_t>(millis()) - _last_trans_time > COSNT_TIMEOUT_TRANSFER) {
sendQueue(&_ring_buff_alarm); if (_packet_set) {
} resendPacket();
} else {
if (static_cast<uint32_t>(millis()) - _last_trans_time > CONST_TIMEOUT_CMD) { if (sendQueue(&_ring_buff_alarm)) { ; }
sendQueue(&_ring_buff_cmd); else if (sendQueue(&_ring_buff_cmd )) { ; }
} else if (sendQueue(&_ring_buff_data )) { ; }
}
if (static_cast<uint32_t>(millis()) - _last_trans_time > CONST_TIMEOUT_DATA) {
sendQueue(&_ring_buff_data);
} }
} }
...@@ -69,7 +59,7 @@ void CommsControl::receiver() { ...@@ -69,7 +59,7 @@ void CommsControl::receiver() {
_last_trans_index += Serial.readBytes(_last_trans + _last_trans_index, 1); _last_trans_index += Serial.readBytes(_last_trans + _last_trans_index, 1);
// if managed to read at least 1 byte // if managed to read at least 1 byte
if (_last_trans_index > 0 && _last_trans_index < COMMS_MAX_SIZE_BUFFER) { if (_last_trans_index > 0 && _last_trans_index < COMMS_MAX_SIZE_BUFFER - 1) {
current_trans_index = _last_trans_index - 1; current_trans_index = _last_trans_index - 1;
// find the boundary of frames // find the boundary of frames
...@@ -82,7 +72,7 @@ void CommsControl::receiver() { ...@@ -82,7 +72,7 @@ void CommsControl::receiver() {
// if managed to decode and compare CRC // if managed to decode and compare CRC
if (decoder(_last_trans, _start_trans_index, _last_trans_index)) { if (decoder(_last_trans, _start_trans_index, _last_trans_index)) {
_sequence_receive = (*(_comms_tmp.getControl()) >> 1 ) & 0x7F; uint8_t sequence_received = _comms_tmp.getSequenceReceive();
// to decide ACK/NACK/other; for other gain sequenceReceive // to decide ACK/NACK/other; for other gain sequenceReceive
uint8_t control = *(_comms_tmp.getControl() + 1); uint8_t control = *(_comms_tmp.getControl() + 1);
uint8_t address = *_comms_tmp.getAddress(); uint8_t address = *_comms_tmp.getAddress();
...@@ -94,27 +84,31 @@ void CommsControl::receiver() { ...@@ -94,27 +84,31 @@ void CommsControl::receiver() {
switch(control & COMMS_CONTROL_TYPES) { switch(control & COMMS_CONTROL_TYPES) {
case COMMS_CONTROL_NACK: case COMMS_CONTROL_NACK:
// received NACK // received NACK
// TODO: modify timeout for next sent frame?
// resendPacket(&address);
break; break;
case COMMS_CONTROL_ACK: case COMMS_CONTROL_ACK:
// received ACK // received ACK
finishPacket(type); finishPacket(sequence_received);
break; break;
default: default:
uint8_t sequence_receive = (control >> 1 ) & 0x7F;
sequence_receive += 1;
// received INFORMATION // received INFORMATION
if (receivePacket(type)) { uint8_t sequence = _comms_tmp.getSequenceSend();
_comms_ack.setAddress(&address); CommsFormat * response = &_comms_ack;
_comms_ack.setSequenceReceive(sequence_receive);
sendPacket(_comms_ack); // check counters
if (_sequence_receive != sequence) {
trackMismatch(sequence);
response = &_comms_nck;
} else { } else {
_comms_nck.setAddress(&address); resetReceiver(sequence + 1);
_comms_nck.setSequenceReceive(sequence_receive);
sendPacket(_comms_nck);
} }
// check proper unpacking
if(!receivePacket(type)) {
response = &_comms_nck;
}
response->setAddress(&address);
response->setSequenceReceive(_sequence_receive);
sendPacket(*response);
break; break;
} }
} }
...@@ -128,7 +122,7 @@ void CommsControl::receiver() { ...@@ -128,7 +122,7 @@ void CommsControl::receiver() {
break; break;
} }
} }
} else if (_last_trans_index >= COMMS_MAX_SIZE_BUFFER) { } else if (_last_trans_index >= COMMS_MAX_SIZE_BUFFER - 1) {
_last_trans_index = 0; _last_trans_index = 0;
} }
} }
...@@ -215,32 +209,55 @@ bool CommsControl::decoder(uint8_t* data, uint8_t data_start, uint8_t data_stop) ...@@ -215,32 +209,55 @@ bool CommsControl::decoder(uint8_t* data, uint8_t data_start, uint8_t data_stop)
} }
// sending anything of commsDATA format // sending anything of commsDATA format
void CommsControl::sendQueue(RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> *queue) { bool CommsControl::sendQueue(RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> *queue) {
// if have data to send // if have data to send
if (!queue->isEmpty()) { if (!queue->isEmpty()) {
queue->operator [](0).setSequenceSend(_sequence_send); _packet_set = queue->pop(_packet);
sendPacket(queue->operator [](0)); if (_packet_set) {
_packet.setSequenceSend(_sequence_send);
// reset sending counter sendPacket(_packet);
_last_trans_time = static_cast<uint32_t>(millis()); }
} }
return _packet_set;
} }
void CommsControl::sendPacket(CommsFormat &packet) { void CommsControl::sendPacket(CommsFormat &packet) {
// reset sending counter
_last_trans_time = static_cast<uint32_t>(millis());
// if encoded and able to write data // if encoded and able to write data
if (encoder(packet.getData(), packet.getSize()) ) { if (encoder(packet.getData(), packet.getSize()) ) {
if (Serial.availableForWrite() >= _comms_send_size) { if (Serial.availableForWrite() >= _comms_send_size) {
Serial.write(_comms_send, _comms_send_size); Serial.write(_comms_send, _comms_send_size);
} }
} }
} }
// resending the packet, can lower the timeout since either NACK or wrong FCS already checked void CommsControl::resendPacket() {
//WIP if ((++_packet_retries) < CONST_PACKET_RETRIES) {
void CommsControl::resendPacket(RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> *queue) { sendPacket(_packet);
; } else {
resetPacket();
}
}
void CommsControl::trackMismatch(uint8_t sequence_receive) {
if (_mismatch_counter++ > CONST_MISMATCH_COUNTER) {
resetReceiver(sequence_receive);
}
}
void CommsControl::resetReceiver(uint8_t sequence_receive) {
_mismatch_counter = 0;
if (sequence_receive != 0xFF) {
_sequence_receive = (sequence_receive & 0x7F);
}
} }
void CommsControl::resetPacket() {
_packet_set = false;
_packet_retries = 0;
}
// receiving anything of commsFormat // receiving anything of commsFormat
bool CommsControl::receivePacket(PRIORITY &type) { bool CommsControl::receivePacket(PRIORITY &type) {
...@@ -258,25 +275,19 @@ bool CommsControl::receivePacket(PRIORITY &type) { ...@@ -258,25 +275,19 @@ bool CommsControl::receivePacket(PRIORITY &type) {
} }
// if FCS is ok, remove from queue // if FCS is ok, remove from queue
void CommsControl::finishPacket(PRIORITY &type) { void CommsControl::finishPacket(uint8_t &sequence_received) {
RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> *queue = getQueue(type); // get the sequence send from first entry in the queue, add one as that should be return
// 0x7F to deal with possible overflows (0 should follow after 127)
if (queue != nullptr && !queue->isEmpty()) { uint8_t sequence = ((_sequence_send + 1) & 0x7F);
// get the sequence send from first entry in the queue, add one as that should be return if (sequence == sequence_received) {
// 0x7F to deal with possible overflows (0 should follow after 127) _sequence_send = sequence;
if (((queue->operator [](0).getSequenceSend() + 1) & 0x7F) == _sequence_receive) { resetPacket();
_sequence_send = (_sequence_send + 1) % 128;
CommsFormat comms_rm;
if (queue->pop(comms_rm)) {
;
}
}
} }
} }
PRIORITY CommsControl::getInfoType(uint8_t &address) { PRIORITY CommsControl::getInfoType(uint8_t &address) {
// return enum element corresponding to the address // return enum element corresponding to the address
return (PRIORITY)(address & PACKET_TYPE); return static_cast<PRIORITY>(address & PACKET_TYPE);
} }
// get link to queue according to packet format // get link to queue according to packet format
......
...@@ -29,10 +29,13 @@ private: ...@@ -29,10 +29,13 @@ private:
RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> *getQueue(PRIORITY &type); RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> *getQueue(PRIORITY &type);
PRIORITY getInfoType(uint8_t &address); PRIORITY getInfoType(uint8_t &address);
void sendQueue (RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> *queue); bool sendQueue(RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> *queue);
void resendPacket (RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> *queue); void resendPacket ();
void resetPacket ();
void trackMismatch(uint8_t sequence_receive);
void resetReceiver(uint8_t sequence_receive = 0xFF);
bool receivePacket(PRIORITY &type); bool receivePacket(PRIORITY &type);
void finishPacket (PRIORITY &type); void finishPacket (uint8_t &sequence_received);
bool encoder(uint8_t* payload, uint8_t data_size); bool encoder(uint8_t* payload, uint8_t data_size);
bool decoder(uint8_t* payload, uint8_t dataStart, uint8_t data_stop); bool decoder(uint8_t* payload, uint8_t dataStart, uint8_t data_stop);
...@@ -46,6 +49,10 @@ private: ...@@ -46,6 +49,10 @@ private:
CommsFormat _comms_ack; CommsFormat _comms_ack;
CommsFormat _comms_nck; CommsFormat _comms_nck;
uint8_t _mismatch_counter = 0;
CommsFormat _packet;
uint8_t _packet_retries = 0;
bool _packet_set = false;
RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> _ring_buff_alarm; RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> _ring_buff_alarm;
RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> _ring_buff_data; RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> _ring_buff_data;
RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> _ring_buff_cmd; RingBuf<CommsFormat, COMMS_MAX_SIZE_RB_SENDING> _ring_buff_cmd;
......
...@@ -143,6 +143,7 @@ class CommsPacket(object): ...@@ -143,6 +143,7 @@ class CommsPacket(object):
def __init__(self, rawdata): def __init__(self, rawdata):
self._data = rawdata self._data = rawdata
self._sequence_receive = 0 self._sequence_receive = 0
self._sequence_receive_ack = 0
self._address = None self._address = None
self._byteArray = None self._byteArray = None
self._datavalid = False self._datavalid = False
...@@ -155,6 +156,10 @@ class CommsPacket(object): ...@@ -155,6 +156,10 @@ class CommsPacket(object):
@property @property
def sequence_receive(self): def sequence_receive(self):
return self._sequence_receive return self._sequence_receive
@property
def sequence_send(self):
return self._sequence_send
@property @property
def address(self): def address(self):
...@@ -186,7 +191,7 @@ class CommsPacket(object): ...@@ -186,7 +191,7 @@ class CommsPacket(object):
tmp_comms.copyBytes(byteArray) tmp_comms.copyBytes(byteArray)
if tmp_comms.compareCrc(): if tmp_comms.compareCrc():
control = tmp_comms.getData()[tmp_comms.getControl()+1] control = tmp_comms.getData()[tmp_comms.getControl()+1]
self._sequence_receive = (tmp_comms.getData()[tmp_comms.getControl()] >> 1) & 0x7F self._sequence_receive = tmp_comms.getSequenceReceive()
self._address = tmp_comms.getData()[1] self._address = tmp_comms.getData()[1]
# get type of packet # get type of packet
...@@ -199,7 +204,7 @@ class CommsPacket(object): ...@@ -199,7 +204,7 @@ class CommsPacket(object):
self._acked = True self._acked = True
else: else:
# received data # received data
self._sequence_receive = ((control >> 1) & 0x7F) + 1 self._sequence_send = tmp_comms.getSequenceSend()
self._byteArray = tmp_comms.getData()[tmp_comms.getInformation():tmp_comms.getFcs()] self._byteArray = tmp_comms.getData()[tmp_comms.getInformation():tmp_comms.getFcs()]
else: else:
raise CommsChecksumError raise CommsChecksumError
......
...@@ -17,6 +17,9 @@ from CommsFormat import CommsPacket, CommsACK, CommsNACK, CommsChecksumError, ge ...@@ -17,6 +17,9 @@ from CommsFormat import CommsPacket, CommsACK, CommsNACK, CommsChecksumError, ge
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger().setLevel(logging.INFO) logging.getLogger().setLevel(logging.INFO)
class SequenceReceiveMismatch(Exception):
pass
class CommsLLI: class CommsLLI:
def __init__(self, loop, throttle=1, file='', number=10000): def __init__(self, loop, throttle=1, file='', number=10000):
super().__init__() super().__init__()
...@@ -46,7 +49,7 @@ class CommsLLI: ...@@ -46,7 +49,7 @@ class CommsLLI:
self._dv_data = asyncio.Event(loop=self._loop) self._dv_data = asyncio.Event(loop=self._loop)
# maps between address and queues/events/timeouts # maps between address and queues/events/timeouts
self._queues = {0xC0: self._alarms, 0x80: self._commands, 0x40: self._data} self._queues = {0xC0: self._alarms, 0x80: self._commands, 0x40: self._data}
self._timeouts = {0xC0: 10, 0x80: 50, 0x40: 200} self._timeouts = {0xC0: 5, 0x80: 5, 0x40: 5}
self._acklist = {0xC0: self._dv_alarms, 0x80: self._dv_commands, 0x40: self._dv_data} self._acklist = {0xC0: self._dv_alarms, 0x80: self._dv_commands, 0x40: self._dv_data}
# receive # receive
...@@ -58,7 +61,7 @@ class CommsLLI: ...@@ -58,7 +61,7 @@ class CommsLLI:
# packet counting # packet counting
self._sequence_send = 0 self._sequence_send = 0
self._sequence_receive = 0 self.resetReceiver(0)
async def main(self, device, baudrate): async def main(self, device, baudrate):
try: try:
...@@ -79,11 +82,12 @@ class CommsLLI: ...@@ -79,11 +82,12 @@ class CommsLLI:
logging.debug("Waiting for Command") logging.debug("Waiting for Command")
packet = await queue.get() packet = await queue.get()
packet.setSequenceSend(self._sequence_send) packet.setSequenceSend(self._sequence_send)
for send_attempt in range(5): for send_attempt in range(10):
# try to send the packet 5 times # try to send the packet 10 times
try: try:
await self.sendPacket(packet) await self.sendPacket(packet)
await asyncio.wait_for(self._acklist[address].wait(), timeout=self._timeouts[address] / 1000) await asyncio.wait_for(self._acklist[address].wait(), timeout=self._timeouts[address] / 1000)
self._acklist[address].clear()
except asyncio.TimeoutError: except asyncio.TimeoutError:
pass pass
except Exception: except Exception:
...@@ -140,9 +144,11 @@ class CommsLLI: ...@@ -140,9 +144,11 @@ class CommsLLI:
async def sendPacket(self, packet): async def sendPacket(self, packet):
if isinstance(packet, CommsACK): if isinstance(packet, CommsACK):
# don't log acks # don't log acks
logging.debug(f"Sending ACK: {binascii.hexlify(packet.encode())}")
pass pass
elif isinstance(packet, CommsNACK): elif isinstance(packet, CommsNACK):
logging.warning(f"Sending NACK: {binascii.hexlify(packet.encode())}") # logging.debug(f"Sending NACK: {binascii.hexlify(packet.encode())}")
pass
else: else:
logging.info(f"Sending {binascii.hexlify(packet.encode())}") logging.info(f"Sending {binascii.hexlify(packet.encode())}")
self._writer.write(packet.encode()) self._writer.write(packet.encode())
...@@ -157,20 +163,33 @@ class CommsLLI: ...@@ -157,20 +163,33 @@ class CommsLLI:
break break
return CommsPacket(bytearray(rawdata)) return CommsPacket(bytearray(rawdata))
def finishPacket(self, address): def finishPacket(self, address, sequence_received):
self._sequence_send = (self._sequence_send + 1) % 128
try: try:
self._queues[address].task_done() sequence = (self._sequence_send + 1) % 128
if sequence == sequence_received:
self._queues[address].task_done()
self._sequence_send = sequence
except ValueError: except ValueError:
# task has already been purged from queue # task has already been purged from queue
pass pass
else: else:
self._acklist[address].set() self._acklist[address].set()
def resetReceiver(self, sequence_receive = -1):
self._mismatch_counter = 0
if sequence_receive >= 0:
self._sequence_receive = sequence_receive
def trackMismatch(self, sequence_receive):
if self._mismatch_counter > 20 :
self.resetReceiver(sequence_receive)
logging.warning(f"Received more than 20 sequence_receive mismatches, resetting")
else:
self._mismatch_counter += 1
async def recv(self): async def recv(self):
while self._connected: while self._connected:
packet = await self.readPacket() packet = await self.readPacket()
try: try:
data = packet.decode() data = packet.decode()
except CommsChecksumError: except CommsChecksumError:
...@@ -183,28 +202,46 @@ class CommsLLI: ...@@ -183,28 +202,46 @@ class CommsLLI:
if packet.acked: if packet.acked:
logging.info("Received ACK") logging.info("Received ACK")
# increase packet counter # increase packet counter
self.finishPacket(packet.address) self.finishPacket(packet.address, packet.sequence_receive)
else: else:
logging.debug("Received NACK") logging.info("Received NACK")
else: else:
# packet should contain valid data # packet should contain valid data
try: try:
payload = PayloadFormat.fromByteArray(packet.byteArray) payload = PayloadFormat.fromByteArray(packet.byteArray)
self.sequence_receive = packet.sequence_send
self.resetReceiver()
self.payloadrecv = payload self.payloadrecv = payload
logging.debug(f"Received payload type {payload.getType()} for timestamp {payload.timestamp}") # logging.debug(f"Received payload type {payload.getType()} for timestamp {payload.timestamp}")
comms_response = CommsACK(packet.address) comms_response = CommsACK(packet.address)
except (StructError, ValueError): except (StructError, ValueError):
# invalid payload, but valid checksum - this is bad! # invalid payload, but valid checksum - this is bad!
logging.error(f"Invalid payload: {payload}") logging.error(f"Invalid payload: {payload}")
# restart/reflash/swap to redundant microcontroller? # restart/reflash/swap to redundant microcontroller?
comms_response = CommsNACK(packet.address) comms_response = CommsNACK(packet.address)
except SequenceReceiveMismatch:
# logging.debug(f"Mismatch sequence receive, expected: {self.sequence_receive}; received: {packet.sequence_send}")
comms_response = CommsNACK(packet.address)
self.trackMismatch(packet.sequence_send)
except HEVVersionError as e: except HEVVersionError as e:
logging.critical(f"HEVVersionError: {e}") logging.critical(f"HEVVersionError: {e}")
exit(1) exit(1)
finally: finally:
comms_response.setSequenceReceive(packet.sequence_receive) comms_response.setSequenceReceive(self.sequence_receive)
await self.sendPacket(comms_response) await self.sendPacket(comms_response)
@property
def sequence_receive(self):
return self._sequence_receive
@sequence_receive.setter
def sequence_receive(self, sequence):
if self._sequence_receive == sequence:
self._sequence_receive = (self._sequence_receive + 1) % 128
else:
raise SequenceReceiveMismatch
# callback to dependants to read the received payload # callback to dependants to read the received payload
@property @property
def payloadrecv(self): def payloadrecv(self):
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment