Problems with running umqtt client asynchronously in main.py

General discussions and questions abound development of code with MicroPython that is not hardware specific.
Target audience: MicroPython Users.
Post Reply
schwinnez
Posts: 5
Joined: Sun Apr 22, 2018 12:19 pm

Problems with running umqtt client asynchronously in main.py

Post by schwinnez » Sun Apr 22, 2018 12:43 pm

Hi,

I am trying to asynchronously initialize a umqtt simple MQTT-Client in the main function and also check for new messages in this asynconous task. What i am basically doing in the task is initializing the client and checking for messages as follows:

while True:
await asyncio.sleep_ms(100)
client.check_msg()

To run in asynchronously, I am using uasyncio and the following:

loop.create_task(mqtt_facade.run())
loop.run_forever()

When I start the asynchronous Task in the main function, it does not work for some reason. I also have a simple http server, which works in a similiar manner. This http server works also when I am starting it using uasncio from the main.py.
The strange thing is, that when I am starting the MQTT client from the WebREPL, everythings works as expected. Only starting it from the main.py does not work.

Are there some special restrictions on what can be used in the main.py?

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Problems with running umqtt client asynchronously in main.py

Post by pythoncoder » Mon Apr 23, 2018 8:41 am

In general there are no restrictions on using main.py to launch a MicroPython program. However there are many situations in which the official MQTT client can block and possibly you're encountering one of these.

An asynchronous MQTT client involves a significant rewrite involving the use of nonblocking sockets. See this solution, which, in addition to being asynchronous, aims to fix other shortfalls in the official version.
Peter Hinch
Index to my micropython libraries.

schwinnez
Posts: 5
Joined: Sun Apr 22, 2018 12:19 pm

Re: Problems with running umqtt client asynchronously in main.py

Post by schwinnez » Mon Apr 23, 2018 9:07 am

