uasyncio - How detect the end task in another task.

Discussion about programs, libraries and tools that work with MicroPython. Mostly these are provided by a third party.
Target audience: All users and developers of MicroPython.
Primesty
Posts: 17
Joined: Sun Jun 28, 2020 11:06 pm

Re: uasyncio - How detect the end task in another task.

Post by Primesty » Tue Jul 28, 2020 2:07 pm

Will do and re-write according to the tutorial!

Primesty
Posts: 17
Joined: Sun Jun 28, 2020 11:06 pm

Re: uasyncio - How detect the end task in another task.

Post by Primesty » Fri Jul 31, 2020 7:51 pm

Hi Peter!

I was able to re-write the code according to your tutorial so that it's not legacy anymore:

Code: Select all

from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc

from my_app import temp_reader

led = Pin(2, Pin.OUT)

def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = temp_reader(led)  # Constructor might create tasks
    uasyncio.create_task(my_class.blink())  # Or you might do this
    await my_class.cancel()  # Non-terminating method
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state


main()

gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
The led-blink example still works and the class with the blink method and the cancel method is in a file *my_app.py* .

Should I now first implement a temp_reader method or work on the encoder method? I assume both will then have to be put into **uasyncio.create_task()** above. Should the temperature measurement OR the encoder be non-terminating methods - for the last line above, e.g.

Code: Select all

await my_class.temp_measure() # Non-terminating method
? Please let me know what you think. Happy weekend!

User avatar
pythoncoder
Posts: 4351
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: uasyncio - How detect the end task in another task.

Post by pythoncoder » Sat Aug 01, 2020 7:30 am

A typical firmware application will have several non-terminating tasks. Your main() task starts all but one by issuing .create_task() and then awaits the last task: this ensures that the scheduler itself runs forever.

In many cases it doesn't matter which task you await - it's only an issue if the application logic requires the tasks to be started in a defined order.
Peter Hinch

Primesty
Posts: 17
Joined: Sun Jun 28, 2020 11:06 pm

Re: uasyncio - How detect the end task in another task.

Post by Primesty » Sat Aug 01, 2020 3:04 pm

Gotcha! Thanks for the input - once I have my drivers up and running again - I'll continue building the coros.

Primesty
Posts: 17
Joined: Sun Jun 28, 2020 11:06 pm

Re: uasyncio - How detect the end task in another task.

Post by Primesty » Fri Aug 07, 2020 8:16 pm

Hi Pete,

I got my system up and running again :)

I've successfully implemented code that blinks the led *and* reads the temperature at the same time according to your specs and tutorial.

One question popped up.

When I run this:

Code: Select all

from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc

led = Pin(2, Pin.OUT)

sensor = dht.DHT22(Pin(14))

class tempReader():
  
  sleep = 1 #static instance variable for testing
  
  def __init__(self, led, sensor):
    self.led = led
    self.sensor = sensor
    
  async def blink(self):
    while True:
        self.led.on()
        await uasyncio.sleep(tempReader.sleep)
        self.led.off()
        await uasyncio.sleep(tempReader.sleep)
  
  async def temp(self):
    while True:
      self.sensor.measure()
      temp = self.sensor.temperature()
      print('Temperature: %3.1f C' %temp)
      await uasyncio.sleep(2)
        
  async def cancel(self):
    await uasyncio.sleep(10) #stops the loop after 10 seconds


def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = tempReader(led, sensor)  # Constructor might create tasks
    uasyncio.create_task(my_class.blink()) # Or you might do this
    uasyncio.create_task(my_class.temp())
    await my_class.cancel() # Non-terminating method
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state


main()

gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
It works without a hitch. However, when I have the class in another file, like so

main.py

Code: Select all

from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc

from my_app import tempReader

led = Pin(2, Pin.OUT)

sensor = dht.DHT22(Pin(14))


def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = tempReader(led, sensor)  # Constructor might create tasks
    uasyncio.create_task(my_class.blink()) # Or you might do this
    uasyncio.create_task(my_class.temp())
    await my_class.cancel() # Non-terminating method
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state


main()

gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
my_app.py

Code: Select all

from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc


class tempReader():
  
  sleep = 1 #static instance variable for testing
  
  def __init__(self, led, sensor):
    self.led = led
    self.sensor = sensor
    
  async def blink(self):
    while True:
        self.led.on()
        await uasyncio.sleep(tempReader.sleep)
        self.led.off()
        await uasyncio.sleep(tempReader.sleep)
  
  async def temp(self):
    while True:
      self.sensor.measure()
      temp = self.sensor.temperature()
      print('Temperature: %3.1f C' %temp)
      await uasyncio.sleep(2)
        
  async def cancel(self):
    await uasyncio.sleep(10) #stops the loop after 10 seconds
I get this error
TypeError: function takes 2 positional arguments but 3 were given
It seems that the second class method is not imported correctly to main.py. I'm under the impression that the sensor object has to go into the class instantiation process, right?

Do you know what might be causing this?

Have a great weekend!

Matthias

User avatar
pythoncoder
Posts: 4351
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: uasyncio - How detect the end task in another task.

Post by pythoncoder » Sat Aug 08, 2020 12:34 pm

I can't spot the cause. It would help if you told us the line number where the error occurs.

It's worth noting that the following code will only be executed after you interrupt the program or after it fails with an exception:

Code: Select all

main()

gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
This is because execution passes to the scheduler with

Code: Select all

uasyncio.run(main())
Peter Hinch

Primesty
Posts: 17
Joined: Sun Jun 28, 2020 11:06 pm

Re: uasyncio - How detect the end task in another task.

Post by Primesty » Sat Aug 08, 2020 12:54 pm

Gotcha, I removed the dangling main() function call

Also, the above error has resolved itself and now it's running smoothly, which means I can now move on to handling the rotary encoder or displaying the temperature.

Post Reply