Resilient aysnchronous MQTT optimim use

All ESP32 boards running MicroPython.
Target audience: MicroPython users with an ESP32 board.
RobinMosedale
Posts: 40
Joined: Fri Jul 26, 2019 9:40 pm

Resilient aysnchronous MQTT optimim use

Post by RobinMosedale » Sun Jul 28, 2019 3:43 pm

I'm delighted that I've managed to at last successfully run Peter's resilient MQTT application, and moreover, adding a separate test asyn co-routine to simply observe the characteristics.

I'm still learning, and although having designed systems execs, operating systems, and real time safety critical systems, I'm sure that I still don't understand the mechanisms at work within micropython's asyncio.

I'm sure that I'm not using it correctly, I can see latency on the processor with my added co-routine compared to the other processors running Peter's unadulterated asynchronous resilient MQTT. I'm sure that it's the manner that the co-routines are being instantiated and added to the queue, but I'm guessing.

I'll post the modified code below, it's mainly unadulterated exemplar, but with just a simple asynchronous co-routine to flash an external LED.
I can observe the message receipts on all three esp32's via the blue LED: When running the identical unaltered exemplar code, they all receive pretty simultaneously; when one esp32 runs with my test coroutine, I can see latency, or indeed variable lag.

I'd like to understand how to use asyncio. What happens when I declare a co-routine via asyncio def function. Is a function instantiated then, and does it place it on the queue. Is there a method by which I can observe the queue, and what is the effect of the various event or sleep/wait triggers? Otherwise I'm merely experimenting.

The micropython documentation that I've discovered is good at getting you started, but as it's a particular implementation to fit the constraints of these microprocessors, I often fall foul of misuse.
If someone could point me to a specification of the objects/functions, help me switch on diagnostics so that I can the queue, and whether I'm abusing the mechanisms.

Indeed I'm pretty sure that in my simple example I'm not using the declarations in the right place, nor instigating the waits properly. I'm sure, because someone has already told me. Understanding those and telling me what I'm doing wrong would be an absolute boon.

So, help with specification (micropython), the mechanism of instantiation of co-routine, its placement in the queue, it's removal, and diagnostics would be great

Thanks

Robin
Here's the code:-

Code: Select all

# range.py Test of asynchronous mqtt client with clean session False.
# (C) Copyright Peter Hinch 2017-2019.
# Released under the MIT licence.

# Public brokers https://github.com/mqtt/mqtt.github.io/wiki/public_brokers

# This demo is for wireless range tests. If OOR the red LED will light.
# In range the blue LED will pulse for each received message.
# Uses clean sessions to avoid backlog when OOR.

# red LED: ON == WiFi fail
# blue LED pulse == message received
# Publishes connection statistics.

from mqtt_as import MQTTClient, config
from config import wifi_led, blue_led
import uasyncio as asyncio
import machine
import gc

loop = asyncio.get_event_loop()
outages = 0

async def pulse():  # This demo pulses blue LED each time a subscribed msg arrives.
    blue_led(True)
    #await client.publish('foo_topic', 'Message received')
    await asyncio.sleep(1)
    blue_led(False)

def sub_cb(topic, msg):
    print((topic, msg))
    loop.create_task(pulse())

async def wifi_han(state):
    global outages
    wifi_led(not state)  # Light LED when WiFi down
    if state:
        print('We are connected to broker.')
    else:
        outages += 1
        print('WiFi or broker is down.')
    await asyncio.sleep(1)

async def conn_han(client):
    await client.subscribe('foo_topic', 1)

async def main(client):
    try:
        await client.connect()
    except OSError:
        print('Connection failed.')
        return
    n = 0
    while True:
        await asyncio.sleep(5)
        print('publish', n)
        # If WiFi is down the following will pause for the duration.
        await client.publish('result', '{} repubs: {} outages: {}'.format(n, client.REPUB_COUNT, outages), qos = 1)
        n += 1

#import machine

#pin2=machine.Pin(2,machine.Pin.OUT) #the inernal blue led
pin4=machine.Pin(4,machine.Pin.OUT) #externally attached led

async def testfunction ():
    while True:
        #gc.collect()
        #print ('LED On')
        pin4.on()
        await asyncio.sleep(0.05)
        #print('LED Off')
        pin4.off()
        await asyncio.sleep(5)
        #print('Out')
    
# Define configuration
config['subs_cb'] = sub_cb
config['wifi_coro'] = wifi_han
config['will'] = ('result', 'Goodbye cruel world!', False, 0)
config['connect_coro'] = conn_han
config['keepalive'] = 120

# Set up client
MQTTClient.DEBUG = True  # Optional
client = MQTTClient(config)
loop.create_task(testfunction())
try:
    loop.run_until_complete(main(client))
    
finally:  # Prevent LmacRxBlk:1 errors.
    client.close()
    blue_led(True)


kevinkk525
Posts: 969
Joined: Sat Feb 03, 2018 7:02 pm

