One of the better ways is using COBS (Consistent Overhead Byte Stuffing) to packetise data.
See the Cheshire & Baker paper linked at end of the wiki article:
https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
Here is a micropython asyncio example of a COBS encoder / decoder for uart binary data packets.
It is not exactly like the Cheshire & Baker paper: it adds 0x00 delimiter bytes at both ends of each frame and includes a 16-bit CRC-CCITT over the packet data.
This will simply echo back any valid packet received.
Code: Select all
"""
This code will only run in MicroPython and it requires the uasyncio module.
This is a MicroPython implementation of Consistent Overhead Byte Stuffing,
an efficient method of creating binary data packets for transfer over a serial
channel. For details do an internet search for this openly available paper:
Cheshire & Baker, Consistent Overhead Byte Stuffing filetype:pdf
IEEE/ACM TRANSACTIONS ON NETWORKING, VOL.7, NO. 2, APRIL 1999
The UartCobs class provides methods for binary data packet transfer over UART with CRC protection
The code below is a simple example which just echoes back any valid packet received.
Import to run.
"""
import uasyncio as asyncio
from machine import UART
import os, pyb
from asyn import Event
class UartCobs():
    """COBS-framed, CRC-protected binary packet transfer over a UART.

    Frames on the wire have the form ``0x00, <COBS encoded data + CRC>, 0x00``.
    The 16-bit CRC is appended to the message (high byte first) before COBS
    encoding, so the CRC computed over a correctly received message including
    its CRC bytes is zero.

    Attributes:
        uart  -- the machine.UART instance used for transfer
        rxmsg -- statically allocated buffer that receive() decodes into
    """

    def __init__(self, uartn:int, baud:int, max_tx_len, max_rx_len):
        """Open the UART and pre-allocate the transmit and receive buffers.

        Note the UART receive ring buffer (read_buf_len) should be
        appropriate for size and rate of incoming COBS frames.

        Keyword arguments:
        uartn      -- UART number passed to machine.UART
        baud       -- baud rate
        max_tx_len -- largest message (excluding CRC) that will be sent
        max_rx_len -- largest message (excluding CRC) that will be received
        """
        # Allow space for two byte CRC and COBS formatting bytes
        self._max_tx_len = max_tx_len + round(max_tx_len/5) + 7
        # Allow space for two byte CRC and the COBS phantom byte
        self._max_rx_msg_len = max_rx_len + 3
        # Received frame will have CRC and COBS formatting bytes
        self._max_frame_rx_len = max_rx_len + round(max_rx_len/5) + 5
        self.uart = UART(uartn)
        # You may need to increase uart rx ringbuffer depending on demand for
        # processor time by other tasks, default is 64 bytes
        self.uart.init(baudrate = baud, bits=8, parity=None, stop=1, timeout=0, flow=0, timeout_char=0, read_buf_len=128)
        self._swriter = asyncio.StreamWriter(self.uart, {})
        self._txfrm = bytearray(self._max_tx_len)
        self.rxmsg = bytearray(self._max_rx_msg_len)

    async def send(self, txmsg:bytearray, msg_len):
        """Append the CRC to txmsg, COBS encode it and write the frame to the UART.

        Keyword arguments:
        txmsg   -- message buffer; it is modified in place and must have
                   space for the two byte CRC to be appended after msg_len
        msg_len -- number of message bytes in txmsg (excluding CRC)
        """
        crc = self._crc16_ccitt(0, txmsg, msg_len)
        # CRC is stored high byte first so the receiver's CRC over
        # message + CRC comes out as zero.
        txmsg[msg_len] = crc >> 8
        txmsg[msg_len + 1] = crc & 0xFF
        frame_len = self._encode(txmsg, msg_len + 2)
        await self._swriter.awrite(self._txfrm, 0, frame_len)

    async def _getbyte(self):
        """Return the next received byte as a length-1 bytes object."""
        # Using await streamreader here was very slow, so using
        # this method which will hog processor until the uart ringbuffer is emptied.
        # Shouldn't be an issue since uart comms is relatively slow.
        while self.uart.any() == 0:
            # Be careful not to overflow uart ringbuffer (2ms at 115Kbaud is
            # about 23 bytes) but other tasks may keep processor for longer
            await asyncio.sleep_ms(2)
        return self.uart.read(1)

    async def receive(self, event_rx):
        """State machine task for the COBS receiver.

        Decodes incoming frames into self.rxmsg. When a frame with a good CRC
        has been decoded, event_rx is set with the message length (excluding
        the two CRC bytes) as its value. Frames that fail the CRC check, are
        too short to contain a CRC, or are too long for self.rxmsg are
        silently dropped and the receiver resynchronises on the next
        0x00 delimiter.

        Keyword arguments:
        event_rx -- event object providing set(value), signalled per valid packet
        """
        rxpkt = self.rxmsg
        capacity = len(rxpkt)
        while True:
            # Wait for 0x00 frame start
            while True:
                c = await self._getbyte()
                if c[0] == 0:
                    break
            # Wait for non-zero byte (the first code byte of a frame)
            while True:
                c = await self._getbyte()
                if c[0] != 0:
                    break
            byte = c[0]
            cnt = 0  # count of bytes decoded from the frame and output to the packet
            overflow = False
            while True:
                # Start a new COBS block because of new frame or
                # we just finished a block in the current frame
                code = byte
                i = 1
                while i < code:
                    byte = (await self._getbyte())[0]
                    i += 1
                    if cnt >= capacity:
                        # Frame is longer than the receive buffer: drop it
                        # instead of raising IndexError and killing the task.
                        overflow = True
                        break
                    rxpkt[cnt] = byte
                    cnt += 1
                    if byte == 0:
                        # Delimiter inside a block: frame ended early
                        break
                if overflow:
                    break
                if byte == 0:
                    # Finish this frame
                    if cnt > 0:
                        cnt -= 1  # remove the phantom trailing 0 that cobs produces
                    # Need at least the two CRC bytes; otherwise an empty
                    # frame would pass the CRC test with a negative length.
                    if cnt >= 2 and self._crc16_ccitt(0, rxpkt, cnt) == 0:
                        event_rx.set(cnt - 2)
                    break
                # Finish this block: every block except a full (0xFF) one
                # stands for its data followed by an implicit zero byte.
                if code < 0xFF:
                    if cnt >= capacity:
                        break  # oversized frame, drop it and resynchronise
                    rxpkt[cnt] = 0
                    cnt += 1
                byte = (await self._getbyte())[0]

    def _encode(self, msg, len):
        """COBS Encoder

        Process input message bytes to create a COBS frame in the
        pre-allocated transmit buffer self._txfrm.
        Output frame will have 0x00 in first and last byte and no other zero bytes.

        Keyword arguments:
        msg -- message in bytearray (the caller has already appended the CRC)
        len -- num bytes in msg (specify length so statically allocated buffer can be used)

        Returns the frame length in bytes, including both 0x00 delimiters.
        The frame length is at most len + len // 254 + 3.
        """
        frame = self._txfrm
        assert isinstance(msg, bytearray)
        frame[0] = 0      # leading frame delimiter
        code_idx = 1      # where the code byte of the current block goes
        n = 2             # next free position in the frame
        code = 1          # block length so far (code byte included)
        for i in range(len):
            b = msg[i]
            if b == 0:
                # A zero ends the block; it is implied by the code byte
                frame[code_idx] = code  # FinishBlock
                code = 1
                code_idx = n
                n += 1
            else:
                frame[n] = b
                n += 1
                code += 1
                if code == 0xFF:
                    # Block is full (254 data bytes), start a new one
                    frame[code_idx] = code  # FinishBlock
                    code = 1
                    code_idx = n
                    n += 1
        frame[code_idx] = code  # FinishBlock
        frame[n] = 0            # trailing frame delimiter
        return n + 1

    @micropython.viper
    def _crc16_ccitt(self, crc:int, data:ptr8, n:int)->int:
        """Update the 16-bit CRC CCITT (poly 1021) for a bytearray or bytes object
        Takes circa 8ms for 10000 bytes (pyboard v1.1 168MHz)
        Keyword arguments:
        crc -- the CRC to be updated (0 for new calculation)
        data -- byte-wide data to be CRC'd
        n -- number of data bytes to process
        """
        x=0
        for i in range(n):
            x = ((crc >> 8) ^ data[i]) & 0xff
            x = x ^ (x >> 4)
            crc = (crc << 8) ^ (x << 12) ^ (x << 5) ^ x
            crc = crc & 0xffff
        return crc

    # Plain Python version of the same CRC, kept for reference in case the
    # viper code emitter is not available:
    #
    # def _crc16_ccitt(self, crc:int, data:bytes, n:int)->int:
    #     x:int=0
    #     for i in range(n):
    #         x = ((crc >> 8) ^ data[i]) & 0xff
    #         x = x ^ (x >> 4)
    #         crc = (crc << 8) ^ (x << 12) ^ (x << 5) ^ x
    #         crc = crc & 0xffff
    #     return crc
