MQTT+ ASYNC programming

All ESP32 boards running MicroPython.
Target audience: MicroPython users with an ESP32 board.
Post Reply
lbayo
Posts: 24
Joined: Tue Mar 16, 2021 2:10 pm

MQTT+ ASYNC programming

Post by lbayo » Wed May 05, 2021 3:25 pm

Hi;

Despite the fact that I have been programming in Python for several years, I am a beginner in using async and mqtt and have problems that I cannot understand.
I have taken an original program by Peter Hinch (thanks!), with very small modifications called “range” (name of the topics, qos and some print in the code to visualize what happens).
The result is unpredictable: sometimes it connects and sometimes it doesn't.
I enclose the program and the results, using Thonny IDE, running it several times in succession without modifying anything


Program

Code: Select all

# range.py Test of asynchronous mqtt client with clean session False.
# (C) Copyright Peter Hinch 2017-2019.
# Released under the MIT licence.

# Public brokers https://github.com/mqtt/mqtt.github.io/wiki/public_brokers

# This demo is for wireless range tests. If OOR the red LED will light.
# In range the blue LED will pulse for each received message.
# Uses clean sessions to avoid backlog when OOR.

# red LED: ON == WiFi fail
# blue LED pulse == message received
# Publishes connection statistics. 

from mqtt_as import MQTTClient, config
from config import wifi_led, blue_led
import uasyncio as asyncio
from sys import platform

TOPIC = 'CO2'  # For demo publication and last will use same topic

outages = 0

async def pulse():  # This demo pulses blue LED each time a subscribed msg arrives.
    blue_led(True)
    await asyncio.sleep(1)
    blue_led(False)

def sub_cb(topic, msg, retained):
    print((topic, msg))
    asyncio.create_task(pulse())

async def wifi_han(state):
    global outages
    wifi_led(not state)  # Light LED when WiFi down
    if state:
        print('We are connected to broker.')
    else:
        outages += 1
        print('WiFi or broker is down.')
    await asyncio.sleep(1)

async def conn_han(client):
    await client.subscribe('MONITOR', 1)

async def main(client):
    try:
        await client.connect()
    except OSError:
        print('Connection failed.')
        return
    n = 0
    while True:
        await asyncio.sleep(5)
        print('publish', n)
        # If WiFi is down the following will pause for the duration.
        await client.publish(TOPIC, '{} repubs: {} outages: {}'.format(n, client.REPUB_COUNT, outages), qos = 1)
        n += 1

# Define configuration

config['server'] = '192.168.1.32'  # Change to suit
config['ssid'] = 'MIWIFI_2G_kYXg'
config['wifi_pw'] = 'XX'
config['subs_cb'] = sub_cb
config['wifi_coro'] = wifi_han
config['will'] = (TOPIC, 'Goodbye cruel world!', False, 0)
config['connect_coro'] = conn_han
config['keepalive'] = 120
print (config)
# Set up client. Enable optional debug statements.
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
    asyncio.run(main(client))
finally:  # Prevent LmacRxBlk:1 errors.
    client.close()
    blue_led(True)
    asyncio.new_event_loop()


Results

Code: Select all

>>> %Run -c $EDITOR_CONTENT
{'wifi_coro': <generator>, 'server': '192.168.1.32', 'ssl': False, 'port': 0, 'ssl_params': {}, 'response_time': 10, 'will': ('CO2', 'Goodbye cruel world!', False, 0), 'subs_cb': <function sub_cb at 0x3ffe77a0>, 'ssid': 'MIWIFI_2G_kYXg', 'wifi_pw': 'XX', 'clean_init': True, 'user': '', 'max_repubs': 4, 'password': '', 'ping_interval': 0, 'connect_coro': <generator>, 'clean': True, 'keepalive': 120, 'client_id': b'240ac4efb324'}
Checking WiFi integrity.
Got reliable connection
Connecting to broker.
Connected to broker.
We are connected to broker.
publish 0
publish 1

