Hi,
I am trying to asynchronously initialize a umqtt.simple MQTT client in the main function and also check for new messages in this asynchronous task. What I am basically doing in the task is initializing the client and checking for messages as follows:
while True:
    await asyncio.sleep_ms(100)
    client.check_msg()
To run it asynchronously, I am using uasyncio and the following:
loop.create_task(mqtt_facade.run())
loop.run_forever()
When I start the asynchronous task in the main function, it does not work for some reason. I also have a simple HTTP server, which works in a similar manner. This HTTP server also works when I start it using uasyncio from main.py.
The strange thing is that when I start the MQTT client from the WebREPL, everything works as expected. Only starting it from main.py does not work.
Are there some special restrictions on what can be used in the main.py?
Problems with running umqtt client asynchronously in main.py
- pythoncoder
- Posts: 5956
- Joined: Fri Jul 18, 2014 8:01 am
- Location: UK
- Contact:
Re: Problems with running umqtt client asynchronously in main.py
In general there are no restrictions on using main.py to launch a MicroPython program. However there are many situations in which the official MQTT client can block and possibly you're encountering one of these.
An asynchronous MQTT client involves a significant rewrite involving the use of nonblocking sockets. See this solution, which, in addition to being asynchronous, aims to fix other shortfalls in the official version.
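The effect of a blocking call on other tasks can be seen in a minimal sketch. It assumes nothing about umqtt itself: `time.sleep` stands in for a socket operation that blocks, and the names `ticker` and `demo` are hypothetical. The import fallback only exists so the sketch can also be tried with desktop Python.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python, for trying the sketch off-device
import time


async def ticker(state):
    # Cooperative task: counts how often the scheduler lets it run.
    while True:
        state["ticks"] += 1
        await asyncio.sleep(0.01)


async def demo():
    state = {"ticks": 0}
    task = asyncio.create_task(ticker(state))

    # Cooperative wait: the scheduler keeps running ticker meanwhile.
    await asyncio.sleep(0.1)
    ticks_async = state["ticks"]

    # Blocking wait (stand-in for a blocking socket call): no other task runs.
    time.sleep(0.1)
    ticks_blocked = state["ticks"] - ticks_async

    task.cancel()
    return ticks_async, ticks_blocked


ticks_async, ticks_blocked = asyncio.run(demo())
print("ticks during awaited sleep:", ticks_async)
print("ticks during blocking sleep:", ticks_blocked)
```

During the awaited sleep the counter advances; during the blocking sleep it does not move at all, which is what any other task (such as an HTTP server) would experience while a client call blocks.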
Peter Hinch
Index to my micropython libraries.
Re: Problems with running umqtt client asynchronously in main.py
I've already thought about using your alternative MQTT client. But I am still wondering why something works when launched from the WebREPL but not when launched from main.py.
I have only experienced this with the MQTT client. I have already tried changing it slightly, but without success.
Any ideas what the actual problem is in my case? Is there a built-in way to get output from main.py, or do I have to implement this on my own?
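One way to find out why a program started from main.py stops, without depending on a console being attached, is to catch the exception and append its traceback to a file. This is a minimal sketch, not a built-in facility: the file name `main_error.log` and the helpers `log_exception` and `run_guarded` are hypothetical. `sys.print_exception` is MicroPython's traceback printer; the `traceback` fallback is only there so the sketch also runs on desktop Python.

```python
import sys

LOG_PATH = "main_error.log"  # hypothetical file name


def log_exception(exc, path=LOG_PATH):
    # Append the traceback of exc to a file so it can be read back later.
    with open(path, "a") as f:
        if hasattr(sys, "print_exception"):
            sys.print_exception(exc, f)  # MicroPython
        else:
            import traceback  # desktop Python fallback
            traceback.print_exception(type(exc), exc, exc.__traceback__, file=f)


def run_guarded(entry_point, path=LOG_PATH):
    # Call entry_point(); on failure, record the traceback instead of losing it.
    try:
        entry_point()
    except Exception as exc:
        log_exception(exc, path)
        return False
    return True
```

In main.py this would be used as `run_guarded(start)`, where `start` is whatever function creates the tasks and runs the event loop. The log file can then be read back over the WebREPL.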
-
- Posts: 969
- Joined: Sat Feb 03, 2018 7:02 pm
Re: Problems with running umqtt client asynchronously in main.py
I used the official mqtt once and wrapped it in asyncio. I attached the file below for you, maybe it helps. Just ignore all the functions you don't need. It worked quite well but I changed to the enhanced version of Peter Hinch a while ago as it has a lot more features.
Code: Select all
import gc
import json
from pysmartnode import config
from pysmartnode import logging
from umqtt.simple import MQTTClient
import uasyncio as asyncio

log = logging.getLogger("MQTT")
gc.collect()


# BUG: reconnecting still makes a recursion from reconnect -> publish -> reconnect

class mqttHandler:
    def __init__(self):
        self.subscriptions = {}
        self.payload_on = ["ON", True, "True"]
        self.payload_off = ["OFF", False, "False"]
        self.isReconnecting = False
        self._wildcard_subs_found = False

    def init(self, mqttConnectEvent):
        self.mqttConnectEvent = mqttConnectEvent
        self.loop = asyncio.get_event_loop()
        self.retained = []
        self.id = config.id
        self.mqtt = MQTTClient(self.id, config.MQTT_HOST, user=config.MQTT_USER,
                               password=config.MQTT_PASSWORD, keepalive=config.MQTT_KEEPALIVE)
        self.loop.create_task(self.init_async())

    async def init_async(self):
        self.mqtt.set_last_will("home/" + self.id + "/status", "OFFLINE", retain=True)
        self.mqtt.connect(False)
        self.mqtt.set_callback(self.execute_sync)
        self.mqttConnectEvent.set()
        log.setMQTT(self)
        await self.publish("home/" + self.id + "/version", config.VERSION, True)
        await self.publish("home/" + self.id + "/status", "ONLINE", True)
        self.loop.create_task(self.check_msg())
        self.loop.create_task(self.send_ping())

    async def send_ping(self):
        interval = config.MQTT_KEEPALIVE - 10
        while True:
            await asyncio.sleep(interval)
            try:
                self.mqtt.ping()
            except OSError as e:
                print("Error pinging broker: {!r}".format(e))
                if self.isReconnecting == False:
                    await self.reconnect()
                    self.isReconnecting = False
                else:
                    await asyncio.sleep(1)

    async def login_finish(self, data):
        if self.loginEvent.is_set() == False:
            if "message" not in data:
                print("Message not in data")
                return False
            if type(data["message"]) != dict:
                print("Login config is no dict, either error in dict or no config provided")
                # try:
                #     data["message"]=json.loads(data["message"])
                # except:
                #     print("Could not convert login config to dict")
                #     return False
            print("login", data["message"])
            erg = data["message"]
            config.registerComponents(erg)
            self.loginEvent.set()
        else:
            log.warn("Got new login data although logged in")

    # if host is not running, this reboots client on line open_connection (most of the time...)
    async def login(self, loginEvent):
        # await asyncio.sleep(1)
        await self.mqttConnectEvent
        print("logging in")
        print("ram", gc.mem_free())
        self.loginEvent = loginEvent
        await self.subscribe("home/login/" + self.id, self.login_finish)
        log.info("Client version: {!s}".format(config.VERSION))
        await self.publish("home/login", {"command": "login", "id": self.id, "version": config.VERSION})

    async def check_msg(self):
        while True:
            try:
                self.mqtt.check_msg()
            except OSError as e:
                if self.isReconnecting == False:
                    print("Error checking mqtt messages: {!r}".format(e))
                    await self.reconnect()
                else:
                    pass
            await asyncio.sleep_ms(20)
            # print("checking")

    async def await_retained(self, topic):
        await asyncio.sleep(2)
        if topic in self.retained:
            del self.retained[self.retained.index(topic)]  # probably no retained message available
            # unsubscribe, not added in mqttclient

    def execute_sync(self, topic, msg):  # mqtt library only handles sync callbacks so add it to async loop
        self.loop.create_task(self.execute(topic, msg))

    async def execute(self, data, msg=None):
        if msg is not None:  # remnant of using this for executionHandler with network sockets
            data = {"topic": data.decode(), "message": msg.decode()}
            try:
                data["message"] = json.loads(data["message"])
            except:
                pass
        if "topic" not in data:
            log.error("No topic found in data")
            return "ERROR: No topic"
        if "message" not in data:
            log.error("No message found in data")
            return "ERROR: No message"
        topic = data["topic"]  # this is the original topic, data["topic"] is the working topic
        if self._wildcard_subs_found:
            for subs in self.subscriptions:
                if subs.rfind("/#") != -1:
                    if topic.find(subs[:-1]) != -1:
                        data["topic"] = subs
                        break
        if data["topic"] in self.retained:
            ending = "/set" if data["topic"].rfind("#") == -1 else ""
            if data["topic"] + ending in self.subscriptions:
                try:
                    if type(self.subscriptions[data["topic"] + ending]) != dict:
                        callback = self.subscriptions[data["topic"] + ending]
                        await callback({"message": data["message"], "topic": topic, "retained": True})
                    else:
                        for callback in self.subscriptions[data["topic"]]:
                            await callback({"message": data["message"], "topic": topic, "retained": True})
                except Exception as e:
                    log.error("Error executing retained mqtt topic {!r}: {!s}".format(data["topic"], e))
                if self._wildcard_subs_found == False or data["topic"].rfind("#") == -1:
                    try:
                        del self.retained[self.retained.index(data["topic"])]
                    except:
                        pass
                return True
            else:
                #log.warn("Got retained message for topic {!r} but did not subscribe to {!r}/set")
                return False
        if data["topic"] not in self.subscriptions:
            return True
        if type(self.subscriptions[data["topic"]]) != list:
            try:
                erg = await self.subscriptions[data["topic"]]({"message": data["message"], "topic": topic})
                # so that an integer 0 is interpreted as a result to send back
                if (type(erg) == int and erg is not None) or erg == True:
                    if erg == True and type(erg) != int:
                        erg = data["message"]
                    if topic.rfind("/set") != -1:
                        topic_a = topic[:topic.rfind("/set")]
                        await self.publish(topic_a, erg, retain=True)
            except Exception as e:
                erg = False
                log.error("Error in function for topic {!r}: {!s}".format(topic, e))
        else:
            for callback in self.subscriptions[data["topic"]]:
                try:
                    erg = await callback({"message": data["message"], "topic": topic})
                except Exception as e:
                    log.error("Error in function for topic {!r}: {!s}".format(data["topic"], e))
                    erg = False
            if topic.rfind("/set") != -1:
                topic_a = topic[:topic.rfind("/set")]
                await self.publish(topic_a, erg, retain=True)
        return erg

    async def __resubscribe(self):
        for topic in self.subscriptions:
            try:
                self.mqtt.subscribe(topic)
            except:
                print("error resubscribing")

    async def subscribe(self, topic, callback):  # only async callbacks allowed
        if topic not in self.subscriptions:
            if topic.find("/set") != -1:  # subscribe to topic without /set to get retained message
                self.retained.append(topic[:topic.find("/set")])
                await self.mqttConnectEvent
                try:
                    self.mqtt.subscribe(topic[:topic.find("/set")])
                except:
                    log.error("Could not subscribe to topic {!r}".format(topic[:topic.find("/set")]))
                self.loop.create_task(self.await_retained(topic[:topic.find("/set")]))
            if topic.find("/#") != -1:
                self._wildcard_subs_found = True
                self.retained.append(topic)
                await self.mqttConnectEvent
                try:
                    self.mqtt.subscribe(topic)
                    erg = True
                except Exception as e:
                    log.error("Could not subscribe to topic {!r}: {!s}".format(topic, e))
                    erg = False
                self.loop.create_task(self.await_retained(topic))
            else:
                await self.mqttConnectEvent
                try:
                    self.mqtt.subscribe(topic)
                    erg = True
                except:
                    erg = False
            if erg == True:
                self.subscriptions[topic] = callback
                return True
            else:
                try:
                    del self.retained[self.retained.index(topic[:topic.find("/set")])]
                except:
                    pass
                if self._wildcard_subs_found:
                    try:
                        del self.retained[self.retained.index(topic)]
                    except:
                        pass
                log.critical("Could not subscribe to topic {!r}".format(topic))
                return False
        else:
            if self.subscriptions[topic] == callback:
                log.warn("Subscription {!r} with callback already registered")
                return True
            else:
                if type(self.subscriptions[topic]) == str:
                    self.subscriptions[topic] = [self.subscriptions[topic]]
                    self.subscriptions[topic].append(callback)
                    return True
                else:
                    if callback not in self.subscriptions[topic]:
                        self.subscriptions[topic].append(callback)
                        return True
                    else:
                        log.warn("Subscription {!r} with callback already registered")
                        return True

    async def unsubscribe(self, topic, callback=None):
        # not supported by mqtt client library
        # if type(self.subscriptions[topic])==str or callback is None:
        #     erg=self.mqtt.unsubscribe(topic)
        #     if erg==False:
        #         log.error("Could not unsubscribe topic {!r}, doing it internally".format(topic))
        if topic not in self.subscriptions:
            log.warn("Topic {!r} not subscribed".format(topic))
            return True
        else:
            if type(self.subscriptions[topic]) == str:
                if callback:
                    if self.subscriptions[topic] == callback:
                        del self.subscriptions[topic]
                        return True
                    else:
                        log.warn("Different callback registered to subscription {!r}".format(topic))
                        return False
                else:
                    del self.subscriptions[topic]
                    return True
            else:
                if callback:
                    if callback in self.subscriptions[topic]:
                        del self.subscriptions[topic][self.subscriptions[topic].index(callback)]
                        return True
                    else:
                        log.warn("Callback not registered to topic {!r}".format(topic))
                        return True
                else:
                    del self.subscriptions[topic]
                    return True

    async def reconnect(self):
        i = 0
        self.isReconnecting = True
        while True:
            try:
                self.mqtt.connect(False)
                await self.__resubscribe()
                return True
            except OSError as e:
                # can't use log as mqtt unavailable
                print("Error reconnecting to mqtt broker: {!r}".format(e))
                await asyncio.sleep(2 * i)
                i = i + 1 if i < 600 else 600
            finally:
                self.isReconnecting = False
                await self.publish("home/" + self.id + "/status", "ONLINE", retain=True)

    async def publish(self, topic, message, retain=False):
        await self.mqttConnectEvent
        while True:
            try:
                return self.mqtt.publish(topic.encode(), str(message).encode(), retain)
            except OSError as e:
                print("Error publishing to mqtt broker: {!r}".format(e))
                if self.isReconnecting == False:
                    await self.reconnect()
                    self.isReconnecting = False
                else:
                    await asyncio.sleep(1)

    def __getTopic(self, attribute, command=False, home=None, device_id=None):
        home = home or (config.MQTT_HOME if hasattr(config, "MQTT_HOME") else "home")
        device_id = device_id or config.id
        topic = "{}/{}/{}".format(home, device_id, attribute)
        if command:
            topic = "{}/set".format(topic)
        return topic

    def scheduleSubscribe(self, topic, callback):
        self.loop.create_task(self.subscribe(topic, callback))


mqtt = mqttHandler()
Kevin Köck
Micropython Smarthome Firmware (with Home-Assistant integration): https://github.com/kevinkk525/pysmartnode
Re: Problems with running umqtt client asynchronously in main.py
I just changed my code again with some input from your script, and now it seems to be working. I still don't know which change made the difference, but I'll try to figure it out and will keep you informed. I am currently using the non-blocking check_msg from the MQTT client, with the task periodically calling
await asyncio.sleep_ms(100)
which seems to work. Of course, this is not a perfect way for concurrently running MQTT and HTTP server, but for my purposes it is sufficient at the moment.
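The polling approach described here can be sketched as follows. This is not the updated script, only an illustration under stated assumptions: `FakeClient` is a hypothetical stand-in for `umqtt.simple.MQTTClient` so the sketch runs without a broker, `other_service` is a placeholder for the HTTP server, and the intervals are shortened. The import fallback lets it run on desktop Python as well.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python, for trying the sketch off-device


class FakeClient:
    # Hypothetical stand-in for umqtt.simple.MQTTClient (no broker needed).
    def __init__(self, pending):
        self.pending = list(pending)
        self.received = []

    def check_msg(self):
        # Mimics a non-blocking check: take one message if present, else return at once.
        if self.pending:
            self.received.append(self.pending.pop(0))


async def mqtt_poll(client, interval=0.1):
    # Poll for messages, yielding to other tasks between checks.
    while True:
        try:
            client.check_msg()
        except OSError as e:
            # Report the error instead of letting the task die silently.
            print("check_msg failed:", repr(e))
        await asyncio.sleep(interval)


async def other_service(log, rounds=3):
    # Placeholder for the HTTP server: does a little work, then yields.
    for n in range(rounds):
        log.append(n)
        await asyncio.sleep(0.05)


async def main():
    client = FakeClient([b"first", b"second"])
    log = []
    poller = asyncio.create_task(mqtt_poll(client, 0.02))
    await other_service(log)  # both tasks make progress while this runs
    poller.cancel()
    return client.received, log


received, served = asyncio.run(main())
print(received, served)
```

The `try`/`except` around `check_msg` is the part worth keeping in a real task: an unhandled exception would end the polling task while the rest of the program carries on, which is easy to miss when no console is attached.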
-
- Posts: 144
- Joined: Mon Jul 25, 2022 9:45 pm
Re: Problems with running umqtt client asynchronously in main.py
Updated script? Post it here?
- pythoncoder
- Posts: 5956
- Joined: Fri Jul 18, 2014 8:01 am
- Location: UK
- Contact:
Re: Problems with running umqtt client asynchronously in main.py
If asynchronous operation is important, mqtt_as is a proven solution.
Peter Hinch
Index to my micropython libraries.