Re: Resilient aysnchronous MQTT optimim use

Post by kevinkk525 » Sun Jul 28, 2019 5:32 pm

Quite a lot of questions, I try to answer all that I remember but be kind as I'm answering from my phone and the answers might be quite short Image

What kind of lag do you see when you run your code?
A lag of 50ms is quite possible but you wouldn't see that unless you measure it.

How are you sending messages to your esp?

To understand how to use uasynio I can refer to a guide from Peter: https://github.com/peterhinch/micropython-async
Generally a coroutine is added to the uasynio queue by loop.create_task and in micropython a coroutine is a simple generator (unlike in python) but that's just how it works behind the scenes.
Every time you add a coroutine, you add latency if you have coroutines running that don't wait long. For example if you have 2 coroutines trying to run every 5 ms but the execution of each takes 5 ms then you will see some lag as it is not a multiprocessing system. Additionally switching coroutines takes a short time too.
But generally this shouldn't be an issue with a typical application.
How to use waiting/sleeping should be covered on the tutorial by Peter.

If you are familiar with python you could look at the uasynio source code, it's not that complicated, but guessing from your question, your knowledge of python isn't so great although your general knowledge of systems and languages seem to be great.
Kevin Köck
Micropython Smarthome Firmware (with Home-Assistant integration): https://github.com/kevinkk525/pysmartnode

RobinMosedale
Posts: 40
Joined: Fri Jul 26, 2019 9:40 pm

Re: Resilient aysnchronous MQTT optimim use

Post by RobinMosedale » Sun Jul 28, 2019 7:38 pm

Kevin,

I've plenty time, so no rush, and I'm grateful

Oh, I'm familiar with Python, I've several thousand lines of code entwined with tkinter. I'm not impressed: it's not a predicative language, but it lent itself to the role that I originally wished, and on a Pi3

The broker on the Pi3 is:-

while :
do
mosquitto_pub -d -t foo_topic -m "first boring message"
sleep 1
mosquitto_pub -d -t foo_topic -m "Another boring Message"
sleep 1
done

The latency is quite noticeable, and can become quite out of step; two baseline esp32 beating simultaneously, and the one with the above code shifts until it's 180 degrees out of phase ( in the middle of the others' 1 flash period). and occasionally hesitates, hiccups, and lingers on for longer.

I'll take a look at the guide thanks.
Can you see anything that would explain that odd latency?

No rush, and many thanks

Robin

kevinkk525
Posts: 969
Joined: Sat Feb 03, 2018 7:02 pm

Re: Resilient aysnchronous MQTT optimim use

Post by kevinkk525 » Sun Jul 28, 2019 9:17 pm

I can see a case where that happens.
You send a message every second. The Esp32 needs 30-100ms to receive a message, then waits 1 second to toggle the led off again. With a publish stetement in the coroutine even 20-50ms longer.
So it is very likely that it receives a new message before the coroutine pulse has finished, starting a new coroutine pulse.

So it is possible that one esp needs more time to process message than the others and gets more out of line.
Try decreasing your message frequency to every 2s and the problem should disappear.
A different test would be to publish a message with a counter so you can see in the esp output if the latency you see is in fact latency or just a hickup of 2 coroutines running at the same time.
Kevin Köck
Micropython Smarthome Firmware (with Home-Assistant integration): https://github.com/kevinkk525/pysmartnode

RobinMosedale
Posts: 40
Joined: Fri Jul 26, 2019 9:40 pm

Re: Resilient aysnchronous MQTT optimim use

Post by RobinMosedale » Mon Jul 29, 2019 2:17 pm

Thank you Kevin.

I've modified both the baseline MQTT exemplars running on the twoesp32's to only flash 0.1s with awaits accordingly, together with the modified one with test led flash ditto.
Pretty good.

Unfortunately and having run 12 hour soak test before without issue, the testcase one before all of this last night failed with queue overrun again. Thanks for pointing me to the tutorial, from where I've found proper specifications, and better, the queue module. I may employ that merely to observe the queue to see what is happening during the runs.

RobinMosedale
Posts: 40
Joined: Fri Jul 26, 2019 9:40 pm

Re: Resilient aysnchronous MQTT optimim use

Post by RobinMosedale » Mon Jul 29, 2019 4:17 pm

queue=asyncio.Queue()

There is no object

Import Queue
There is no module.

Which contradicts the documentation referred in Peter's tutorial

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Resilient aysnchronous MQTT optimim use

Post by pythoncoder » Mon Jul 29, 2019 4:41 pm

Rather than making general claims it would be helpful if you could identify the part of my tutorial which you believe to be in error so that I can assess it and correct it if necessary.

I suspect that you have misunderstood the material. The Queue class is available for application code. It is not used internally in uasyncio, which uses ucollections.deque and utimeq.utimeq for its internal queues.

