uasyncio - How detect the end task in another task.

Primesty
Posts: 49
Joined: Sun Jun 28, 2020 11:06 pm

Post by Primesty » Tue Jul 28, 2020 2:07 pm

Will do, and I'll rewrite the code according to the tutorial!

Post by Primesty » Fri Jul 31, 2020 7:51 pm

Hi Peter!

I was able to re-write the code according to your tutorial so that it's not legacy anymore:

Code:

from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc

from my_app import temp_reader

led = Pin(2, Pin.OUT)

def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = temp_reader(led)  # Constructor might create tasks
    uasyncio.create_task(my_class.blink())  # Or you might do this
    await my_class.cancel()  # Non-terminating method
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state


main()

gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())

The LED blink example still works. The class with the blink and cancel methods now lives in a separate file, *my_app.py*.

Should I implement a temp_reader method first, or work on the encoder method? I assume both will then have to be passed to **uasyncio.create_task()** above. Should the temperature measurement or the encoder be the non-terminating method that is awaited on the last line of main()? For example:

Code:

await my_class.temp_measure()  # Non-terminating method

Please let me know what you think. Happy weekend!

pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK

Post by pythoncoder » Sat Aug 01, 2020 7:30 am

A typical firmware application will have several non-terminating tasks. Your main() task starts all but one by issuing .create_task() and then awaits the last task: this ensures that the scheduler itself runs forever.

In many cases it doesn't matter which task you await - it's only an issue if the application logic requires the tasks to be started in a defined order.
Peter Hinch
Index to my micropython libraries.
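
The start-up pattern described in this reply can be sketched roughly as follows. This is a minimal illustration rather than code from the thread: the task names, the `log` list that stands in for hardware side effects, and the `demo()` wrapper with its timeout are all assumptions made so the sketch terminates and can be tried on a PC as well as on a board.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # CPython fallback so the sketch can be tried on a PC

log = []  # hypothetical stand-in for LED and sensor side effects


async def blink(period_s):
    # Non-terminating task: toggles a simulated LED forever.
    state = False
    while True:
        state = not state
        log.append(("led", state))
        await asyncio.sleep(period_s)


async def read_sensor(period_s):
    # Non-terminating task: records a simulated reading forever.
    while True:
        log.append(("temp", 21.5))
        await asyncio.sleep(period_s)


async def main():
    # Start all but one task with create_task() ...
    blink_task = asyncio.create_task(blink(0.01))
    # ... and await the last one, so main() never returns
    # and the scheduler keeps running.
    await read_sensor(0.02)


async def demo(seconds):
    # Demonstration only: give up on main() after a timeout so the sketch ends.
    try:
        await asyncio.wait_for(main(), seconds)
    except asyncio.TimeoutError:
        pass


asyncio.run(demo(0.1))
print(len(log), "events recorded")
```

In real firmware the `demo()` wrapper would be dropped and `asyncio.run(main())` called directly, which then runs until the program is interrupted or fails.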

Post by Primesty » Sat Aug 01, 2020 3:04 pm

Gotcha! Thanks for the input. Once I have my drivers up and running again, I'll continue building the coros.

Post by Primesty » Fri Aug 07, 2020 8:16 pm

Hi Pete,

I got my system up and running again :)

I've successfully implemented code that blinks the led *and* reads the temperature at the same time according to your specs and tutorial.

One question popped up.

When I run this:

Code:

from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc

led = Pin(2, Pin.OUT)

sensor = dht.DHT22(Pin(14))

class tempReader():

    sleep = 1  # class attribute shared by all instances (for testing)

    def __init__(self, led, sensor):
        self.led = led
        self.sensor = sensor

    async def blink(self):
        # Toggle the LED forever, yielding to the scheduler between changes
        while True:
            self.led.on()
            await uasyncio.sleep(tempReader.sleep)
            self.led.off()
            await uasyncio.sleep(tempReader.sleep)

    async def temp(self):
        # Read and print the temperature every 2 seconds
        while True:
            self.sensor.measure()
            temp = self.sensor.temperature()
            print('Temperature: %3.1f C' % temp)
            await uasyncio.sleep(2)

    async def cancel(self):
        await uasyncio.sleep(10)  # main() awaits this, so the program ends after 10 seconds


def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = tempReader(led, sensor)  # Constructor might create tasks
    uasyncio.create_task(my_class.blink()) # Or you might do this
    uasyncio.create_task(my_class.temp())
    await my_class.cancel()  # Returns after 10 s, so main() and the program then end
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state


main()

gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())

It works without a hitch. However, when I move the class into another file, like so:

main.py

Code:

from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc

from my_app import tempReader

led = Pin(2, Pin.OUT)

sensor = dht.DHT22(Pin(14))


def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = tempReader(led, sensor)  # Constructor might create tasks
    uasyncio.create_task(my_class.blink()) # Or you might do this
    uasyncio.create_task(my_class.temp())
    await my_class.cancel()  # Returns after 10 s, so main() and the program then end
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state


main()

gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())

my_app.py

Code:

from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc


class tempReader():

    sleep = 1  # class attribute shared by all instances (for testing)

    def __init__(self, led, sensor):
        self.led = led
        self.sensor = sensor

    async def blink(self):
        # Toggle the LED forever, yielding to the scheduler between changes
        while True:
            self.led.on()
            await uasyncio.sleep(tempReader.sleep)
            self.led.off()
            await uasyncio.sleep(tempReader.sleep)

    async def temp(self):
        # Read and print the temperature every 2 seconds
        while True:
            self.sensor.measure()
            temp = self.sensor.temperature()
            print('Temperature: %3.1f C' % temp)
            await uasyncio.sleep(2)

    async def cancel(self):
        await uasyncio.sleep(10)  # main() awaits this, so the program ends after 10 seconds

I get this error:

TypeError: function takes 2 positional arguments but 3 were given

It seems that the second class method is not imported correctly into main.py. I'm under the impression that the sensor object has to be passed in when the class is instantiated, right?

Do you know what might be causing this?

Have a great weekend!

Matthias

Post by pythoncoder » Sat Aug 08, 2020 12:34 pm

I can't spot the cause. It would help if you told us the line number where the error occurs.

It's worth noting that the following code will only be executed after you interrupt the program or after it fails with an exception:

Code:

main()

gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())

This is because execution passes to the scheduler with:

Code:

uasyncio.run(main())
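
A small sketch of this ordering, under the assumption that one-off setup such as the garbage-collection calls is meant to take effect before the tasks start. The `events` list is purely illustrative, and `main()` is deliberately short-lived here so the sketch finishes; in the application above it would not return for as long as it awaits a non-terminating task.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # CPython fallback so the sketch can be tried on a PC
import gc

events = []  # hypothetical record of the order in which things happen


async def main():
    events.append("main running")
    await asyncio.sleep(0.05)  # stand-in for work that would normally never finish
    events.append("main finished")


# One-off setup placed before run() executes straight away.
# On MicroPython the gc.threshold(...) line from the thread would go here too.
gc.collect()
events.append("setup done")

try:
    asyncio.run(main())  # blocks here until main() returns or raises
finally:
    asyncio.new_event_loop()  # clear retained state, as in the thread's code

events.append("after run")  # only reached once run() has returned
print(events)
```

Anything written below `run()` is therefore best treated as shutdown code, while setup belongs above it.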

Post by Primesty » Sat Aug 08, 2020 12:54 pm

Gotcha, I removed the dangling main() function call.

Also, the above error has resolved itself and now it's running smoothly, which means I can now move on to handling the rotary encoder or displaying the temperature.
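
The thread never establishes why the error went away, so the following is only a guess. The message says a function that accepts 2 positional arguments received 3, and a constructor call such as tempReader(led, sensor) passes 3 once self is counted. That would fit a stale copy of my_app.py on the device whose __init__ still accepted only led, like the earlier temp_reader(led) version. The class names below are hypothetical and plain strings stand in for the Pin and DHT22 objects.

```python
class OldTempReader:
    # Hypothetical stale version: the constructor accepts only the LED.
    def __init__(self, led):
        self.led = led


class NewTempReader:
    # Version matching the my_app.py posted above: LED and sensor.
    def __init__(self, led, sensor):
        self.led = led
        self.sensor = sensor


def try_construct(cls):
    # Call the constructor the way main.py does and report what happens.
    try:
        cls("led", "sensor")  # placeholder strings, not real hardware objects
        return "ok"
    except TypeError as exc:
        return "TypeError: " + str(exc)


old_result = try_construct(OldTempReader)
new_result = try_construct(NewTempReader)
print(old_result)
print(new_result)
```

If this is what happened, copying the current my_app.py to the board again and resetting it would make the error disappear, which would be consistent with it appearing to resolve itself.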

Post by Primesty » Sat Aug 15, 2020 4:52 pm

Hey Pete,

I tried installing a daily build on an ESP8266 today to get the latest version of uasyncio, but the daily build no longer includes urequests, which seems to be available only in stable releases. Are you aware of any ESP8266 builds that include all the standard libraries, including urequests, as well as the latest version of uasyncio?

Thanks!

Post by pythoncoder » Sun Aug 16, 2020 3:25 pm

You'll need to install it with upip. I'm not familiar with urequests so I suggest you read this to figure out which version you need.

Post by Primesty » Fri Aug 21, 2020 6:30 pm

Hey Pete,

Thanks for the ESP8266 tip. I've been working some more on my temp-reader project and am testing your aswitch.py functions to be able to give the app potentially more functionality.

I've also written a class method, which now shows the current temperature reliably on the OLED. Then, I went into button testing with three external LEDs.

Code:


from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc
import ssd1306
from aswitch import Switch, Pushbutton

from my_app import tempReader

led = Pin(2, Pin.OUT)

led2 = Pin(19, Pin.OUT)

led3 = Pin(32, Pin.OUT)

led4 = Pin(33, Pin.OUT)

sw = Pin(18, Pin.IN, Pin.PULL_UP)

sensor = dht.DHT22(Pin(14))

i2c = I2C(-1, scl=Pin(22), sda=Pin(21))

oled_width = 128
oled_height = 64

oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
             
             
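async def pulse(led, ms):
    # Flash the LED 5 times, with on and off times of ms milliseconds each
    for i in range(5):
        led.on()
        await uasyncio.sleep_ms(ms)
        led.off()
        await uasyncio.sleep_ms(ms)

async def blink2(led):
    # Flash the LED 5 times quickly (100 ms on, 100 ms off)
    for i in range(5):
        led.on()
        await uasyncio.sleep_ms(100)
        led.off()
        await uasyncio.sleep_ms(100)

async def blink3(led):
    # Flash the LED 10 times (200 ms on, 200 ms off)
    for i in range(10):
        led.on()
        await uasyncio.sleep_ms(200)
        led.off()
        await uasyncio.sleep_ms(200)

async def nothing():
    # Placeholder coroutine used as the release function
    await uasyncio.sleep_ms(2)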



def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = tempReader(led, sensor, oled)  # Constructor might create tasks
    switch = Pushbutton(sw, suppress = True)
    switch.press_func(pulse, (led3, 1000))
    switch.release_func(nothing, ()) #check to see if there is a long or double func
    switch.double_func(blink2, (led2,))
    switch.long_func(blink3, (led4,))
    uasyncio.create_task(my_class.blink()) # Or you might do this
    uasyncio.create_task(my_class.temp())
    uasyncio.create_task(my_class.show())
    await my_class.cancel() # Non-terminating method
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state


gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())

All the button functions work: the double click and the long press trigger the right functions. However, even though I have set suppress=True in the Pushbutton constructor and registered a release function, press_func still fires and runs the pulse function. I'm sure I'm missing something that would actually suppress it. Do you see why that might be happening?
