Serial binary data packets using COBS and uasyncio

General discussions and questions about development of code with MicroPython that is not hardware specific.
Target audience: MicroPython Users.
Planet9
Posts: 7
Joined: Tue Nov 12, 2019 7:10 pm

Serial binary data packets using COBS and uasyncio

Post by Planet9 » Thu Feb 20, 2020 5:54 pm

Sending binary data over a UART serial link can be a bit of a chore.
One of the better ways is using COBS (Consistent Overhead Byte Stuffing) to packetise data.
See the Cheshire & Baker paper linked at end of the wiki article:
https://en.wikipedia.org/wiki/Consisten ... e_Stuffing
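
For readers new to COBS, here is a minimal sketch of the byte-stuffing step on its own, written for CPython. It is an illustration only, not the code posted in this thread: the names cobs_encode and cobs_decode are hypothetical, and no frame delimiters or CRC are added here.

```python
def cobs_encode(data: bytes) -> bytes:
    """Return data with every 0x00 removed by COBS stuffing (no delimiters added)."""
    out = bytearray([0])      # placeholder for the first code byte
    code_idx = 0              # index of the code byte of the current block
    code = 1                  # block length so far, counting the code byte itself
    for b in data:
        if b == 0:
            out[code_idx] = code      # close the block; the code stands in for the zero
            code_idx = len(out)
            out.append(0)             # placeholder for the next code byte
            code = 1
        else:
            out.append(b)
            code += 1
            if code == 0xFF:          # block is full (254 data bytes)
                out[code_idx] = code
                code_idx = len(out)
                out.append(0)
                code = 1
    out[code_idx] = code              # close the final block
    return bytes(out)


def cobs_decode(frame: bytes) -> bytes:
    """Inverse of cobs_encode for a frame that contains no 0x00 bytes."""
    out = bytearray()
    i = 0
    while i < len(frame):
        code = frame[i]
        out += frame[i + 1:i + code]  # copy the data bytes of this block
        i += code
        if code < 0xFF:
            out.append(0)             # restore the zero that ended the block
    return bytes(out[:-1])            # drop the phantom trailing zero


msg = bytes([0x11, 0x22, 0x00, 0x33])
enc = cobs_encode(msg)
print(enc.hex(" "))                   # 03 11 22 02 33
print(cobs_decode(enc) == msg)        # True
```

Each code byte gives the distance to the next code byte, so the single zero in the message costs nothing extra: it is simply replaced by the 0x02 that starts the second block.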

Here is a MicroPython uasyncio example of a COBS encoder/decoder for UART binary data packets.
It differs from the Cheshire & Baker paper in that it adds 0x00 delimiter bytes around each frame and includes a CRC-CCITT over the packet data.
The example simply echoes back any valid packet received.

Code: Select all

"""
This code will only run in MicroPython and it requires the uasyncio module.

This is a MicroPython implementation of Consistent Overhead Byte Stuffing,
an efficient method of creating binary data packets for transfer over a serial
channel. For details do an internet search for this openly available paper:
Cheshire & Baker, Consistent Overhead Byte Stuffing filetype:pdf
IEEE/ACM TRANSACTIONS ON NETWORKING, VOL.7, NO. 2, APRIL 1999

The UartCobs class provides methods for binary data packet transfer over UART with CRC protection.
The code below is a simple example which just echoes back any valid packet received.
Import to run.
"""

import uasyncio as asyncio
from machine import UART
import os, pyb
from asyn import Event


class UartCobs():

    def __init__(self, uartn:int, baud:int, max_tx_len, max_rx_len):
        """Note the UART receive ring buffer (read_buf_len) should be
        appropriate for size and rate of incoming COBS frames
        """
        self._max_tx_len = max_tx_len + round(max_tx_len/5) + 7 # Allow space for two byte CRC and COBS formatting bytes
        self._max_rx_msg_len = max_rx_len + 3 # Allow space for two byte CRC and the COBS phantom byte
        self._max_frame_rx_len = max_rx_len + round(max_rx_len/5) + 5 # Received frame will have CRC and COBS formatting bytes
        
        self.uart = UART(uartn)
        #You may need to increase uart rx ringbuffer depending on demand for processor time by other tasks, default is 64 bytes
        self.uart.init(baudrate = baud, bits=8, parity=None, stop=1, timeout=0, flow=0, timeout_char=0, read_buf_len=128)
        self._swriter = asyncio.StreamWriter(self.uart, {})
        self._txfrm = bytearray(self._max_tx_len)
        self.rxmsg = bytearray(self._max_rx_msg_len)


    async def send(self, txmsg:bytearray, msg_len):
        """Task for encoding and sending COBS frames over the UART
        txmsg must have space for the two byte CRC to be appended
        """
        crc = self._crc16_ccitt(0,txmsg,msg_len)        
        txmsg[msg_len] = crc >> 8
        txmsg[msg_len+1] = crc % 256
        msg_len += 2
        frame_len = self._encode(txmsg, msg_len)
        await self._swriter.awrite(self._txfrm, 0, frame_len)


    async def _getbyte(self):
        #Using await streamreader here was very slow, so using
        #this method which will hog processor until the uart ringbuffer is emptied.
        #Shouldn't be an issue since uart comms is relatively slow.
        while self.uart.any() == 0:
            await asyncio.sleep_ms(2) #Be careful not to overflow uart ringbuffer (2ms at 115Kbaud is about 23 bytes) but other tasks may keep processor for longer
        return self.uart.read(1)


    async def receive(self, event_rx):
        """State machine task for COBS receiver"""
        rxpkt = self.rxmsg
        while True:

            while True:   #Wait for 0x00 frame start
                c = await self._getbyte()
                if c[0] == 0:
                    break
            while True:   #Wait for non-zero byte (the first code byte of a frame)
                c = await self._getbyte()
                if c[0] != 0:
                    break

            cnt=0 #count of bytes decoded from the frame and output to the packet
            while True:
                #Start a new COBS block because of new frame or
                #we just finished a block in the current frame
                code = c[0]
                i=1
                while i < code:
                    c = await self._getbyte()
                    i += 1
                    rxpkt[cnt] = c[0]
                    cnt += 1
                    if c[0] == 0:
                        break

                if c[0] == 0:
                    #Finish this frame                    
                    if cnt > 0:
                        cnt -= 1 #remove the phantom trailing 0 that cobs produces
                    #Need at least the two CRC bytes; CRC over data + CRC is 0 for a good frame
                    if cnt >= 2 and self._crc16_ccitt(0,rxpkt,cnt) == 0:
                        event_rx.set(cnt-2)
                    break

                #Finish this block
                if code < 0xff:
                    rxpkt[cnt]=0
                    cnt += 1

                c = await self._getbyte()

        
    def _encode(self, msg, len):
        """COBS Encoder
        Process input message bytes to create a COBS frame with CRC protection
        Output frame will have 0x00 in first and last byte and no other zero bytes
        
        Keyword arguments:
        msg -- message in bytearray. Must have space for this encoder to append 2-byte CRC.
        len -- num bytes in msg (specify length so statically allocated buffer can be used)
        frame -- pre-allocated bytearray long enough to hold the output frame
                frame length will not exceed 5 + int(len(msg)*1.1)
                output will have COBS encoded message with 0x00 frame delimiters
        """
        frame = self._txfrm       
        assert type(msg) == bytearray
        assert type(frame) == bytearray
        frame[0]=0
        n = 2
        code = 1
        code_idx = 1
        for i in range(len):
            b=msg[i]
            if b == 0:
                frame[code_idx] = code  #FinishBlock
                code = 1
                code_idx = n
                n += 1
            else:
                frame[n] = b
                n += 1
                code += 1
                if code == 0xFF:
                    frame[code_idx] = code  #FinishBlock
                    code = 1
                    code_idx = n
                    n += 1
        frame[code_idx] = code  #FinishBlock
        frame[n] = 0
        return n+1


    @micropython.viper
    def _crc16_ccitt(self, crc:int, data:ptr8, n:int)->int:
        """Update the 16-bit CRC CCITT (poly 1021) for a bytearray or bytes object
        Takes circa 8ms for 10000 bytes (pyboard v1.1 168MHz)
        Keyword arguments:
        crc -- the CRC to be updated (0 for new calculation)
        data -- byte-wide data to be CRC'd
        n -- number of data bytes to process
        """
        x=0
        for i in range(n):
            x = ((crc >> 8) ^ data[i]) & 0xff
            x = x ^ (x >> 4)
            crc = (crc << 8) ^ (x << 12) ^ (x << 5) ^ x
            crc = crc & 0xffff
        return crc

    """
    def _crc16_ccitt(self, crc:int, data:bytes, n:int)->int:
        x:int=0
        #for i in range(len(data)):
        for i in range(n):
            x = ((crc >> 8) ^ data[i]) & 0xff
            x = x ^ (x >> 4)
            crc = (crc << 8) ^ (x << 12) ^ (x << 5) ^ x
            crc = crc & 0xffff
        return crc
    """





async def blink(objLED, event):
    delay=10
    objLED.off()
    while True:
        await event
        delay = event.value()
        event.clear()
        objLED.on()
        await asyncio.sleep_ms(delay)
        objLED.off()
        

async def process_msg(rx_event, blink_event, cc):
    txmsg = bytearray(1000)
    while True:
        await rx_event       
        blink_event.set(10)  # blink LED when valid COBS frame received
        
        #Echo back the very same message
        #Copy rxmsg into txmsg
        msg_len = rx_event.value()
        #print("msg_len=",msg_len)
        for i in range(msg_len):
            txmsg[i]=cc.rxmsg[i]
        rx_event.clear() #rxpkt buffer available for use
        await cc.send(txmsg,msg_len)






max_rx_pkt_len = 500
max_tx_pkt_len = 500
cc = UartCobs(1, 115200, max_tx_pkt_len, max_rx_pkt_len)
    
loop = asyncio.get_event_loop()

event_blink = Event()
loop.create_task(blink(pyb.LED(1), event_blink))
    
event_rx_cobs_pkt = Event()
loop.create_task(cc.receive(event_rx_cobs_pkt))

loop.create_task(process_msg(event_rx_cobs_pkt, event_blink, cc))

loop.run_forever()


Planet9
Posts: 7
Joined: Tue Nov 12, 2019 7:10 pm

Re: Serial binary data packets using COBS and uasyncio

Post by Planet9 » Thu Feb 20, 2020 5:57 pm

The following is a rudimentary COBS encoder/decoder for CPython.
Set up the serial port appropriately to communicate with a pyboard. This code will send packets and look for an identical echo, as produced by the MicroPython code above.

Code: Select all

import serial,os
from time import sleep
from random import randint
from datetime import datetime,timedelta


def ms_since(start_time):
    '''
    Returns the elapsed milliseconds since start_time parameter
    Usage:
             start_time = datetime.now()
             ms_since(start_time)
    '''
    dt = datetime.now() - start_time
    ms = (dt.days * 24 * 60 * 60 + dt.seconds) * 1000 + dt.microseconds / 1000.0
    return ms


def crc16_ccitt_bytes(crc:int, data:bytes):
    x:int=0
    for i in range(len(data)):
        x = ((crc >> 8) ^ data[i]) & 0xff
        x = x ^ (x >> 4)
        crc = (crc << 8) ^ (x << 12) ^ (x << 5) ^ x
        crc = crc & 0xffff
    return crc


def cob_encode(msg):
    '''
    COBS encoder/stuffer
    Input bytes are encoded such that there are no zero bytes in the output
    '''
    if type(msg) != bytes and type(msg) != bytearray:
        raise TypeError('Need bytes or bytearray input')
    code = i = 1
    code_idx = 0
    frame = bytearray( 5 + int(len(msg)*1.1) )
    for b in msg:
        if b == 0:
            frame[code_idx] = code  #FinishBlock
            code = 1
            code_idx = i
            i += 1
        else:
            frame[i] = b
            i += 1
            code += 1
            if code == 0xFF:
                frame[code_idx] = code  #FinishBlock
                code = 1
                code_idx = i
                i += 1
    frame[code_idx] = code  #FinishBlock
    return frame[0:i]



def cob_decode(frame):
    '''
    COBS decoder/unstuffer
    '''
    if type(frame) != bytes and type(frame) != bytearray:
        raise TypeError('Need bytes or bytearray input')
    msg = bytearray()
    i=0
    while i < len(frame):
        code = frame[i]
        i += 1
        for j in range(1,code):
            msg.append(frame[i])
            i += 1
        if code < 0xFF:
            msg.append(0)
    return msg[0:-1]




def frame_is_valid(frm):
    '''
    The last two bytes of a decoded frame hold the CRC (high byte first),
    as appended by send_msg. A valid frame therefore has at least 3 bytes,
    and the CRC computed over data plus CRC bytes is zero.
    '''
    if len(frm) < 3:
        return False
    # CRC over message + appended CRC leaves a zero remainder
    return crc16_ccitt_bytes(0, frm) == 0




def test_cobs_python_methods(iterations, verbose=False):
    '''
    Loopback test through COBS encoder and decoder
    '''
    for n in range(iterations):
        msg = bytes(os.urandom(randint(5,1000)))
        data_enc = cob_encode(msg)
        data_dec = cob_decode(data_enc)
        if verbose:
            print("Test ", n, ":")
            print("msg:", msg.hex())
            print("enc:", data_enc.hex())
            print("dec:", data_dec.hex())
        if data_dec != msg:
            print("FAIL")
            print(msg.hex())
            print("enc:", data_enc.hex())
            print("dec:", data_dec.hex())
            return
    print("Done")



'''
Methods which access the serial port
'''


def get_frame(serialport, timeout_ms):
    '''
    Build bytearray of uart characters until 0x00 received.
    If timeout occurs any characters received so far are returned.
    '''
    start_time = datetime.now()
    frame = bytearray()
    while ms_since(start_time) < timeout_ms:
        sleep(0.010)
        n = serialport.inWaiting()
        for i in range(n):
            c = serialport.read(1)
            if len(c) != 0:
                if c[0] == 0:
                    if len(frame) > 0:
                        timeout = False
                        return [frame, timeout]
                else:
                    frame.append(c[0])
    timeout = True
    return [frame, timeout]



def send_msg(serialport, msg, verbose=False):
    '''
    Append the CRC to the message, encode and send it!
    '''
    m = bytearray()
    for b in msg:
        m.append(b)

    csum = crc16_ccitt_bytes(0,msg)
    length = len(msg)

    m.append(csum >> 8)
    m.append(csum & 0x0FF) #low byte last for crc ccitt
    
    menc = cob_encode(m)
    if verbose:
        print("len:", length, "bytes, csum:", hex(csum))
        print("msg:", msg.hex())
        print("menc:", menc.hex())
    serialport.write(b'\x00')  # Frame delim character
    serialport.write(menc) # No 0x00 bytes in here
    serialport.write(b'\x00')  # Frame delim character
    #sleep(0.1)



def get_msg(serialport, timeout_ms, verbose=False):
    '''
    Get a frame, verify the CRC and return the message without its CRC bytes
    '''
    [frame, timeout] = get_frame(serialport, timeout_ms)
    # Shortest useful frame: code byte + one data byte + two CRC bytes
    if len(frame) < 4 or timeout:
        return bytearray()
    fd = cob_decode(frame)
    # CRC over message + appended CRC must be zero
    if len(fd) < 3 or crc16_ccitt_bytes(0, fd) != 0:
        if verbose:
            print("FAIL Invalid Frame Received")
            print("rcv: ",frame.hex())
        return bytearray()
    return fd[0:-2]





def test_cobs_loopback(serialport):
    '''
    Loopback test through microcontroller which should be programmed
    to echo messages
    '''
    bytecnt = 0
    start_time = datetime.now()
    for n in range(50):
        print("Test",n)
        #msg = bytearray.fromhex('8501bebd000b131ac4163d93721a5b3176')
        msg = os.urandom(randint(100,400))  # stay within the 500 byte buffers of the echo example
        print(len(msg))
        #msg = os.urandom(10)
        bytecnt += len(msg)
        send_msg(serialport,msg)
        [rcv, timeout] = get_frame(serialport, 1000)
        if len(rcv) == 0:
            print ("No response")
            return
        if len(rcv) < 3 or timeout:
            print ("Bad response length = ",len(rcv))
            return

        resp = rcv
        #print("Rx Msg:")
        #print (resp.hex().upper())
        #print("Rx Msg Decoded:")
        data_dec = cob_decode(resp)
        #print (data_dec.hex().upper())
        if frame_is_valid(data_dec) == False:
            print("FAIL")
            print("msg: ",msg.hex())
            print("rcv: ",rcv.hex())
            print("rsp: ",data_dec.hex())
            return
    print("Success: ", bytecnt, "bytes echoed in", int( 10*ms_since(start_time)/1000 )/10.0, "secs")




'''
-----------------   Allow execution as a program or as a module  -----------------------
'''

if __name__ == '__main__':
    print("COBS Loopback Test")
    ser = serial.Serial(port='COM12', baudrate=115200, bytesize=8, parity='N', stopbits=1,timeout=1, xonxoff=False, rtscts=0, dsrdtr=False)
    total=0
    bad=0
    for i in range(100):
        msg = os.urandom(randint(100,400))
        send_msg(ser, msg, verbose=False)
        [rcv, timeout] = get_frame(ser, 5000)
        total += 1  # count every attempt so that Good = total - bad
        if len(rcv) < 5 or timeout:
            print ("Msg Sent= ", msg.hex().upper())
            print ("Bad Resp= ", rcv.hex().upper())
            bad += 1
            continue

        resp = rcv
        #print ("RxFrm: ", resp.hex().upper())
        data_dec = cob_decode(resp)
        #print ("RxMsg: ", data_dec.hex().upper())

        crc = crc16_ccitt_bytes(0,data_dec)
        if crc != 0:
            bad += 1
            print("*** CRC Error ***")

        print("Good: ",total-bad," Bad: ",bad,"\n")
    ser.close()



__all__ = ["send_msg", "get_msg"]



Planet9
Posts: 7
Joined: Tue Nov 12, 2019 7:10 pm

Re: Serial binary data packets using COBS and uasyncio

Post by Planet9 » Thu Feb 20, 2020 6:01 pm

Apologies for the lack of explanation here; hopefully the Cheshire & Baker paper is enough.
This is just a bare-bones code dump for anyone who might be interested in a MicroPython version of COBS.
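
As a small aid to reading the code above, here is a CPython sketch of the CRC check it relies on. The sender appends the 16-bit CRC high byte first, and the receiver accepts a packet when the CRC computed over message plus CRC bytes comes out as zero. The CRC routine is the same calculation as in the posted code; the helper names add_crc and crc_ok are hypothetical and only for illustration.

```python
def crc16_ccitt(data: bytes, crc: int = 0) -> int:
    """16-bit CRC CCITT (poly 0x1021), same update step as the posted code."""
    for byte in data:
        x = ((crc >> 8) ^ byte) & 0xFF
        x ^= x >> 4
        crc = ((crc << 8) ^ (x << 12) ^ (x << 5) ^ x) & 0xFFFF
    return crc


def add_crc(payload: bytes) -> bytes:
    """Append the CRC, high byte first, as the send routines in this thread do."""
    crc = crc16_ccitt(payload)
    return payload + bytes([crc >> 8, crc & 0xFF])


def crc_ok(packet: bytes) -> bool:
    """A good packet leaves a zero remainder when the CRC runs over data + CRC."""
    return len(packet) >= 2 and crc16_ccitt(packet) == 0


packet = add_crc(b"hello")
corrupted = bytes([packet[0] ^ 0x01]) + packet[1:]  # flip one bit
print(crc_ok(packet))      # True
print(crc_ok(corrupted))   # False
```

The packet (message plus CRC) is what gets COBS encoded and wrapped in 0x00 delimiters before it goes out on the wire.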

pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK

Re: Serial binary data packets using COBS and uasyncio

Post by pythoncoder » Fri Feb 21, 2020 8:27 am

This is very interesting. It might find wider use if you publish it on GitHub.

I note your comment about uasyncio StreamReader being slow. The speed of uasyncio I/O has been a bone of contention for a long time. It is quite spectacularly inefficient. I produced a modified version of uasyncio in this repo to address this and other issues. Hopefully this will become redundant when Damien releases the entirely new version of uasyncio.

If the new version includes the (requested) option to prioritise I/O you might want to revisit that part of the code.
Peter Hinch
Index to my micropython libraries.

pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK

Re: Serial binary data packets using COBS and uasyncio

Post by pythoncoder » Fri Feb 21, 2020 8:57 am

Another thought, assuming a version of uasyncio with efficient I/O. If you changed the COBS delimiter to '\n' would it be possible to use the StreamReader.readline method? Might it be worth making the delimiter configurable to enable this in the future?
Peter Hinch
Index to my micropython libraries.
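
One possible way to get a '\n' delimiter without touching the COBS step is to XOR every encoded byte with the delimiter value. A COBS-encoded frame contains no 0x00, so after the XOR it contains no 0x0A. This is only a sketch of the idea and is not part of the code posted in this thread; the name xor_remap is hypothetical, and the sample bytes are the COBS encoding of the message 11 0A 00 33.

```python
def xor_remap(data: bytes, delim: int = 0x0A) -> bytes:
    """XOR each byte with delim. Applying it twice restores the input."""
    return bytes(b ^ delim for b in data)


# COBS encoding of the message 11 0A 00 33 (no 0x00 bytes by construction)
cobs_encoded = bytes([0x03, 0x11, 0x0A, 0x02, 0x33])

# Sender: remap, then terminate the frame with '\n'
line = xor_remap(cobs_encoded) + b"\n"

# Receiver: strip the terminator, undo the remap, then COBS decode as usual
restored = xor_remap(line[:-1])

print(b"\n" in line[:-1])        # False
print(restored == cobs_encoded)  # True
```

The remapped frame can now contain 0x00 bytes, so this trades the usual COBS guarantee (no zeros in the frame) for a guarantee that the only '\n' on the wire is the terminator.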

chuckbook
Posts: 135
Joined: Fri Oct 30, 2015 11:55 pm

Re: Serial binary data packets using COBS and uasyncio

Post by chuckbook » Fri Feb 21, 2020 9:43 am

Would it make sense to use either 3964 or 3964R protocol?

Planet9
Posts: 7
Joined: Tue Nov 12, 2019 7:10 pm

Re: Serial binary data packets using COBS and uasyncio

Post by Planet9 » Fri Feb 21, 2020 10:56 am

Thanks for your comments:

GitHub - Peter has an excellent group of examples for uasyncio, perhaps this could be included there...with some tidy up first :-)

New uasyncio - Yes, I should look at that when I get some time.

Delimiter - COBS uses 0x00 as the frame delimiter. Within a frame, each 0x00 in the original message is replaced by a code byte that marks the start of the next block (where a block is part of the original message which has no 0x00 chars in it). I guess another delimiter could be used but that would break with the COBS 'standard' as it is. Maybe StreamReader.readline could be used if another delimiter is chosen.

Would it make sense to use either 3964 or 3964R protocol? - COBS is simply a packetiser for binary data with low overhead; it uses 0x00 chars to delimit frames (a bit more bandwidth efficient than 3964, which uses timeouts to delimit frames). Anyway, it's not meant to compete with fully defined protocols such as Modbus/3964 etc.; it's just an efficient packetiser. Make of it what you will.

I'll be away for a few weeks shortly, so I apologise in advance for any unanswered queries.