My tutorial aims to guide application development rather than to explain the internal operation of uasyncio. My under the hood guide makes an attempt to describe how it works. Note that I didn't write uasyncio so I can't guarantee its correctness. As a guide it is somewhat terse and assumes some familiarity with the code.
Peter Hinch
Index to my micropython libraries.

RobinMosedale
Posts: 40
Joined: Fri Jul 26, 2019 9:40 pm

Re: Resilient aysnchronous MQTT optimim use

Post by RobinMosedale » Mon Jul 29, 2019 7:00 pm

Peter, thank you.

From:-
https://github.com/peterhinch/micropyth ... UTORIAL.md

Section 3.5
The Queue class is officially supported and the sample program aqtest.py demonstrates its use. A queue is instantiated as follows:

from uasyncio.queues import Queue
q = Queue()

Entering REPL. Use Control-X to exit.

>>> from uasyncio.queues import Queue
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: no module named 'uasyncio.queues'
>>>

/home/pi/micropython> repl
Entering REPL. Use Control-X to exit.

/home/pi/micropython> repl
Entering REPL. Use Control-X to exit.
>
MicroPython v1.11-167-g331c224e0 on 2019-07-22; ESP32 module with ESP32
Type "help()" for more information.
>>>
>>> import aqtests.py
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "aqtests.py", line 9, in <module>
ImportError: no module named 'uasyncio.queues'
>>>

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Resilient aysnchronous MQTT optimim use

Post by pythoncoder » Tue Jul 30, 2019 6:18 am

That's because you haven't installed the Queues module.

Code: Select all

upip.install('micropython-uasyncio.queues')
I have updated the installation instructions in the tutorial to reflect the fact that, as of MicroPython V1.11, upip now installs the correct version.
Peter Hinch
Index to my micropython libraries.

RobinMosedale
Posts: 40
Joined: Fri Jul 26, 2019 9:40 pm

Re: Resilient aysnchronous MQTT optimim use

Post by RobinMosedale » Tue Jul 30, 2019 3:44 pm

That's excellent, Peter, thank you.

All installed nicely now and can instantiate.
I did search the module library, but you're absolutely right, I'm completely unfamiliar with the micropython installation environment, and would never have guessed that it was those files or those extractions.

I'm probably anyone's worst nightmare; experienced but not in the particularities of this environment, so I'm more than grateful for your patience and help.

I have inserted in the main body:

from uasyncio.queues import Queueq=Queue()
q=Queue()

and in my 5 second triggered test coroutine I've inserted:-
async def testfunction ():
while True:
#gc.collect()
#print ('LED On')
pin4.on()
await asyncio.sleep(0.05)
#print('LED Off')
pin4.off()
print('Queue Size: ',q.qsize())
await asyncio.sleep(5)
#print('Out')

together with a new coroutine:-
async def qttask():
while True:
await asyncio.sleep(0.5)
print('Queue Size 1: ',q.qsize())
await asyncio.sleep(0.5)

Queued as:-
# Set up client
MQTTClient.DEBUG = True # Optional
client = MQTTClient(config)
loop.create_task(testfunction())
loop.create_task(qttask())
try:
loop.run_until_complete(main(client))

finally: # Prevent LmacRxBlk:1 errors.
client.close()
blue_led(True)

Both are consistently reporting 0 length queues:-


Got reliable connection
Connecting to broker.
Queue Size: 0
Connected to broker.
We are connected to broker.
(b'foo_topic', b'first boring message')
(b'foo_topic', b'first boring message')
Queue Size 1: 0
(b'foo_topic', b'Another boring Message')
(b'foo_topic', b'Another boring Message')
Queue Size 1: 0
(b'foo_topic', b'Another boring Message')
(b'foo_topic', b'first boring message')
(b'foo_topic', b'first boring message')
(b'foo_topic', b'first boring message')
Queue Size 1: 0
(b'foo_topic', b'Another boring Message')
(b'foo_topic', b'Another boring Message')
Queue Size 1: 0
(b'foo_topic', b'Another boring Message')
(b'foo_topic', b'first boring message')
(b'foo_topic', b'first boring message')
Queue Size 1: 0
(b'foo_topic', b'first boring message')
publish 0
Queue Size: 0
(b'foo_topic', b'Another boring Message')
(b'foo_topic', b'Another boring Message')
Queue Size 1: 0
(b'foo_topic', b'Another boring Message')
(b'foo_topic', b'first boring message')
Queue Size 1: 0
(b'foo_topic', b'first boring message')
(b'foo_topic', b'first boring message')
(b'foo_topic', b'Another boring Message')
Queue Size 1: 0
(b'foo_topic', b'Another boring Message')
(b'foo_topic', b'Another boring Message')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "qt2.py", line 104, in <module>
File "qt2.py", line 100, in <module>
File "/lib/uasyncio/core.py", line 180, in run_until_complete
File "/lib/uasyncio/core.py", line 173, in run_forever
File "/lib/uasyncio/__init__.py", line 69, in wait
KeyboardInterrupt:
>>>

I'll have to think about that a bit.

Post Reply