async def blink(objLED, event):
    """Task that flashes an LED once each time `event` is set.

    The value carried by the event is used as the on-time in milliseconds.
    The LED starts off and is switched off again after every flash.

    Keyword arguments:
    objLED -- LED object providing on() and off()
    event  -- awaitable event object providing value() and clear()
    """
    objLED.off()
    while True:
        await event
        on_time_ms = event.value()
        # Clear before sleeping so a set() during the flash is not lost
        event.clear()
        objLED.on()
        await asyncio.sleep_ms(on_time_ms)
        objLED.off()
async def process_msg(rx_event, blink_event, cc):
    """Task that echoes every valid received packet back to the sender.

    Keyword arguments:
    rx_event    -- event set by the receiver; its value is the message length
    blink_event -- event used to flash the LED when a valid frame arrives
    cc          -- UartCobs instance providing rxmsg and send()
    """
    # Staging buffer sized from the receiver's own buffer, which is large
    # enough for any message plus the two CRC bytes that send() appends.
    txmsg = bytearray(len(cc.rxmsg))
    while True:
        await rx_event
        msg_len = rx_event.value()
        if msg_len < 0:
            # Not a usable packet length; passing it on would make send()
            # write the CRC at negative indices of txmsg.
            rx_event.clear()
            continue
        blink_event.set(10)  # blink LED when valid COBS frame received
        # Echo back the very same message
        # Copy rxmsg into txmsg (index loop avoids allocating a temporary)
        for i in range(msg_len):
            txmsg[i] = cc.rxmsg[i]
        rx_event.clear()  # rxpkt buffer available for use
        await cc.send(txmsg, msg_len)
# Largest message (excluding CRC) expected in each direction
max_rx_pkt_len = 500
max_tx_pkt_len = 500
# Pass the lengths by keyword: the constructor takes max_tx_len before
# max_rx_len, so positional (rx, tx) arguments would be swapped.
cc = UartCobs(1, 115200, max_tx_len=max_tx_pkt_len, max_rx_len=max_rx_pkt_len)
loop = asyncio.get_event_loop()
# LED task: flashes LED 1 whenever event_blink is set
event_blink = Event()
loop.create_task(blink(pyb.LED(1), event_blink))
# Receiver task sets event_rx_cobs_pkt for each valid packet; the
# processing task echoes the packet back.
event_rx_cobs_pkt = Event()
loop.create_task(cc.receive(event_rx_cobs_pkt))
loop.create_task(process_msg(event_rx_cobs_pkt, event_blink, cc))
loop.run_forever()