I'm still learning, and although having designed systems execs, operating systems, and real time safety critical systems, I'm sure that I still don't understand the mechanisms at work within micropython's asyncio.
I'm sure that I'm not using it correctly, I can see latency on the processor with my added co-routine compared to the other processors running Peter's unadulterated asynchronous resilient MQTT. I'm sure that it's the manner that the co-routines are being instantiated and added to the queue, but I'm guessing.
I'll post the modified code below, it's mainly unadulterated exemplar, but with just a simple asynchronous co-routine to flash an external LED.
I can observe the message receipts on all three esp32's via the blue LED: When running the identical unaltered exemplar code, they all receive pretty simultaneously; when one esp32 runs with my test coroutine, I can see latency, or indeed variable lag.
I'd like to understand how to use asyncio. What happens when I declare a co-routine via asyncio def function. Is a function instantiated then, and does it place it on the queue. Is there a method by which I can observe the queue, and what is the effect of the various event or sleep/wait triggers? Otherwise I'm merely experimenting.
The micropython documentation that I've discovered is good at getting you started, but as it's a particular implementation to fit the constraints of these microprocessors, I often fall foul of misuse.
If someone could point me to a specification of the objects/functions, help me switch on diagnostics so that I can the queue, and whether I'm abusing the mechanisms.
Indeed I'm pretty sure that in my simple example I'm not using the declarations in the right place, nor instigating the waits properly. I'm sure, because someone has already told me. Understanding those and telling me what I'm doing wrong would be an absolute boon.
So, help with specification (micropython), the mechanism of instantiation of co-routine, its placement in the queue, it's removal, and diagnostics would be great
Thanks
Robin
Here's the code:-
Code: Select all
# range.py Test of asynchronous mqtt client with clean session False.
# (C) Copyright Peter Hinch 2017-2019.
# Released under the MIT licence.
# Public brokers https://github.com/mqtt/mqtt.github.io/wiki/public_brokers
# This demo is for wireless range tests. If OOR the red LED will light.
# In range the blue LED will pulse for each received message.
# Uses clean sessions to avoid backlog when OOR.
# red LED: ON == WiFi fail
# blue LED pulse == message received
# Publishes connection statistics.
from mqtt_as import MQTTClient, config
from config import wifi_led, blue_led
import uasyncio as asyncio
import machine
import gc
loop = asyncio.get_event_loop()
outages = 0
async def pulse(): # This demo pulses blue LED each time a subscribed msg arrives.
blue_led(True)
#await client.publish('foo_topic', 'Message received')
await asyncio.sleep(1)
blue_led(False)
def sub_cb(topic, msg):
print((topic, msg))
loop.create_task(pulse())
async def wifi_han(state):
global outages
wifi_led(not state) # Light LED when WiFi down
if state:
print('We are connected to broker.')
else:
outages += 1
print('WiFi or broker is down.')
await asyncio.sleep(1)
async def conn_han(client):
await client.subscribe('foo_topic', 1)
async def main(client):
try:
await client.connect()
except OSError:
print('Connection failed.')
return
n = 0
while True:
await asyncio.sleep(5)
print('publish', n)
# If WiFi is down the following will pause for the duration.
await client.publish('result', '{} repubs: {} outages: {}'.format(n, client.REPUB_COUNT, outages), qos = 1)
n += 1
#import machine
#pin2=machine.Pin(2,machine.Pin.OUT) #the inernal blue led
pin4=machine.Pin(4,machine.Pin.OUT) #externally attached led
async def testfunction ():
while True:
#gc.collect()
#print ('LED On')
pin4.on()
await asyncio.sleep(0.05)
#print('LED Off')
pin4.off()
await asyncio.sleep(5)
#print('Out')
# Define configuration
config['subs_cb'] = sub_cb
config['wifi_coro'] = wifi_han
config['will'] = ('result', 'Goodbye cruel world!', False, 0)
config['connect_coro'] = conn_han
config['keepalive'] = 120
# Set up client
MQTTClient.DEBUG = True # Optional
client = MQTTClient(config)
loop.create_task(testfunction())
try:
loop.run_until_complete(main(client))
finally: # Prevent LmacRxBlk:1 errors.
client.close()
blue_led(True)