────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
MicroPython v1.14 on 2021-02-02; ESP32 module with ESP32
Type "help()" for more information.
>>> %Run -c $EDITOR_CONTENT
{'wifi_coro': <generator>, 'server': '192.168.1.32', 'ssl': False, 'port': 0, 'ssl_params': {}, 'response_time': 10, 'will': ('CO2', 'Goodbye cruel world!', False, 0), 'subs_cb': <function sub_cb at 0x3ffeb190>, 'ssid': 'MIWIFI_2G_kYXg', 'wifi_pw': 'XX', 'clean_init': True, 'user': '', 'max_repubs': 4, 'password': '', 'ping_interval': 0, 'connect_coro': <generator>, 'clean': True, 'keepalive': 120, 'client_id': b'240ac4efb324'}
Checking WiFi integrity.
Got reliable connection
Connecting to broker.
Connection failed.
>>> %Run -c $EDITOR_CONTENT
{'wifi_coro': <generator>, 'server': '192.168.1.32', 'ssl': False, 'port': 0, 'ssl_params': {}, 'response_time': 10, 'will': ('CO2', 'Goodbye cruel world!', False, 0), 'subs_cb': <function sub_cb at 0x3ffeb300>, 'ssid': 'MIWIFI_2G_kYXg', 'wifi_pw': 'XX', 'clean_init': True, 'user': '', 'max_repubs': 4, 'password': '', 'ping_interval': 0, 'connect_coro': <generator>, 'clean': True, 'keepalive': 120, 'client_id': b'240ac4efb324'}
Checking WiFi integrity.
Got reliable connection
Connecting to broker.
Connection failed.
>>> %Run -c $EDITOR_CONTENT
{'wifi_coro': <generator>, 'server': '192.168.1.32', 'ssl': False, 'port': 0, 'ssl_params': {}, 'response_time': 10, 'will': ('CO2', 'Goodbye cruel world!', False, 0), 'subs_cb': <function sub_cb at 0x3ffef8f0>, 'ssid': 'MIWIFI_2G_kYXg', 'wifi_pw': 'XX', 'clean_init': True, 'user': '', 'max_repubs': 4, 'password': '', 'ping_interval': 0, 'connect_coro': <generator>, 'clean': True, 'keepalive': 120, 'client_id': b'240ac4efb324'}
Checking WiFi integrity.
Got reliable connection
Connecting to broker.
Connected to broker.
We are connected to broker.
publish 0
publish 1

50% success, 50% failure !!!

A solution would be to try the connection several times, until the result is obtained. But that is not a deterministic or elegant solution.

Processor: Dev. Kit ESP32 WROOM 32
Broker: Raspberry Pi with Mosquitto
Distance to Access Point: 4 m aprox

Can someone give me an idea to help me determine what happens?

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: MQTT+ ASYNC programming

Post by pythoncoder » Thu May 06, 2021 6:54 am

The mqtt_as module is designed to be resilient in the sense that once a connection is established the code will recover transparently from outages of the WiFi or the broker. However the design does assume that, on initialisation, the connection can be established. If it can't, it will fail in the way you observe.

I therefore think you have a connectivity problem. This is puzzling as your test hardware is very similar to mine, the ESP32 board and the broker and the Pi host are identical. Is the Pi doing anything else aside from running mosquitto? In my testing, aside from running the Linux GUI, it was idle.

When your script fails it has passed the WiFi test and then failed to connect to the broker. In other words it is behaving as if you had started it with an unresponsive broker. Have you tried accessing the broker from a PC using mosquitto_pub and mosquitto_sub? When you do successfully connect, have you let the program run to see if you experience any outages? These approaches would provide clues as to the integrity of the broker and the stability of your WiFi.
Peter Hinch
Index to my micropython libraries.

lbayo
Posts: 24
Joined: Tue Mar 16, 2021 2:10 pm

Re: MQTT+ ASYNC programming

Post by lbayo » Thu May 27, 2021 7:08 am

Continuing my learning in asynchronous programming and mqtt, a curious problem has appeared that has cost me a lot to detect, but finally detected the problem (not the solution). I write from Catalonia, where we commonly use various Latin languages ​​(Catalan, Spanish and French) with symbols that are not used in English (á à é è ê í ó ú, ñ, ç, ...) and I observe that when using one of these characters in a publish connection drops. Not immediately, but it falls, or so I perceive it.
I attach the code in question, along with the print output and the output in mosquitto


Python code

Code: Select all

# Test asynchrone MQTT

