I used the official mqtt once and wrapped it in asyncio. I attached the file below for you, maybe it helps. Just ignore all the functions you don't need. It worked quite well but I changed to the enhanced version of Peter Hinch a while ago as it has a lot more features.
Code: Select all
import gc
import json
from pysmartnode import config
from pysmartnode import logging
from umqtt.simple import MQTTClient
import uasyncio as asyncio
log = logging.getLogger("MQTT")
gc.collect()
# BUG: reconnecting still makes a recursion from reconnect -> publish -> reconnecty
class mqttHandler:
def __init__(self):
self.subscriptions = {}
self.payload_on = ["ON", True, "True"]
self.payload_off = ["OFF", False, "False"]
self.isReconnecting = False
self._wildcard_subs_found = False
def init(self, mqttConnectEvent):
self.mqttConnectEvent = mqttConnectEvent
self.loop = asyncio.get_event_loop()
self.retained = []
self.id = config.id
self.mqtt = MQTTClient(self.id, config.MQTT_HOST, user=config.MQTT_USER,
password=config.MQTT_PASSWORD, keepalive=config.MQTT_KEEPALIVE)
self.loop.create_task(self.init_async())
async def init_async(self):
self.mqtt.set_last_will("home/" + self.id + "/status", "OFFLINE", retain=True)
self.mqtt.connect(False)
self.mqtt.set_callback(self.execute_sync)
self.mqttConnectEvent.set()
log.setMQTT(self)
await self.publish("home/" + self.id + "/version", config.VERSION, True)
await self.publish("home/" + self.id + "/status", "ONLINE", True)
self.loop.create_task(self.check_msg())
self.loop.create_task(self.send_ping())
async def send_ping(self):
interval = config.MQTT_KEEPALIVE - 10
while True:
await asyncio.sleep(interval)
try:
self.mqtt.ping()
except OSError as e:
print("Error pinging broker: {!r}".format(e))
if self.isReconnecting == False:
await self.reconnect()
self.isReconnecting = False
else:
await asyncio.sleep(1)
async def login_finish(self, data):
if self.loginEvent.is_set() == False:
if "message" not in data:
print("Message not in data")
return False
if type(data["message"]) != dict:
print("Login config is no dict, either error in dict or no config provided")
# try:
# data["message"]=json.loads(data["message"])
# except:
# print("Could not convert login config to dict")
# return False
print("login", data["message"])
erg = data["message"]
config.registerComponents(erg)
self.loginEvent.set()
else:
log.warn("Got new login data although logged in")
# if host is not running, this reboots client on line open_connection (most of the time...)
async def login(self, loginEvent):
# await asyncio.sleep(1)
await self.mqttConnectEvent
print("logging in")
print("ram", gc.mem_free())
self.loginEvent = loginEvent
await self.subscribe("home/login/" + self.id, self.login_finish)
log.info("Client version: {!s}".format(config.VERSION))
await self.publish("home/login", {"command": "login", "id": self.id, "version": config.VERSION})
async def check_msg(self):
while True:
try:
self.mqtt.check_msg()
except OSError as e:
if self.isReconnecting == False:
print("Error checking mqtt messages: {!r}".format(e))
await self.reconnect()
else:
pass
await asyncio.sleep_ms(20)
# print("checking")
async def await_retained(self, topic):
await asyncio.sleep(2)
if topic in self.retained:
del self.retained[self.retained.index(topic)] # probably no retained message available
#unsubscribe, not added in mqttclient
def execute_sync(self, topic, msg): # mqtt library only handles sync callbacks so add it to async loop
self.loop.create_task(self.execute(topic, msg))
async def execute(self, data, msg=None):
if msg is not None: # remnant of using this for executionHandler with network sockets
data = {"topic": data.decode(), "message": msg.decode()}
try:
data["message"] = json.loads(data["message"])
except:
pass
if "topic" not in data:
log.error("No topic found in data")
return "ERROR: No topic"
if "message" not in data:
log.error("No message found in data")
return "ERROR: No message"
topic = data["topic"] # this is the original topic, data["topic"] is the working topic
if self._wildcard_subs_found:
for subs in self.subscriptions:
if subs.rfind("/#") != -1:
if topic.find(subs[:-1]) != -1:
data["topic"] = subs
break
if data["topic"] in self.retained:
ending = "/set" if data["topic"].rfind("#") == -1 else ""
if data["topic"] + ending in self.subscriptions:
try:
if type(self.subscriptions[data["topic"] + ending]) != dict:
callback = self.subscriptions[data["topic"] + ending]
await callback({"message": data["message"], "topic": topic, "retained": True})
else:
for callback in self.subscriptions[data["topic"]]:
await callback({"message": data["message"], "topic": topic, "retained": True})
except Exception as e:
log.error("Error executing retained mqtt topic {!r}: {!s}".format(data["topic"], e))
if self._wildcard_subs_found == False or data["topic"].rfind("#") == -1:
try:
del self.retained[self.retained.index(data["topic"])]
except:
pass
return True
else:
#log.warn("Got retained message for topic {!r} but did not subscribe to {!r}/set")
return False
if data["topic"] not in self.subscriptions:
return True
if type(self.subscriptions[data["topic"]]) != list:
try:
erg = await self.subscriptions[data["topic"]]({"message": data["message"], "topic": topic})
# so that an integer 0 is interpreted as a result to send back
if (type(erg) == int and erg is not None) or erg == True:
if erg == True and type(erg) != int:
erg = data["message"]
if topic.rfind("/set") != -1:
topic_a = topic[:topic.rfind("/set")]
await self.publish(topic_a, erg, retain=True)
except Exception as e:
erg = False
log.error("Error in function for topic {!r}: {!s}".format(topic, e))
else:
for callback in self.subscriptions[data["topic"]]:
try:
erg = await callback({"message": data["message"], "topic": topic})
except Exception as e:
log.error("Error in function for topic {!r}: {!s}".format(data["topic"], e))
erg = False
if topic.rfind("/set") != -1:
topic_a = topic[:topic.rfind("/set")]
await self.publish(topic_a, erg, retain=True)
return erg
async def __resubscribe(self):
for topic in self.subscriptions:
try:
self.mqtt.subscribe(topic)
except:
print("error resubscribing")
async def subscribe(self, topic, callback): # only async callbacks allowed
if topic not in self.subscriptions:
if topic.find("/set") != -1: # subscribe to topic without /set to get retained message
self.retained.append(topic[:topic.find("/set")])
await self.mqttConnectEvent
try:
self.mqtt.subscribe(topic[:topic.find("/set")])
except:
log.error("Could not subscribe to topic {!r}".format(topic[:topic.find("/set")]))
self.loop.create_task(self.await_retained(topic[:topic.find("/set")]))
if topic.find("/#") != -1:
self._wildcard_subs_found = True
self.retained.append(topic)
await self.mqttConnectEvent
try:
self.mqtt.subscribe(topic)
erg = True
except Exception as e:
log.error("Could not subscribe to topic {!r}: {!s}".format(topic, e))
erg = False
self.loop.create_task(self.await_retained(topic))
else:
await self.mqttConnectEvent
try:
self.mqtt.subscribe(topic)
erg = True
except:
erg = False
if erg == True:
self.subscriptions[topic] = callback
return True
else:
try:
del self.retained[self.retained.index(topic[:topic.find("/set")])]
except:
pass
if self._wildcard_subs_found:
try:
del self.retained[self.retained.index(topic)]
except:
pass
log.critical("Could not subscribe to topic {!r}".format(topic))
return False
else:
if self.subscriptions[topic] == callback:
log.warn("Subscription {!r} with callback already registered")
return True
else:
if type(self.subscriptions[topic]) == str:
self.subscriptions[topic] = [self.subscriptions[topic]]
self.subscriptions[topic].append(callback)
return True
else:
if callback not in self.subscriptions[topic]:
self.subscriptions[topic].append(callback)
return True
else:
log.warn("Subscription {!r} with callback already registered")
return True
async def unsubscribe(self, topic, callback=None):
# not supported by mqtt client library
# if type(self.subscriptions[topic])==str or callback is None:
# erg=self.mqtt.unsubscribe(topic)
# if erg==False:
# log.error("Could not unsubscribe topic {!r}, doing it internally".format(topic))
if topic not in self.subscriptions:
log.warn("Topic {!r} not subscribed".format(topic))
return True
else:
if type(self.subscriptions[topic]) == str:
if callback:
if self.subscriptions[topic] == callback:
del self.subscriptions[topic]
return True
else:
log.warn("Different callback registered to subscription {!r}".format(topic))
return False
else:
del self.subscriptions[topic]
return True
else:
if callback:
if callback in self.subscriptions[topic]:
del self.subscriptions[topic][self.subscriptions[topic].index(callback)]
return True
else:
log.warn("Callback not registered to topic {!r}".format(topic))
return True
else:
del self.subscriptions[topic]
return True
async def reconnect(self):
i = 0
self.isReconnecting = True
while True:
try:
self.mqtt.connect(False)
await self.__resubscribe()
return True
except OSError as e:
# can't use log as mqtt unavailable
print("Error reconnecting to mqtt broker: {!r}".format(e))
await asyncio.sleep(2 * i)
i = i + 1 if i < 600 else 600
finally:
self.isReconnecting = False
await self.publish("home/" + self.id + "/status", "ONLINE", retain=True)
async def publish(self, topic, message, retain=False):
await self.mqttConnectEvent
while True:
try:
return self.mqtt.publish(topic.encode(), str(message).encode(), retain)
except OSError as e:
print("Error publishing to mqtt broker: {!r}".format(e))
if self.isReconnecting == False:
await self.reconnect()
self.isReconnecting = False
else:
await asyncio.sleep(1)
def __getTopic(self, attribute, command=False, home=None, device_id=None):
home = home or (config.MQTT_HOME if hasattr(config, "MQTT_HOME") else "home")
device_id = device_id or config.id
topic = "{}/{}/{}".format(home, device_id, attribute)
if command:
topic = "{}/set".format(topic)
return topic
def scheduleSubscribe(self, topic, callback):
self.loop.create_task(self.subscribe(topic, callback))
mqtt = mqttHandler()