Equivalent to asyncio.gather()

General discussions and questions abound development of code with MicroPython that is not hardware specific.
Target audience: MicroPython Users.
cefn
Posts: 230
Joined: Tue Aug 09, 2016 10:58 am

Equivalent to asyncio.gather()

Post by cefn » Wed Jan 03, 2018 8:00 am

I can't identify how to wait on the results of multiple coroutines (and handle the first exception raised by any of them). This is available through asyncio.gather in Cpython. How do I do it in Micropython?

I have some asynchronous code which looks after e.g. polling network connectivity, sending heartbeats, sending data over serial.

It is very natural to author code so that each coroutine function is defined with finally statements for tidyup of the configuration it makes, and I would like to be able to handle any unmanaged exception thrown by any of the coroutines by 'cancelling' all the previously scheduled coroutines, triggering their finally clauses.

Ideally I should then be in a position to simply rerun all the coroutine functions again and reschedule the coroutines to attempt to resume the application.

A good candidate strategy seems to be through calling a throw(), and having each coroutine's 'continuation' halted by the propagation of an exception from whatever await statement it was currently at.

However, without being able to wait on the first exception thrown by _any_ of the coroutines, I can't figure out how to achieve this.

I put together a gist to explore how this should best be done in Cpython, which looks like...

Code: Select all

    try:
        tasks = [loop.create_task(coro) for coro in coros]
        gatheredTask = asyncio.gather(*tasks)
        loop.run_until_complete(gatheredTask)
    except asyncio.CancelledError:
        try:
            gatheredTask.cancel()
            loop.run_until_complete(gatheredTask)
        finally:
            print("run() Finished")
See https://gist.github.com/cefn/ead11ab8a7 ... 18fb299831 for fully working example

What's the equivalent means of monitoring multiple coroutines for results (and the first thrown exception) in Micropython?

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Equivalent to asyncio.gather()

Post by pythoncoder » Wed Jan 03, 2018 8:39 am

Bear in mind that uasyncio is a "micro" version designed for high performance and zero RAM allocation. It therefore supports only a subset of the asyncio functionality. Throwing exceptions from one coro to another is supported via pend_throw which allows an exception to be thrown to a coroutine/generator but its use isn't recommended for user programs. I would advise a different approach.

I suggest you look a the synchronisation primitives here, together with the (unofficial) support for task cancellation. There is provision for the common situation where you need to cancel a set of tasks and wait until they have actually terminated.
Peter Hinch
Index to my micropython libraries.

cefn
Posts: 230
Joined: Tue Aug 09, 2016 10:58 am

Re: Equivalent to asyncio.gather()

Post by cefn » Thu Jan 04, 2018 12:40 am

Thanks for the guidance. It's truly amazing that there is any support at all given the headroom available in the target environments, yes! This is bound to come with some compromises.

I was ready to explore various ways to do cancellation but thought I was stuck a step earlier than that. I had understood I couldn't detect the circumstances to attempt cancellation, because I didn't have the means of monitoring multiple co-routines for exceptions they raise. I was wrong.

I had encountered a case where loop.run_forever() seemed to swallow the exceptions raised by its coroutines, and I understood therefore that loop.run_until_complete(coro) was the only way to monitor the results (including exceptions) from any coro, but therefore I was limited to monitoring exceptions from a single coro.

For this reason I was puzzling how to monitor exceptions from more than one coro, (as I had achieved with asyncio.gather() with the Task-oriented CPython API, to turn multiple tasks into a single task).

However, it seems like the exception-swallowing I encountered was to do with an except+pass clause in a coro which was too broad. In fact, monitoring for all exceptions from all coros is the default behaviour of both loop.run_forever() and loop.run_until_complete() but I hadn't figured this out.

So after more experimentation I have come to believe two important new facts which make sense of everything...
  • loop.run_forever() actually raises exceptions encountered when evaluating scheduled continuations within any of the coroutines previously scheduled with loop.create_task()
  • loop.run_until_complete(coro) also raises exceptions arising in other coros than coro, but just terminates early when coro has a result (or exception).
That means I should be in a position to handle exceptions from all coros, and then trigger a cancellation.

As regards cancellation itself, the following example seems effective on ESP8266. It successfully restarts an asynchronous Russian roulette tournament when the first player wins...

Code: Select all

import uasyncio as asyncio
from uos import urandom
def random_uint8():
    return int(urandom(1)[0])


class CancelledException(BaseException):
    pass


class BangException(BaseException):
    pass


