umqtt.robust: ensure that broker received the message

djipey
Posts: 17
Joined: Sun Dec 01, 2019 3:04 pm


Post by djipey » Sun Dec 01, 2019 3:16 pm

Hi, this is my first post on the forum.

I'm familiar with Python and I'm trying to send data from my ESP32 board to a Mosquitto broker, with the MQTT "protocol".

This isn't really hard and I managed to get that working. Here is a short sample of code demonstrating how I send some environmental data to the broker:

Code: Select all

import machine
import time
from umqtt.robust import MQTTClient

import bme280

import max44009
import config

i2c = machine.I2C(scl=machine.Pin(config.SCL_PIN), sda=machine.Pin(config.SDA_PIN))

c = MQTTClient("umqtt_client", config.MQTT_SERVER)

bme = bme280.BME280(i2c=i2c)
max44009 = max44009.MAX44009(i2c)

while True:
    print(bme.values)
    print(max44009.illuminance_lux)

    temperature = bme.values[0].replace("C", "")
    pressure = bme.values[1].replace("hPa", "")
    humidity = bme.values[2].replace("%", "")
    illuminance = str(max44009.illuminance_lux)

    c.connect()

    c.publish(config.TOPIC_TEMPERATURE, temperature, qos=1)
    c.publish(config.TOPIC_PRESSURE, pressure, qos=1)
    c.publish(config.TOPIC_HUMIDITY, humidity, qos=1)
    c.publish(config.TOPIC_ILLUMINANCE, illuminance, qos=1)

    c.disconnect()

    time.sleep(5)

The board connects to my Wi-Fi network when it boots. Everything works well.

Now, let's say my bandwidth gets scarce (because I'm watching Netflix, for example). The MQTT communication starts to struggle. Sometimes I notice that the broker doesn't get any messages for about 30 minutes. Because I pass `qos=1`, the MQTT client tries again and again to publish to the broker. But in this case I might want to do something else, like reconnect to the Wi-Fi network even though I'm still connected to it (I checked: wlan.isconnected() returns True, so it just seems that communication between the board and the broker is struggling).

So I was wondering, is there a standard way to check if the broker received the message, and perform some actions if it's not the case?

Cheers

pythoncoder
Posts: 4369
Joined: Fri Jul 18, 2014 8:01 am
Location: UK

Re: umqtt.robust: ensure that broker received the message

Post by pythoncoder » Mon Dec 02, 2019 7:20 am

Various thoughts come to mind. The best solution would be to run a separate WiFi AP for your MQTT stuff, then the problem would go away ;) Failing that I can think of two possibilities.

If you ran a second MQTT client it could subscribe to the topic and publish a response. Your main client could wait for the subscription and do something if it's not received in the requisite period. Note that your second client could run on the same box as the broker - you could write it in CPython using Paho MQTT. Or it could be a simple script calling mosquitto_sub and mosquitto_pub in a loop.
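
The echo idea could be sketched roughly as follows. This is only an illustration: `make_echo_responder` and the `ack/` topic prefix are made-up names, and `publish` stands for whatever publish function the second client offers (for example the `publish` method of a Paho client).

```python
def make_echo_responder(publish, ack_prefix="ack/"):
    """Return a message handler that republishes each message on an ack topic.

    publish    -- callable(topic, payload) supplied by the second MQTT client
    ack_prefix -- hypothetical prefix for the confirmation topic
    """
    def on_message(topic, payload):
        # MQTT libraries often hand over bytes; normalise the topic to str
        if isinstance(topic, bytes):
            topic = topic.decode()
        publish(ack_prefix + topic, payload)
    return on_message


# Stand-in for a real client so the sketch runs on its own
sent = []
handler = make_echo_responder(lambda topic, payload: sent.append((topic, payload)))
handler(b"sensors/temperature", b"21.5")
print(sent)
```

Publishing the echo on a separate topic means the responder, which subscribes only to the original topic, does not react to its own messages.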

The other approach would be to use the resilient asynchronous MQTT library. Asynchronous code (based on uasyncio) makes this easy. But there is a learning curve for uasyncio: if you're not already familiar with it, there is a tutorial in this repo.
Peter Hinch

Kleriger
Posts: 6
Joined: Mon Dec 30, 2019 9:16 am
Location: Germany

Re: umqtt.robust: ensure that broker received the message

Post by Kleriger » Tue Jan 14, 2020 12:24 pm

pythoncoder wrote:
Mon Dec 02, 2019 7:20 am
If you ran a second MQTT client it could subscribe to the topic and publish a response. Your main client could wait for the subscription and do something if it's not received in the requisite period.

The other approach would be to use the resilient asynchronous MQTT library. Asynchronous code (based on uasyncio) makes this easy. But there is a learning curve for uasyncio: if you're not already familiar with it, there is a tutorial in this repo.
The second mqtt client is a great idea.
I have a problem with umqtt.robust in MicroPython, specifically with QoS. If I use qos=0, my data is sent immediately without any problems. If I use qos=1, the data is also sent, but with a delay of several seconds. Unfortunately, the data must be sent immediately. At the moment I have no idea what could be causing this delay, and Google couldn't help me either. Any ideas?
Unfortunately, uasyncio and the asynchronous MQTT library don't seem to support TLS on the ESP32 yet.

Code: Select all

from umqtt.robust import MQTTClient
from json import dumps

# mqtt_client, mqtt_broker, mqtt_broker_port, ca_cert and message are defined elsewhere
mqtt = MQTTClient(mqtt_client, mqtt_broker, mqtt_broker_port, ssl=True, ssl_params={"cert": ca_cert})
mqtt.connect()

# publish is triggered when a pin changes; the changes are written into a list,
# whose entries are then sent one after the other
try:
    mqtt.publish("test_channel", dumps(message), qos=1)
except Exception:
    print("\ncouldn't publish to mqtt broker...")
else:
    print("published to mqtt broker")

Output with qos = 0:

[[6, 1]]
published to mqtt broker
[[6, 0]]
published to mqtt broker
[[6, 1]]
published to mqtt broker
[[6, 0]]
published to mqtt broker
[[6, 1]]
published to mqtt broker
[[6, 0]]
published to mqtt broker
[[6, 1]]
published to mqtt broker
[[6, 0]]
published to mqtt broker
[[6, 1]]
published to mqtt broker
[[6, 0]]
published to mqtt broker

Output with qos = 1:

[[6, 1]]
[[6, 0], [6, 1]]
[[6, 1], [6, 0], [6, 1]]
[[6, 0], [6, 1], [6, 0], [6, 1]]
[[6, 1], [6, 0], [6, 1], [6, 0], [6, 1]]
[[6, 0], [6, 1], [6, 0], [6, 1], [6, 0], [6, 1]]
published to mqtt broker
published to mqtt broker
published to mqtt broker
published to mqtt broker
published to mqtt broker
published to mqtt broker
published to mqtt broker
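
To narrow down where such a delay comes from, one option is to time each publish call. A minimal sketch that assumes nothing about umqtt itself: `timed_call` is a made-up helper, and `fake_publish` only stands in for `mqtt.publish` so the snippet runs anywhere.

```python
import time

# MicroPython offers ticks_ms/ticks_diff; fall back to monotonic() elsewhere
if hasattr(time, "ticks_ms"):
    now_ms = time.ticks_ms
    diff_ms = time.ticks_diff
else:
    def now_ms():
        return int(time.monotonic() * 1000)

    def diff_ms(end, start):
        return end - start


def timed_call(func, *args, **kwargs):
    """Call func and return (result, elapsed milliseconds)."""
    start = now_ms()
    result = func(*args, **kwargs)
    return result, diff_ms(now_ms(), start)


def fake_publish(topic, msg, qos=0):
    # Placeholder for mqtt.publish; sleeps to imitate waiting for an ack
    time.sleep(0.05 if qos else 0)
    return qos


for qos in (0, 1):
    _, elapsed = timed_call(fake_publish, "test_channel", "[[6, 1]]", qos=qos)
    print("qos={} took {} ms".format(qos, elapsed))
```

Replacing `fake_publish` with the real publish call would show whether the seconds are spent inside `publish` or somewhere else in the program.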

pythoncoder
Posts: 4369
Joined: Fri Jul 18, 2014 8:01 am
Location: UK

Re: umqtt.robust: ensure that broker received the message

Post by pythoncoder » Tue Jan 14, 2020 2:59 pm

With qos==1 publish waits until it gets a response packet which confirms reception. This can take time depending on the server, and I believe TLS slows it down too. If you try it with a local broker without TLS response is fast.

The implementation is broken because if it never gets the response packet it hangs forever. This can happen, especially on wireless networks.
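
One way around an endless wait is to confirm delivery at application level and give up after a deadline. A rough sketch, not a description of how umqtt works internally: `wait_until` and its parameters are invented for illustration, and `poll` stands for any non-blocking check, such as calling `check_msg()` and then testing a flag.

```python
import time


def wait_until(poll, timeout_s=5, interval_s=0.1):
    """Call poll() until it returns True or roughly timeout_s seconds pass.

    Returns True if poll() succeeded, False on timeout. The elapsed time is
    estimated from the sleep intervals, so the deadline is approximate.
    """
    waited = 0
    while True:
        if poll():
            return True
        if waited >= timeout_s:
            return False
        time.sleep(interval_s)
        waited += interval_s


# Demo with a stand-in that "acknowledges" on the third poll
calls = []


def ack_on_third_poll():
    calls.append(1)
    return len(calls) >= 3


print(wait_until(ack_on_third_poll, timeout_s=1, interval_s=0.01))
print(wait_until(lambda: False, timeout_s=0.05, interval_s=0.01))
```

The first call prints True, the second prints False; on a False result the caller can publish again or reset the connection instead of blocking.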
Peter Hinch

djipey
Posts: 17
Joined: Sun Dec 01, 2019 3:04 pm

Re: umqtt.robust: ensure that broker received the message

Post by djipey » Tue Jan 14, 2020 9:09 pm

By the way, I never replied to the thread.

I found a way to solve my problem: with uping and a nightly firmware (uping needs a recent socket implementation to work properly), I can first ping the broker. If a ping can reach it, I send the data. If not, I try to reconnect to the network. It works like a charm :) Of course, this means an irregular interval between data measurements, since the program can try to reconnect for a while. But overall, I'm quite satisfied.
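
For anyone wanting to try the same approach, the logic boils down to something like the sketch below. The three callables are placeholders made up for this example: `broker_reachable` would wrap the uping call, `send` the MQTT publishes, and `reconnect_wifi` whatever reconnects the WLAN interface.

```python
def send_if_reachable(broker_reachable, send, reconnect_wifi):
    """Send data only when the broker answers a ping; otherwise reconnect.

    Returns True if the data was sent, False if a reconnect was attempted.
    """
    if broker_reachable():
        send()
        return True
    reconnect_wifi()
    return False


# Stand-ins so the sketch runs without hardware
log = []
send_if_reachable(lambda: True, lambda: log.append("sent"), lambda: log.append("reconnect"))
send_if_reachable(lambda: False, lambda: log.append("sent"), lambda: log.append("reconnect"))
print(log)
```

The return value lets the main loop decide whether to keep the measurement for a later attempt.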

Kleriger
Posts: 6
Joined: Mon Dec 30, 2019 9:16 am
Location: Germany

Re: umqtt.robust: ensure that broker received the message

Post by Kleriger » Wed Jan 15, 2020 10:13 am

pythoncoder wrote:
Tue Jan 14, 2020 2:59 pm
With qos==1 publish waits until it gets a response packet which confirms reception. This can take time depending on the server, and I believe TLS slows it down too. If you try it with a local broker without TLS response is fast.

The implementation is broken because if it never gets the response packet it hangs forever. This can happen, especially on wireless networks.
I tried it both with a local broker (mosquitto) and without encryption, but the result is the same.

djipey wrote:
Tue Jan 14, 2020 9:09 pm
By the way, I never replied to the thread.

I found a way to solve my problem: with uping and a nightly firmware (uping needs a recent socket implementation to work properly), I can first ping the broker. If a ping can reach it, I send the data. If not, I try to reconnect to the network. It works like a charm :) Of course, this means an irregular interval between data measurements, since the program can try to reconnect for a while. But overall, I'm quite satisfied.
I solved the problem with the ping built into umqtt. The only catch is that you have to ping twice before you notice that the broker is no longer available.

Code: Select all

def mqtt_isconnected():
    # two pings are needed: the first one can still succeed on a dead link
    try:
        mqtt.ping()
        mqtt.ping()
    except Exception:
        print("\nlost connection to mqtt broker...")
        return False
    else:
        return True
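
A check like this could drive a bounded reconnect loop. A minimal sketch under assumptions: `ensure_connected` is a made-up helper, `is_connected` would be `mqtt_isconnected`, `connect` something like `mqtt.connect`, and the retry count and delays are arbitrary.

```python
import time


def ensure_connected(is_connected, connect, attempts=3, delay_s=1):
    """Return True once is_connected() holds, reconnecting up to `attempts` times."""
    for attempt in range(attempts):
        if is_connected():
            return True
        try:
            connect()
        except OSError:
            # connection attempt failed; wait a little longer each time
            time.sleep(delay_s * (attempt + 1))
    return is_connected()


# Stand-ins so the sketch runs without a broker
state = {"up": False}


def fake_connect():
    state["up"] = True


print(ensure_connected(lambda: state["up"], fake_connect, delay_s=0))
```

If the function returns False after all attempts, the caller can fall back to something more drastic, such as reconnecting to the network.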

Kleriger
Posts: 6
Joined: Mon Dec 30, 2019 9:16 am
Location: Germany

Re: umqtt.robust: ensure that broker received the message

Post by Kleriger » Fri Jan 17, 2020 3:52 pm

pythoncoder wrote:
Mon Dec 02, 2019 7:20 am
If you ran a second MQTT client it could subscribe to the topic and publish a response. Your main client could wait for the subscription and do something if it's not received in the requisite period.
I tried that. I created one MQTT client for publishing and one for subscribing on the ESP32 and compared the values that were returned with the ones that were sent. Everything worked fine until I activated TLS.
Unfortunately, my latency increased so much that there was a permanent loss of the Wi-Fi connection.
Apparently, the ESP32 does not have enough power to handle two TLS sessions in a stable manner.
Without TLS, I think it's a very good solution.

I will also keep an eye on the asyncio mqtt. If that supports TLS for esp32 then I could try it out.


edit:
I tested my program after updating to MicroPython 1.12 and now it also works with encryption. Apparently this version handles TLS on the ESP32 better, so I guess the asyncio MQTT version could work now as well.


My solution:

Code: Select all

"""
mqtt.py
"""
from umqtt.simple import MQTTClient
from json import dumps
from utime import sleep, time

mqtt_broker = "xxx"
mqtt_broker_port = 8883
mqtt_client_pub = "esp32_pub"
mqtt_client_sub = "esp32_sub"
mqtt_keep_alive = 60 #time in seconds until a keep alive is sent
with open('ca.crt') as f:
    ca_cert = f.read()
sub_received = False
message_sub = "empty"

# messages received from subscriptions are delivered to this callback
def mqtt_client_sub_callback(topic, msg):
    global sub_received
    global message_sub
    sub_received = True
    message_sub = msg
    #print(msg)
    
# ca_cert holds the certificate text read from ca.crt above
mqtt_pub = MQTTClient(mqtt_client_pub, mqtt_broker, mqtt_broker_port, ssl=True, ssl_params={"cert": ca_cert})
mqtt_sub = MQTTClient(mqtt_client_sub, mqtt_broker, mqtt_broker_port, ssl=True, ssl_params={"cert": ca_cert})
#mqtt_pub = MQTTClient(mqtt_client_pub, mqtt_broker, mqtt_broker_port)
#mqtt_sub = MQTTClient(mqtt_client_sub, mqtt_broker, mqtt_broker_port)
mqtt_sub.set_callback(mqtt_client_sub_callback)
   
# connect and reconnect to mqtt broker
def mqtt_connect():
    print("\nconnect to mqtt broker...")
    try:
        mqtt_pub.connect()
        mqtt_sub.connect(clean_session=False)
    except Exception:
        print("\ncan't connect to mqtt broker...")
        return False
    else:
        print("\nconnected to mqtt broker...")
        return True


def mqtt_subscribe():
    # the publishing client's name doubles as the topic
    try:
        mqtt_sub.subscribe(mqtt_client_pub, qos=1)
    except Exception:
        print("\ncouldn't subscribe to mqtt broker...")
    else:
        print("subscribed to mqtt broker")
    

def mqtt_publish(message_pub):
    global message_sub
    global sub_received
    payload = dumps(message_pub)  # the echo is compared against this exact string
    message_sub = b"empty"

    # publish until the subscriber client gets the same payload back
    while str(message_sub, 'utf-8') != payload:
        try:
            mqtt_pub.publish(mqtt_client_pub, payload, qos=0)
        except Exception:
            print("\ncouldn't publish to mqtt broker...")
            sleep(1)
            continue
        print("published {} to mqtt broker".format(message_pub))

        # wait up to 5 seconds for the echo, then fall through and publish again
        sub_received = False
        sub_received_time = time()
        while not sub_received:
            if time() - sub_received_time > 5:
                print("\ntimeout while waiting for message from mqtt broker, publish again...")
                break
            try:
                mqtt_sub.check_msg()
            except Exception:
                print("\ncouldn't wait for msg from mqtt broker...")
                sleep(1)
                    

def mqtt_isconnected():
    # two pings are needed: the first one can still succeed on a dead link
    try:
        mqtt_pub.ping()
        mqtt_pub.ping()
    except Exception:
        print("\nlost connection to mqtt broker...")
        return False
    else:
        return True

        
Last edited by Kleriger on Wed Jan 22, 2020 9:00 am, edited 1 time in total.

pythoncoder
Posts: 4369
Joined: Fri Jul 18, 2014 8:01 am
Location: UK

Re: umqtt.robust: ensure that broker received the message

Post by pythoncoder » Sat Jan 18, 2020 6:04 am

Kleriger wrote:
Fri Jan 17, 2020 3:52 pm
...
I will also keep an eye on the asyncio mqtt. If that supports TLS for esp32 then I could try it out...
In my testing TLS only worked on the Pyboard D. The problem is that the library uses nonblocking sockets and (as I understand it) the TLS implementation on ESP32 and ESP8266 does not support these.
Peter Hinch

tve
Posts: 214
Joined: Wed Jan 01, 2020 10:12 pm
Location: Santa Barbara, CA

Re: umqtt.robust: ensure that broker received the message

Post by tve » Sun Jan 19, 2020 5:36 pm

pythoncoder wrote:
Sat Jan 18, 2020 6:04 am
In my testing TLS only worked on the Pyboard D. The problem is that the library uses nonblocking sockets and (as I understand it) the TLS implementation on ESP32 and ESP8266 does not support these.
What do you mean by "does not support these"? Do you mean it doesn't work at all, or that it sometimes crashes or otherwise breaks? I'm compiling MicroPython from the 1.12 tag with ESP-IDF 4.1 for the ESP32 and I don't have any problems that I can detect using TLS with mqtt_as.py.

pythoncoder
Posts: 4369
Joined: Fri Jul 18, 2014 8:01 am
Location: UK

Re: umqtt.robust: ensure that broker received the message

Post by pythoncoder » Mon Jan 20, 2020 9:01 am

It sounds as if the firmware issue has been fixed. Last time I tested it TLS failed on ESPx but worked on Pyboard D.

If there were a problem it would be immediately evident: it would fail to establish a secure connection.
Peter Hinch