from machine import Pin
from utime import sleep
from mqtt_as import MQTTClient,config
import uasyncio as asyncio
import ubinascii

# publisher daemon
async def mqttSend():
    global mqtt_queue, client, Run
    while Run:
        while len(mqtt_queue)>0:
            top,msg=mqtt_queue.pop(0)
            await client.publish(top, msg, qos = 0)
            await asyncio.sleep(0.1)
        await asyncio.sleep (0.1)

# dummy callback
def sub_cb(a,b,c):
    return
#subscribe to CO2 topic
async def conn_han(client): 
    await client.subscribe("CO2", 0)

async def wifi_han(state):    
    if state:
        print('We are connected to broker.')
    else:
        print('WiFi or broker is down.')
    await asyncio.sleep(1)

# main loop
async def main():
    global client,mqtt_queue, Run
        
    config['ssid']="MIWIFI_2G_kYXg"
    config['wifi_pw'] = "xx"
    config['subs_cb'] = sub_cb                                    # callback routine -----> def sub_cb(topic, msg, retained)
    config['wifi_coro'] = wifi_han                                # SHOW WIFI STATUS -----> async def wifi_han(state)  
    config['will'] = ("MONITOR", '', False, 0)             # mensaje de despedida
    config['connect_coro'] = conn_han                             # connect coroutine
    config['keepalive'] = 30                                     # timempo para considerar que una conexion está perdida
    config['server'] = "192.168.1.32"
    
    MQTTClient.DEBUG = True
    client = MQTTClient(config)
    await client.connect()
    asyncio.create_task(mqttSend())

    # Pin2 is used to stop program when grounded
    inp=Pin(2,Pin.IN, Pin.PULL_UP)
    
    cnt=0
    while inp.value():
        cnt+=1
        mqtt_queue.append(("CO2","counter "+str(cnt)))
        print ("Loop:",cnt,"  connect:",client.isconnected(),"\nqueue:",mqtt_queue)
        if cnt%5==0: # every 5 loops write cóunter with "ó"
            print ("Try to publish 'cóunter'")
            mqtt_queue.append(("CO2","cóunter:"+str(cnt)))
            print ("Publish 'cóunter' done")
        await asyncio.sleep (5)
    Run=False
    await asyncio.sleep(3)


# initialization


mqtt_queue=[]   # queue for publish items
client=None     # global client
Run=True        # running flag

asyncio.run (main())
asyncio.new_event_loop()



Terminal output

Code: Select all

>>> %Run -c $EDITOR_CONTENT
Checking WiFi integrity.
Got reliable connection
Connecting to broker.
Connected to broker.
Loop: 1   connect: True 
queue: [('CO2', 'counter 1')]
We are connected to broker.
Loop: 2   connect: True 
queue: [('CO2', 'counter 2')]
Loop: 3   connect: True 
queue: [('CO2', 'counter 3')]
Loop: 4   connect: True 
queue: [('CO2', 'counter 4')]
Loop: 5   connect: True 
queue: [('CO2', 'counter 5')]
Try to publish 'cóunter'
Publish 'cóunter' done
RAM free 79680 alloc 31488
Loop: 6   connect: True 
queue: [('CO2', 'counter 6')]
Loop: 7   connect: True 
queue: [('CO2', 'counter 7')]
Loop: 8   connect: True 
queue: [('CO2', 'counter 8')]
Loop: 9   connect: True 
queue: [('CO2', 'counter 9')]
WiFi or broker is down.
RAM free 79824 alloc 31344
Loop: 10   connect: False 
queue: [('CO2', 'counter 10')]
Try to publish 'cóunter'
Publish 'cóunter' done
Checking WiFi integrity.
Loop: 11   connect: False 
queue: [('CO2', 'c\xf3unter:10'), ('CO2', 'counter 11')]
Got reliable connection
Connecting to broker.
Connected to broker.
Reconnect OK!
We are connected to broker.
Loop: 12   connect: True 
queue: [('CO2', 'counter 12')]
Loop: 13   connect: True 
queue: [('CO2', 'counter 13')]
WiFi or broker is down.
Loop: 14   connect: False 
queue: [('CO2', 'counter 14')]
Checking WiFi integrity.
Loop: 15   connect: False 
queue: [('CO2', 'counter 15')]
Try to publish 'cóunter'
Publish 'cóunter' done
Got reliable connection
Connecting to broker.
Connected to broker.
Reconnect OK!
We are connected to broker.
Loop: 16   connect: True 
queue: [('CO2', 'counter 16')]
Loop: 17   connect: True 
queue: [('CO2', 'counter 17')]
Loop: 18   connect: True 
queue: [('CO2', 'counter 18')]
WiFi or broker is down.
Checking WiFi integrity.
Loop: 19   connect: False 
queue: [('CO2', 'counter 19')]
Got reliable connection
Connecting to broker.
Connected to broker.
Reconnect OK!
We are connected to broker.
Loop: 20   connect: True 
queue: [('CO2', 'counter 20')]
Try to publish 'cóunter'
Publish 'cóunter' done
Loop: 21   connect: True 
queue: [('CO2', 'counter 21')]
Loop: 22   connect: True 
queue: [('CO2', 'counter 22')]