async def russian_roulette(bulletPos = None):
    if bulletPos is None:
        bulletPos = random_uint8() % 6
    cylinderPos = 0
    while cylinderPos < 6:
        await asyncio.sleep(random_uint8() / 100)
        print("Bullet {} Cylinder {}. Pulling trigger...".format(bulletPos, cylinderPos))
        if cylinderPos == bulletPos:
            print("...Bang!")
            raise BangException
        else:
            print("...click")
        cylinderPos += 1

numPlayers = 10

loop = asyncio.get_event_loop()

def run():
    coros = []
    for count in range(numPlayers):
        coro = russian_roulette()
        coros.append(coro)
        loop.create_task(coro)

    # keep pulling trigger until a coro bangs
    try:
        loop.run_forever()
    except BangException as e:
        pass

    # dispose of all coros (banged or not)
    for coro in coros:
        try:
            coro.throw(CancelledException)
        except StopIteration: # raised by already stopped coro?
            pass
        except CancelledException: # raised by all others?
            pass

    # help loop tidy up the exceptions
    for coro in coros:
        loop.run_until_complete(coro)

while True:
    print("Players ready")
    run()
    print("Game Over: A player has died")

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Equivalent to asyncio.gather()

Post by pythoncoder » Thu Jan 04, 2018 12:05 pm

An interesting approach but as a general solution to task cancellation it has problems, notably the need to stop and start the scheduler. The approach using my asyn library avoids this. The library aims to be a general solution to task cancellation. The code below is shorter and in my view it is simpler and cleaner.

Code: Select all

import uasyncio as asyncio
import asyn
from uos import urandom

def random_uint8():
    return int(urandom(1)[0])

@asyn.cancellable
async def russian_roulette(_, event, bulletPos = None):
    if bulletPos is None:
        bulletPos = random_uint8() % 6
    cylinderPos = 0
    while cylinderPos < 6:
        await asyn.sleep(random_uint8() / 100)
        print("Bullet {} Cylinder {}. Pulling trigger...".format(bulletPos, cylinderPos))
        if cylinderPos == bulletPos:
            print("...Bang!")
            event.set()
            return
        else:
            print("...click")
        cylinderPos += 1

async def run(loop):
    numPlayers = 10
    while True:
        print("Players ready")
        bang = asyn.Event()
        for count in range(numPlayers):
            loop.create_task(asyn.Cancellable(russian_roulette, bang)())
        await bang
        await asyn.Cancellable.cancel_all()
        print("Game Over: A player has died")

loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
In general I think it is best to handle exceptions within the task in which they occur and to perform inter-task communication by other means.
Peter Hinch
Index to my micropython libraries.

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Equivalent to asyncio.gather()

Post by pythoncoder » Fri Jan 05, 2018 8:41 am

On further thought throwing exceptions to coroutines is highly undesirable. This is because throw causes the coro to run even though it may not be scheduled for execution. This is breaking uasyncio! For example if the coro handles the exception and terminates, it remains on uasyncio's queue and will subsequently be rescheduled.

In general it is safest to use uasyncio as if it were CPython's asyncio, and to pretend that uasyncio coroutines are not generators. Exceptions occurring in coros should be trapped either in that coro, or in one higher up which is awaiting it.

Task cancellation needs to be handled in a controlled fashion as per my library. The cancellation support does (of necessity) break the above rule, but it does so with considerable care and more testing.
Peter Hinch
Index to my micropython libraries.

cefn
Posts: 230
Joined: Tue Aug 09, 2016 10:58 am

Re: Equivalent to asyncio.gather()

Post by cefn » Fri Jan 05, 2018 11:21 am

Yes, I would expect that it remains in the uasyncio loop's queue, and is removed on the next call to loop.run_*() since its new status is complete. It's for that reason there's a chunk of coro completion logic in the Roulette example I shared, after throw has been called on all the coros...

Code: Select all

# help loop tidy up the exceptions
for coro in coros:
    loop.run_until_complete(coro)
...which was intended to explicitly check that everything was out of the queue.

Not sure in what way uasyncio is broken by continuations being completed outside of its run_*() calls. However, it's certainly a worry if some bookkeeping has been messed up by my approach.

The roulette example seemed run at high speed without hitting any queue length or memory limits, even on an ESP8266. Incidentally I first explored the approach in Python3.6, but it would only work using the task support in Cpython, since the line above implicitly calls await twice on a coro (permitted in Micropython, but a runtime error in Python3.6 if I understand).

Would your concern arise in some other scenario that I'm not getting?

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Equivalent to asyncio.gather()