I've already thought about using your alternative MQTT Client. ;) But I am still wondering, why something works when launched from the WebREPL but not when launched from the main.py.
I did only experienced this for the MQTT Client. I already tried to change it slightly, but without success :(
Any ideas, what actually is the problem in my case. Is there a built in way to get output from main.py or do I have to implement this on my own?

kevinkk525
Posts: 969
Joined: Sat Feb 03, 2018 7:02 pm

Re: Problems with running umqtt client asynchronously in main.py

Post by kevinkk525 » Mon Apr 23, 2018 7:53 pm

I used the official mqtt once and wrapped it in asyncio. I attached the file below for you, maybe it helps. Just ignore all the functions you don't need. It worked quite well but I changed to the enhanced version of Peter Hinch a while ago as it has a lot more features.

Code: Select all

import gc
import json

from pysmartnode import config
from pysmartnode import logging
from umqtt.simple import MQTTClient
import uasyncio as asyncio


log = logging.getLogger("MQTT")
gc.collect()

# BUG: reconnecting still makes a recursion from reconnect -> publish -> reconnecty


class mqttHandler:
    def __init__(self):
        self.subscriptions = {}
        self.payload_on = ["ON", True, "True"]
        self.payload_off = ["OFF", False, "False"]
        self.isReconnecting = False
        self._wildcard_subs_found = False

    def init(self, mqttConnectEvent):
        self.mqttConnectEvent = mqttConnectEvent
        self.loop = asyncio.get_event_loop()
        self.retained = []
        self.id = config.id
        self.mqtt = MQTTClient(self.id, config.MQTT_HOST, user=config.MQTT_USER,
                               password=config.MQTT_PASSWORD, keepalive=config.MQTT_KEEPALIVE)
        self.loop.create_task(self.init_async())

    async def init_async(self):
        self.mqtt.set_last_will("home/" + self.id + "/status", "OFFLINE", retain=True)
        self.mqtt.connect(False)
        self.mqtt.set_callback(self.execute_sync)
        self.mqttConnectEvent.set()
        log.setMQTT(self)
        await self.publish("home/" + self.id + "/version", config.VERSION, True)
        await self.publish("home/" + self.id + "/status", "ONLINE", True)
        self.loop.create_task(self.check_msg())
        self.loop.create_task(self.send_ping())

    async def send_ping(self):
        interval = config.MQTT_KEEPALIVE - 10
        while True:
            await asyncio.sleep(interval)
            try:
                self.mqtt.ping()
            except OSError as e:
                print("Error pinging broker: {!r}".format(e))
                if self.isReconnecting == False:
                    await self.reconnect()
                    self.isReconnecting = False
                else:
                    await asyncio.sleep(1)

    async def login_finish(self, data):
        if self.loginEvent.is_set() == False:
            if "message" not in data:
                print("Message not in data")
                return False
            if type(data["message"]) != dict:
                print("Login config is no dict, either error in dict or no config provided")
            # try:
            #    data["message"]=json.loads(data["message"])
            # except:
            #    print("Could not convert login config to dict")
            #    return False
            print("login", data["message"])
            erg = data["message"]
            config.registerComponents(erg)
            self.loginEvent.set()
        else:
            log.warn("Got new login data although logged in")

    # if host is not running, this reboots client on line open_connection (most of the time...)
    async def login(self, loginEvent):
        # await asyncio.sleep(1)
        await self.mqttConnectEvent
        print("logging in")
        print("ram", gc.mem_free())
        self.loginEvent = loginEvent
        await self.subscribe("home/login/" + self.id, self.login_finish)
        log.info("Client version: {!s}".format(config.VERSION))
        await self.publish("home/login", {"command": "login", "id": self.id, "version": config.VERSION})

    async def check_msg(self):
        while True:
            try:
                self.mqtt.check_msg()
            except OSError as e:
                if self.isReconnecting == False:
                    print("Error checking mqtt messages: {!r}".format(e))
                    await self.reconnect()
                else:
                    pass
            await asyncio.sleep_ms(20)
            # print("checking")

    async def await_retained(self, topic):
        await asyncio.sleep(2)
        if topic in self.retained:
            del self.retained[self.retained.index(topic)]  # probably no retained message available
        #unsubscribe, not added in mqttclient

    def execute_sync(self, topic, msg):  # mqtt library only handles sync callbacks so add it to async loop
        self.loop.create_task(self.execute(topic, msg))

    async def execute(self, data, msg=None):
        if msg is not None:  # remnant of using this for executionHandler with network sockets
            data = {"topic": data.decode(), "message": msg.decode()}
        try:
            data["message"] = json.loads(data["message"])
        except:
            pass
        if "topic" not in data:
            log.error("No topic found in data")
            return "ERROR: No topic"
        if "message" not in data:
            log.error("No message found in data")
            return "ERROR: No message"
        topic = data["topic"]  # this is the original topic, data["topic"] is the working topic
        if self._wildcard_subs_found:
            for subs in self.subscriptions:
                if subs.rfind("/#") != -1:
                    if topic.find(subs[:-1]) != -1:
                        data["topic"] = subs
                        break
        if data["topic"] in self.retained:
            ending = "/set" if data["topic"].rfind("#") == -1 else ""
            if data["topic"] + ending in self.subscriptions:
                try:
                    if type(self.subscriptions[data["topic"] + ending]) != dict:
                        callback = self.subscriptions[data["topic"] + ending]
                        await callback({"message": data["message"], "topic": topic, "retained": True})
                    else:
                        for callback in self.subscriptions[data["topic"]]:
                            await callback({"message": data["message"], "topic": topic, "retained": True})

                except Exception as e:
                    log.error("Error executing retained mqtt topic {!r}: {!s}".format(data["topic"], e))
                if self._wildcard_subs_found == False or data["topic"].rfind("#") == -1:
                    try:
                        del self.retained[self.retained.index(data["topic"])]
                    except:
                        pass
                return True
            else:
                #log.warn("Got retained message for topic {!r} but did not subscribe to {!r}/set")
                return False
        if data["topic"] not in self.subscriptions:
            return True
        if type(self.subscriptions[data["topic"]]) != list:
            try:
                erg = await self.subscriptions[data["topic"]]({"message": data["message"], "topic": topic})
                # so that an integer 0 is interpreted as a result to send back
                if (type(erg) == int and erg is not None) or erg == True:
                    if erg == True and type(erg) != int:
                        erg = data["message"]
                    if topic.rfind("/set") != -1:
                        topic_a = topic[:topic.rfind("/set")]
                        await self.publish(topic_a, erg, retain=True)
            except Exception as e:
                erg = False
                log.error("Error in function for topic {!r}: {!s}".format(topic, e))
        else:
            for callback in self.subscriptions[data["topic"]]:
                try:
                    erg = await callback({"message": data["message"], "topic": topic})
                except Exception as e:
                    log.error("Error in function for topic {!r}: {!s}".format(data["topic"], e))
                    erg = False
            if topic.rfind("/set") != -1:
                topic_a = topic[:topic.rfind("/set")]
                await self.publish(topic_a, erg, retain=True)
        return erg

    async def __resubscribe(self):
        for topic in self.subscriptions:
            try:
                self.mqtt.subscribe(topic)
            except:
                print("error resubscribing")

    async def subscribe(self, topic, callback):  # only async callbacks allowed
        if topic not in self.subscriptions:
            if topic.find("/set") != -1:  # subscribe to topic without /set to get retained message
                self.retained.append(topic[:topic.find("/set")])
                await self.mqttConnectEvent
                try:
                    self.mqtt.subscribe(topic[:topic.find("/set")])
                except:
                    log.error("Could not subscribe to topic {!r}".format(topic[:topic.find("/set")]))
                self.loop.create_task(self.await_retained(topic[:topic.find("/set")]))
            if topic.find("/#") != -1:
                self._wildcard_subs_found = True
                self.retained.append(topic)
                await self.mqttConnectEvent
                try:
                    self.mqtt.subscribe(topic)
                    erg = True
                except Exception as e:
                    log.error("Could not subscribe to topic {!r}: {!s}".format(topic, e))
                    erg = False
                self.loop.create_task(self.await_retained(topic))
            else:
                await self.mqttConnectEvent
                try:
                    self.mqtt.subscribe(topic)
                    erg = True
                except:
                    erg = False
            if erg == True:
                self.subscriptions[topic] = callback
                return True
            else:
                try:
                    del self.retained[self.retained.index(topic[:topic.find("/set")])]
                except:
                    pass
                if self._wildcard_subs_found:
                    try:
                        del self.retained[self.retained.index(topic)]
                    except:
                        pass
                log.critical("Could not subscribe to topic {!r}".format(topic))
                return False
        else:
            if self.subscriptions[topic] == callback:
                log.warn("Subscription {!r} with callback already registered")
                return True
            else:
                if type(self.subscriptions[topic]) == str:
                    self.subscriptions[topic] = [self.subscriptions[topic]]
                    self.subscriptions[topic].append(callback)
                    return True
                else:
                    if callback not in self.subscriptions[topic]:
                        self.subscriptions[topic].append(callback)
                        return True
                    else:
                        log.warn("Subscription {!r} with callback already registered")
                        return True

    async def unsubscribe(self, topic, callback=None):
        # not supported by mqtt client library
        # if type(self.subscriptions[topic])==str or callback is None:
        #    erg=self.mqtt.unsubscribe(topic)
        #    if erg==False:
        #        log.error("Could not unsubscribe topic {!r}, doing it internally".format(topic))
        if topic not in self.subscriptions:
            log.warn("Topic {!r} not subscribed".format(topic))
            return True
        else:
            if type(self.subscriptions[topic]) == str:
                if callback:
                    if self.subscriptions[topic] == callback:
                        del self.subscriptions[topic]
                        return True
                    else:
                        log.warn("Different callback registered to subscription {!r}".format(topic))
                        return False
                else:
                    del self.subscriptions[topic]
                    return True
            else:
                if callback:
                    if callback in self.subscriptions[topic]:
                        del self.subscriptions[topic][self.subscriptions[topic].index(callback)]
                        return True
                    else:
                        log.warn("Callback not registered to topic {!r}".format(topic))
                        return True
                else:
                    del self.subscriptions[topic]
                    return True

    async def reconnect(self):
        i = 0
        self.isReconnecting = True
        while True:
            try:
                self.mqtt.connect(False)
                await self.__resubscribe()
                return True
            except OSError as e:
                # can't use log as mqtt unavailable
                print("Error reconnecting to mqtt broker: {!r}".format(e))
                await asyncio.sleep(2 * i)
                i = i + 1 if i < 600 else 600
            finally:
                self.isReconnecting = False
                await self.publish("home/" + self.id + "/status", "ONLINE", retain=True)

    async def publish(self, topic, message, retain=False):
        await self.mqttConnectEvent
        while True:
            try:
                return self.mqtt.publish(topic.encode(), str(message).encode(), retain)
            except OSError as e:
                print("Error publishing to mqtt broker: {!r}".format(e))
                if self.isReconnecting == False:
                    await self.reconnect()
                    self.isReconnecting = False
                else:
                    await asyncio.sleep(1)

    def __getTopic(self, attribute, command=False, home=None, device_id=None):
        home = home or (config.MQTT_HOME if hasattr(config, "MQTT_HOME") else "home")
        device_id = device_id or config.id
        topic = "{}/{}/{}".format(home, device_id, attribute)
        if command:
            topic = "{}/set".format(topic)
        return topic

    def scheduleSubscribe(self, topic, callback):
        self.loop.create_task(self.subscribe(topic, callback))


mqtt = mqttHandler()
Kevin Köck
Micropython Smarthome Firmware (with Home-Assistant integration): https://github.com/kevinkk525/pysmartnode

schwinnez
Posts: 5
Joined: Sun Apr 22, 2018 12:19 pm

Re: Problems with running umqtt client asynchronously in main.py

Post by schwinnez » Mon Apr 23, 2018 8:25 pm

Just changed my code again with some input from your script and now it seems to be working. I still don't know what change made the difference but I'll try to figure it out. I will keep you informed. I am currently using non-blocking check_msg from MQTT client with periodically calling
await asyncio.sleep_ms(100)
which seems to work. Of course, this is not a perfect way for concurrently running MQTT and HTTP server, but for my purposes it is sufficient at the moment.

Jibun no kage
Posts: 144
Joined: Mon Jul 25, 2022 9:45 pm

Re: Problems with running umqtt client asynchronously in main.py

Post by Jibun no kage » Wed Jul 27, 2022 8:43 pm

Updated script? Post it here?

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Problems with running umqtt client asynchronously in main.py

Post by pythoncoder » Thu Jul 28, 2022 8:36 am

If asynchronous operation is important mqtt_as is a proven solution.
Peter Hinch
Index to my micropython libraries.

Post Reply