Mosquitto output

Code: Select all

lluis@mainhost:~$ mosquitto_sub -h 192.168.1.32 -t CO2
counter 1
counter 2
counter 3
counter 4
counter 5
cóunter:
counter 10
cóunter:1
counter 14
counter 15
cóunter:1
counter 19
counter 20
cóunter:2

The program is very simple: it publishes every 5 seconds a message via mqtt ("counter <n>") and every 5 messages another similar but with accented letters (ó). Then the connection drops. The software re-establishes the connection and every time an "ó" character is printed but the connection drops again, every time a "ó" character is published. If there is no "ó" character, everything works correctly.

Terminal output without "ó"

Code: Select all

>>> %Run -c $EDITOR_CONTENT
Checking WiFi integrity.
Got reliable connection
Connecting to broker.
Connected to broker.
Loop: 1   connect: True 
queue: [('CO2', 'counter 1')]
We are connected to broker.
Loop: 2   connect: True 
queue: [('CO2', 'counter 2')]
Loop: 3   connect: True 
queue: [('CO2', 'counter 3')]
Loop: 4   connect: True 
queue: [('CO2', 'counter 4')]
Loop: 5   connect: True 
queue: [('CO2', 'counter 5')]
Try to publish 'counter'
Publish 'counter' done
RAM free 49504 alloc 61664
Loop: 6   connect: True 
queue: [('CO2', 'counter 6')]
Loop: 7   connect: True 
queue: [('CO2', 'counter 7')]
Loop: 8   connect: True 
queue: [('CO2', 'counter 8')]
Loop: 9   connect: True 
queue: [('CO2', 'counter 9')]
RAM free 49584 alloc 61584
Loop: 10   connect: True 
queue: [('CO2', 'counter 10')]
Try to publish 'counter'
Publish 'counter' done
Loop: 11   connect: True 
queue: [('CO2', 'counter 11')]
Loop: 12   connect: True 
queue: [('CO2', 'counter 12')]
Loop: 13   connect: True 
queue: [('CO2', 'counter 13')]
RAM free 49584 alloc 61584
Loop: 14   connect: True 
queue: [('CO2', 'counter 14')]
Mosquitto output without "ó"

Code: Select all

lluis@mainhost:~$ mosquitto_sub -h 192.168.1.32 -t CO2
counter 1
counter 2
counter 3
counter 4
counter 5
counter:5
counter 6
counter 7
counter 8
counter 9
counter 10
counter:10
counter 11
counter 12
counter 13
counter 14

To stop the program cleanly, just put pin 2 to GND
How could this be solved? Obviously, I don't dare to modify the drivers ... yet.

Environment:
ESP32 WROOM 32 dev kit
Thonny IDE (OS: Ubuntu 20.04)
Raspberry pi B+ at IP=192.168.1.32 as a mqtt broker (with mosquitto)

kevinkk525
Posts: 969
Joined: Sat Feb 03, 2018 7:02 pm

Re: MQTT+ ASYNC programming

Post by kevinkk525 » Thu May 27, 2021 9:42 am

you try to send strings which only works in certain cases. Try sending them as bytes by "string.encode()". Maybe that works.
Kevin Köck
Micropython Smarthome Firmware (with Home-Assistant integration): https://github.com/kevinkk525/pysmartnode

Post Reply