How to run an mqtt-repl?

General discussions and questions abound development of code with MicroPython that is not hardware specific.
Target audience: MicroPython Users.
User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: How to run an mqtt-repl?

Post by pythoncoder » Sat Jan 18, 2020 7:02 am

tve wrote:
Wed Jan 15, 2020 10:45 pm
...BTW, looking at the synchronization primitives in https://github.com/peterhinch/micropyth ... er/asyn.py I was struck by the fact that they all end up busy waiting, i.e. calling await asyncio.sleep_ms(<some small number>) in a loop :-(...
Sorry, late coming to this thread.

Yes, this is not ideal. I wrote them in this way so that they were compatible with all versions of uasyncio, especially official V2.0. This should change with Damien's forthcoming new uasyncio. He has already implemented efficient Lock and Event classes and I have submitted suggestions for the other primitives.
Peter Hinch
Index to my micropython libraries.

User avatar
tve
Posts: 216
Joined: Wed Jan 01, 2020 10:12 pm
Location: Santa Barbara, CA
Contact:

Re: How to run an mqtt-repl?

Post by tve » Sat Jan 18, 2020 10:48 pm

Welcome to the party :-) :mrgreen:
I did see with excitement that Damien got rid of the busy-polling in the upcoming uasyncio. But then I looked at the implementation of ipoll and saw that it relies on polling: https://github.com/micropython/micropyt ... #L250-L257

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Polling in uasyncio

Post by pythoncoder » Sun Jan 19, 2020 10:27 am

The mechanism of select.poll is above my pay-grade; how does CPython do it? I suspect the name carries a clue. The most common object to submit to this mechanism is probably a socket. Is there another way to check a socket for incoming data aside from polling?

Polling a flag, as done in my Event class, while inelegant, has minimal impact on application performance. On a Pyboard 1.x uasyncio V2 can switch context in about 150μs. So a pending event might use about 200μs every complete cycle of the scheduler. Consider a microcontroller application with 8 coroutines, each using 5ms of processing between yields. A complete scheduler cycle therefore takes 40ms, with the overhead of each pending event being about 0.5%.

Obviously these numbers are highly debatable but the take-away is that even current official uasyncio is extremely efficient: to some extent this offsets the cost of polling a flag.

The new uasyncio will be even more efficient and will have non-polling primitives :D
Peter Hinch
Index to my micropython libraries.

User avatar
tve
Posts: 216
Joined: Wed Jan 01, 2020 10:12 pm
Location: Santa Barbara, CA
Contact:

Re: How to run an mqtt-repl?

Post by tve » Mon Jan 20, 2020 12:18 am

I continued doing experiments and now have a proof of concept repl that uses Peter's mqtt_as and asyncio but runs in callbacks, just like the webrepl. It sets things up and then drops back into the repl, yet mqtt continues in the background and accepts commands. Right now it uses eval/exec to run the commands instead of feeding them into the repl stdin, that could be changed easily (dunno which I prefer, actually). The whole thing is still a proof of concept and I'm not sure what all I broke :-). Code in https://github.com/tve/mpy-mqttrepl

> The mechanism of select.poll is above my pay-grade; how does CPython do it? I suspect the name carries a clue.

Yes & no. The big difference is that in Unix all I/O goes through file descriptors, and so when a process calls `poll` it's ok for it to block 'cause there really isn't much else for it to do. Plus there are threads, which allow other processing to proceed.

User avatar
tve
Posts: 216
Joined: Wed Jan 01, 2020 10:12 pm
Location: Santa Barbara, CA
Contact:

Re: How to run an mqtt-repl?

Post by tve » Mon Jan 20, 2020 2:20 am

Brainstorming...
My current thinking is that the asyncio loop gets called 3 ways and each time runs until there is no work or no imminent work to do (timer expiring in less than a couple of milliseconds). The ways it gets called are:
- socket callback when there is unread input available on the socket
- timer callback to catch work that was somehow cut-off by the loop returning, to run periodic timers (like pinging the mqtt server), and to finish flushing any pending writes
- direct call by the application, which ideally is an asyncio app and does a run_forever thing

The socket callback is a bit tricky with TLS 'cause one can't get a callback from the decrypted stream, so it has to be on the unencrypted socket. This means it's possible that something gets stuck in the ssl lib until a timer callback comes around, dunno...

I coded the timer callback so it doesn't do anything if another callback has happened recently (<500ms right now). I'm thinking that instead or in addition I should be able to set a "incomplete business" flag in mqtt_as and that could gate the timer and also perhaps modulate its rate.

Overall, I believe the ideal situation is if an asyncio app is running and the socket/timer callbacks can be turned into no-ops, but if the main loop gets stuck or otherwise dies then the callbacks ensure that the system is not unreachable.

User avatar
tve
Posts: 216
Joined: Wed Jan 01, 2020 10:12 pm
Location: Santa Barbara, CA
Contact:

Re: How to run an mqtt-repl?

Post by tve » Fri Jan 24, 2020 6:30 pm

Quick status update:
- So far running the asyncio loop in callbacks seems to work great, but I haven't run it with a real app nor have I measured timings or such.
- I've made some additions to the mqtt_as library so message bodies can be processed as streams instead of just as byte buffers, this allows a call to `publish` to pass a stream and a subscription callback to receive a stream. This allows very large messages to be processed, such that a large file can be sent/received as one message, up to the limit of the broker (and whatever sits at the other end of the message exchange, e.g. paho-mqtt passes everything in in-memory buffers).
- I started to hack up ampy to create an mqtt version. So far I can eval and also put/get files. The code is all a mess, so I'm not pushing that unless someone is interested in actively helping (I need help in packaging all this stuff for easy upip and pip consumption).

Here's a transcript of putting mqtt_as.py (25258 bytes) and getting it again with verification, which shows my debug statements as diff (I need to log those to stderr...):

Code: Select all

/h/s/e/mqboard> env PYTHONPATH=. python3 -m mqboard --server core.voneicken.com --board mqtest put ../mpy-mqrepl/src/mqtt_as.py src/mqtt_as.py
core.voneicken.com None False None None esp32/mmp mqtest
MQTT topic prefix: esp32/mmp/mqtest
Connecting to core.voneicken.com:1883
Connected! Subscribing to esp32/mmp/mqtest/reply/out/_LVVwAlQ
Pub esp32/mmp/mqtest/cmd/put/_LVVwAlQ/src/mqtt_as.py
Received reply on topic 'esp32/mmp/mqtest/reply/out/_LVVwAlQ' with QoS 0
OK
/h/s/e/mqboard> env PYTHONPATH=. python3 -m mqboard --server core.voneicken.com --board mqtest get src/mqtt_as.py | diff ../mpy-mqrepl/src/mqtt_as.py -
0a1,6
> core.voneicken.com None False None None esp32/mmp mqtest
> MQTT topic prefix: esp32/mmp/mqtest
> Connecting to core.voneicken.com:1883
> Connected! Subscribing to esp32/mmp/mqtest/reply/out/mWh4EPp1
> Pub esp32/mmp/mqtest/cmd/get/mWh4EPp1/src/mqtt_as.py
> Received reply on topic 'esp32/mmp/mqtest/reply/out/mWh4EPp1' with QoS 0
706a713
>
More to follow...

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: How to run an mqtt-repl?

Post by pythoncoder » Sat Jan 25, 2020 9:41 am

tve wrote:
Fri Jan 24, 2020 6:30 pm
...
- I've made some additions to the mqtt_as library so message bodies can be processed as streams instead of just as byte buffers, this allows a call to `publish` to pass a stream and a subscription callback to receive a stream. This allows very large messages to be processed, such that a large file can be sent/received as one message, up to the limit of the broker (and whatever sits at the other end of the message exchange, e.g. paho-mqtt passes everything in in-memory buffers).
...
Interesting. The anticipated use case for mqtt_as was for microcontrollers sending and receiving short messages.

In my solutions I aim for resilience over an unreliable physical medium (radio). In poor propagation conditions resilience becomes less achievable as message length increases. This is because the probability of a mid-message dropout increases with length. If qos==0 the message will be lost. If qos==1 it will be retransmitted, potentially ad nauseam if conditions are sufficiently poor.

This may not be an issue in your application but in general I recommend short messages.
Peter Hinch
Index to my micropython libraries.

User avatar
tve
Posts: 216
Joined: Wed Jan 01, 2020 10:12 pm
Location: Santa Barbara, CA
Contact:

Re: How to run an mqtt-repl?

Post by tve » Sat Jan 25, 2020 4:19 pm

I agree in general about the short message size, however, MQTT in this case is over TCP, which is a reliable byte stream, with a max packet/segment size, and with selective retransmission (IIRC LwIP does support that). TCP also has a max retransmission count, so it will eventually give up.
So in a use-case where you want to update a 100kB python source file we agree you will have to transmit 100kB, right ;-)? So you would transmit that as 100 1KB messages? or 25x 4KB? You would have to handle out-of-order arrival and reassembly, which is quite a PITA. You will need an ACK scheme of some form, right? Presumably selective retransmission? Exponential rexmit backoff? How much python code would this take and how much of TCP would you be reimplementing? What do you really expect to gain (if anything) over letting TCP do the work?
I went down that thought process bit and arrived at the conclusion that it would be far less work and less code and more reliable to enhance mqtt_as so I could push large messages through. I haven't used large messages with MQTT before, so I may well hit a road block, and then I can figure out whether I can come up with a solution to the specific roadblock...

kevinkk525
Posts: 969
Joined: Sat Feb 03, 2018 7:02 pm

Re: How to run an mqtt-repl?

Post by kevinkk525 » Sat Jan 25, 2020 5:48 pm

If you send 100 1kb messages using mqtt_as they will arrive in order and reliable with qos==1. You don't send them in parallel and you are using TCP. Qos1 already has ACK in it and retransmission will occur if the package doesn't get an ACK --> you can send 100 1kb messages and they will safely arrive in order
Kevin Köck
Micropython Smarthome Firmware (with Home-Assistant integration): https://github.com/kevinkk525/pysmartnode

User avatar
tve
Posts: 216
Joined: Wed Jan 01, 2020 10:12 pm
Location: Santa Barbara, CA
Contact:

Re: How to run an mqtt-repl?

Post by tve » Sat Jan 25, 2020 9:54 pm

> If you send 100 1kb messages using mqtt_as they will arrive in order and reliable with qos==1.

Hmm, good point, I'll have to go back to the spec... (Do you have a section number for this btw?) Is the ordering guarantee true in the face of retransmissions, i.e. disconnect/reconnect & resuming the session? It seems safe to assume that if message N on a TCP connection is lost, then all messages >N will be lost too. So as long as the broker is careful and retransmits unack'ed messages in the right order on the new connection the ordering guarantee should hold. It's mostly a matter of ensuring that the rexmit timeouts stay in-order.

But even with the in-order assumption the code increase on the python end seems quite significant. Right now my subscription callback (minus the topic matching part) for a file put is:

Code: Select all

async def do_put(fname, read_msg, msg_len):
    with open(fname, 'wb') as fd:
        buf = await read_msg(1024)
        while len(buf) > 0:
            fd.write(buf)
            buf = await read_msg(1024)
    return "OK"
Where `fname` is the filename portion of the topic, read_msg is the stream handle to read the payload from, and msg_len is the total payload length. If this came in in 1024-byte messages you'd have to explicitly manage the state between messages and have some explicit timeout on the whole thing. Maybe not horrible code expansion, but certainly more that the above.
The get file handler is even less code:

Code: Select all

async def do_get(fname, read_msg, msg_len):
    return open(fname, 'rb')
The return value (resp) goes into a `mqclient.publish(rtopic, resp, 0, 1)` in the subscription dispatcher.

I'm liking the simplicity of it all right now and am taking the attitude of "show me the problem" before throwing more complexity at it. At least then I'll also have a concrete problem to solve. I can imagine that users using unconfigurable MQTT brokers (e.g. public services) will run into issues pushing a MP firmware update through in one message :-). Overall I like to avoid premature optimization...

Thanks for the discussion!

Post Reply