Post by pythoncoder » Fri Jan 05, 2018 12:13 pm

I can't claim to have thought through all possible implications of issuing throw to a coro which is on the uasyncio queue. But it is a misuse of uasyncio which you do at your own risk ;) I can't actually see any reason to attempt it. As far as I can see there are better ways to achieve task cancellation and inter-task communication.
Peter Hinch
Index to my micropython libraries.

cefn
Posts: 230
Joined: Tue Aug 09, 2016 10:58 am

Re: Equivalent to asyncio.gather()

Post by cefn » Tue Jan 09, 2018 10:15 am

Although there's a lot of valid discussion above, especially if I shouldn't throw exceptions to terminate the loop, I was still pretty stuck on the original issue in the first post of this thread. How can I await the completion of more than one coro?

Wondered if this was impossible given the limitations of Micropython uasyncio or just that I haven't figured out which combination of primitive uasyncio operations could achieve it?

I recognise that Micropython is choosing not to adopt tasks for sensible resourcing reasons. I also take on board that there are other ways than return to send information around between coros as demonstrated by the asyn library, but I am a bit flummoxed that multiple coroutines adopting a simple return to indicate their completion (and the result of the coro) can't themselves be simultaneously monitored for completion by uasyncio. Only being able to monitor a single one at a time seems to defeat the purpose of asyncio a little bit.

Currently, you can only await completion of a single target coroutine through run_until_complete, and there is no core method which waits for the first coroutine to complete from a list of coroutines as demonstrated in CPython's task-oriented implementation of asyncio.gather?

In the absence of any core methods, (or having missed the one I should use), I am trying the use of a StopLoop(0) return in a 'wrapper' coroutine, but repeated for multiple coroutines. This mirrors the implementation of loop.run_until_complete() in uasyncio...

Code: Select all

    def run_until_complete(self, coro):
        def _run_and_stop():
            yield from coro
            yield StopLoop(0)
        self.call_soon(_run_and_stop())
        self.run_forever()
...from https://github.com/micropython/micropyt ... io/core.py

Here's an apparently working implementation (based on the two-day old commit of uasyncio/core.py with the cancel operation defined (thanks @pfalcon). Probably works on esp8266 but only tested on unix.

Code: Select all

import uasyncio as asyncio
#platform = "esp8266"
platform = "unix"
if platform == "esp8266":
	from uos import urandom
	def random_uint8():
		return int(urandom(1)[0])
elif platform == "unix":
	import urandom
	def random_uint8():
		return int(urandom.getrandbits(8))	

async def russian_roulette(bulletPos = None):
    if bulletPos is None:
        bulletPos = random_uint8() % 6
    cylinderPos = 0
    while cylinderPos < 6:
        await asyncio.sleep(random_uint8() / 100)
        print("Bullet {} Cylinder {}. Pulling trigger...".format(bulletPos, cylinderPos))
        if cylinderPos == bulletPos:
            print("...Bang!")
            return cylinderPos
        else:
            print("...click")
        cylinderPos += 1

numPlayers = 10

class MultiEventLoop(asyncio.EventLoop):
	def run_until_any(self, coros):
		stopcoros = []
		done = []
		pending = list(coros)
		for coro in coros:
			def stopper(coro):
				yield from coro
				pending.remove(coro)
				done.append(coro)
				yield asyncio.StopLoop(0)
			stopcoro = stopper(coro)
			stopcoros.append(stopcoro)
			self.call_soon(stopcoro)
		self.run_forever()
		for stopcoro in stopcoros:
			asyncio.cancel(stopcoro)
		return done, pending

loop = MultiEventLoop()

def run():
    coros = []
    for count in range(numPlayers):
        coro = russian_roulette()
        coros.append(coro)
        loop.create_task(coro)

	while len(coros) > 0:
		done, pending = loop.run_until_any(coros)
		for coro in done:
			coros.remove(coro)

print("Players ready")
run()
print("Game Over: All players have died")

However, this is DEFINITELY messing with uasyncio internals, so would much prefer a less hacky way or a core method to support this.

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Equivalent to asyncio.gather()

Post by pythoncoder » Tue Jan 09, 2018 10:23 am

I know I keep referring you to my library but you'll find an answer there.
Peter Hinch
Index to my micropython libraries.

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Equivalent to asyncio.gather()

Post by pythoncoder » Sat Jan 20, 2018 10:00 am

The above library now includes a 'micro' implementation of asyncio.gather().
Peter Hinch
Index to my micropython libraries.

Post Reply