Skip to content
Snippets Groups Projects
Unverified Commit 1d21b835 authored by svihra's avatar svihra Committed by GitHub
Browse files

Merge pull request #2 from svihra/master

Python protocol
parents 30a90bfd 4c19894c
Branches
No related merge requests found
......@@ -3,7 +3,7 @@
#include <Arduino.h>
#define CONST_MAX_SIZE_QUEUE 1
#define CONST_MAX_SIZE_QUEUE 16
#define CONST_MAX_SIZE_PACKET 64
#define CONST_MAX_SIZE_BUFFER 128
#define CONST_MIN_SIZE_PACKET 7
......
#!/usr/bin/env python3
# Communication protocol between rasp and arduino based on HDLC format
# author Peter Svihra <peter.svihra@cern.ch>
import serial
from serial.tools import list_ports
import time
import commsFormat
from collections import deque
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
import asyncio
import serial_asyncio
# communication class that governs talking between devices
class commsControl(asyncio.Protocol):
def __init__(self, queueSize = 16):
# takes care of all serial port initializations
super().__init__()
self.transport_ = None
# queues are FIFO ring-buffers of the defined size
self.alarms_ = deque(maxlen = queueSize)
self.commands_ = deque(maxlen = queueSize)
self.data_ = deque(maxlen = queueSize)
# needed to find packet frames
self.received_ = []
self.foundStart_ = False
self.timeLastTransmission_ = int(round(time.time() * 1000))
# virtual function from asyncio.Protocol
def connection_made(self, transport):
self.transport_ = transport
logging.info('port opened')
self.transport_.serial.rts = False # no idea what this is, copy-pasted
# virtual function from asyncio.Protocol, reads from serial port, "random" number of bytes
def data_received(self, data):
self.receiver(data)
# virtual function from asyncio.Protocol
def connection_lost(self, exc):
logging.info('port closed')
self.transport_.loop.stop()
# virtual function from asyncio.Protocol
def pause_writing(self):
logging.info('pause writing')
logging.debug(self.transport_.get_write_buffer_size())
# virtual function from asyncio.Protocol
def resume_writing(self):
logging.debug(self.transport_.get_write_buffer_size())
logging.info('resume writing')
# have yet to figure out how to call this automatically
def sender(self):
self.checkQueue(self.alarms_ , 10)
self.checkQueue(self.commands_, 50)
self.checkQueue(self.data_ , 100)
def checkQueue(queue, timeout):
if len(queue) > 0:
if int(round(time.time() * 1000)) > (self.timeLastTransmission_ + timeout):
self.send(queue[0])
def receiver(self, data):
for byte in data:
byte = bytes([byte])
# TODO: this could be written in more pythonic way
# force read byte by byte
self.received_.append(byte)
logging.debug(byte)
if not self.foundStart_ and byte == bytes([0x7E]):
self.foundStart_ = True
self.receivedStart_ = len(self.received_)
elif byte == bytes([0x7E]) :
decoded = self.decoder(self.received_, self.receivedStart_)
if decoded is not None:
logging.debug("Preparing ACK")
self.send(commsFormat.commsACK(address = decoded[1]))
else:
logging.debug("Preparing NACK")
self.send(commsFormat.commsNACK(address = decoded[1]))
self.received_.clear()
self.foundStart_ = False
self.receivedStart_ = -1
def registerData(self, value):
tmpData = commsFormat.commsDATA()
tmpData.setInformation(value)
self.data_.append(tmpData)
def send(self, comms):
logging.debug("Sending data...")
self.transport_.write(self.encoder(comms.getData()))
# escape any 0x7D or 0x7E with 0x7D and swap bit 5
def escapeByte(self, byte):
if byte == 0x7D or byte == 0x7E:
return [0x7D, byte ^ (1<<5)]
else:
return [byte]
# encoding data according to the protocol - escape any 0x7D or 0x7E with 0x7D and swap 5 bit
def encoder(self, data):
try:
stream = [escaped for byte in data[1:-1] for escaped in self.escapeByte(byte)]
result = bytearray([data[0]] + stream + [data[-1]])
return result
except:
return None
# decoding data according to the defined protocol - escape any 0x7D or 0x7E with 0x7D and swap 5 bit
def decoder(self, data, start):
try:
packets = data[start:-1]
indRemove = [idx for idx in range(len(packets)) if packets[idx] == bytes([0x7D])]
indChange = [idx+1 for idx in indRemove]
stream = [packets[idx][0] ^ (1<<5) if idx in indChange else packets[idx][0] for idx in range(len(packets)) if idx not in indRemove]
result = bytearray([data[start - 1][0]] + stream + [data[-1][0]])
return result
except:
return None
if __name__ == "__main__" :
# get port number for arduino, mostly for debugging
for port in list_ports.comports():
try:
if "ARDUINO" in port.manufacturer.upper():
port = port.device
except:
pass
loop = asyncio.get_event_loop()
connection = serial_asyncio.create_serial_connection(loop, commsControl, port, baudrate=115200)
loop.run_until_complete(connection)
loop.run_forever()
loop.close()
\ No newline at end of file
#!/usr/bin/env python3
# Communication protocol based on HDLC format
# author Peter Svihra <peter.svihra@cern.ch>
import libscrc
# basic format based on HDLC
class commsFormat:
def __init__(self, infoSize = 0, address = 0x00, control = [0x00, 0x00]):
self.data_ = bytearray(7 + infoSize)
self.infoSize_ = infoSize
self.crc_ = None
self.assignBytes(self.getStart() , bytes([0x7E]))
self.assignBytes(self.getAddress(), bytes([address]))
self.assignBytes(self.getControl(), bytes(control))
self.assignBytes(self.getStop() , bytes([0x7E]))
def getStart(self):
return 0
def getAddress(self):
return 1
def getControl(self):
return 2
def getInformation(self):
return 4
def getFcs(self):
return 4 + self.infoSize_
def getStop(self):
return 4 + self.infoSize_ + 2
def assignBytes(self, start, values, isCrc = False):
for idx in range(len(values)):
self.data_[start + idx] = values[idx]
if not isCrc:
self.generateCrc()
# generate checksum
def generateCrc(self, assign = True):
self.crc_ = libscrc.x25(bytes(self.data_[self.getAddress():self.getFcs()])).to_bytes(2, byteorder='little')
if assign:
self.assignBytes(self.getFcs(), self.crc_, isCrc = True)
def compareCrc(self):
self.generateCrc(False)
return (self.crc_ == self.fcs_)
def setInformation(self, value, size = 2):
# convert provided value
self.assignBytes(self.getInformation(), value.to_bytes(size, byteorder='little'))
def getData(self):
return self.data_
def setData(self, data):
self.infoSize_ = len(data)
self.data_ = data.to_bytes(self.infoSize_, byteorder='little')
# DATA specific formating
class commsDATA(commsFormat):
def __init__(self):
super().__init__(infoSize = 8, address = 0x40)
# CMD specific formating
class commsCMD(commsFormat):
def __init__(self):
super().__init__(infoSize = 8, address = 0x80)
# ALARM specific formating
class commsALARM(commsFormat):
def __init__(self):
super().__init__(infoSize = 4, address = 0xC0)
# ACK specific formating
class commsACK(commsFormat):
def __init__(self, address):
super().__init__(control = [0x00, 0x01], address = address)
# NACK specific formating
class commsNACK(commsFormat):
def __init__(self, address):
super().__init__(control = [0x00, 0x05], address = address)
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment