uasyncio - How detect the end task in another task.
Re: uasyncio - How detect the end task in another task.
Will do and re-write according to the tutorial!
Re: uasyncio - How detect the end task in another task.
Hi Peter!
I was able to re-write the code according to your tutorial so that it's not legacy anymore. The led-blink example still works, and the class with the blink and cancel methods is in a file *my_app.py*:
Code: Select all
from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc
from my_app import temp_reader

led = Pin(2, Pin.OUT)

def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = temp_reader(led)  # Constructor might create tasks
    uasyncio.create_task(my_class.blink())  # Or you might do this
    await my_class.cancel()  # Non-terminating method

try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state
main()
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
Should I now first implement a temp_reader method or work on the encoder method? I assume both will then have to be put into **uasyncio.create_task()** above. Should the temperature measurement OR the encoder be the non-terminating method that is awaited in the last line of main() above, e.g.
Code: Select all
await my_class.temp_measure()  # Non-terminating method
Please let me know what you think. Happy weekend!
- pythoncoder
- Posts: 5956
- Joined: Fri Jul 18, 2014 8:01 am
- Location: UK
- Contact:
Re: uasyncio - How detect the end task in another task.
A typical firmware application will have several non-terminating tasks. Your main() task starts all but one by issuing .create_task() and then awaits the last task: this ensures that the scheduler itself runs forever.
In many cases it doesn't matter which task you await - it's only an issue if the application logic requires the tasks to be started in a defined order.
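As a rough illustration of this pattern (a sketch, not code from the tutorial): the coroutine names `blink` and `temp`, the `counts` dictionary and the timings are placeholders, and the `wait_for()` wrapper is there only so that the demo stops by itself. A real application would simply call `run(main())`.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # CPython fallback, handy for trying this on a PC

counts = {"blink": 0, "temp": 0}  # placeholder for real work (LED, sensor)

async def blink():
    while True:  # non-terminating task
        counts["blink"] += 1
        await asyncio.sleep(0.01)

async def temp():
    while True:  # non-terminating task
        counts["temp"] += 1
        await asyncio.sleep(0.02)

async def main():
    asyncio.create_task(blink())  # start all tasks but one...
    await temp()  # ...and await the last one, so main() never returns

async def demo():
    # Demo only: give up after 0.1 s so the example terminates.
    try:
        await asyncio.wait_for(main(), 0.1)
    except asyncio.TimeoutError:
        pass

asyncio.run(demo())
print(counts)  # both counters have advanced, so both tasks ran
```

Swapping the two lines in `main()` (creating `temp()` as a task and awaiting `blink()`) should behave the same here, since neither task depends on the other having started first.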
Peter Hinch
Index to my micropython libraries.
Re: uasyncio - How detect the end task in another task.
Gotcha! Thanks for the input - once I have my drivers up and running again - I'll continue building the coros.
Re: uasyncio - How detect the end task in another task.
Hi Pete,
I got my system up and running again.
I've successfully implemented code that blinks the led *and* reads the temperature at the same time according to your specs and tutorial.
One question popped up.
When I run this:
Code: Select all
from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc

led = Pin(2, Pin.OUT)
sensor = dht.DHT22(Pin(14))

class tempReader():
    sleep = 1  #static instance variable for testing

    def __init__(self, led, sensor):
        self.led = led
        self.sensor = sensor

    async def blink(self):
        while True:
            self.led.on()
            await uasyncio.sleep(tempReader.sleep)
            self.led.off()
            await uasyncio.sleep(tempReader.sleep)

    async def temp(self):
        while True:
            self.sensor.measure()
            temp = self.sensor.temperature()
            print('Temperature: %3.1f C' % temp)
            await uasyncio.sleep(2)

    async def cancel(self):
        await uasyncio.sleep(10)  #stops the loop after 10 seconds

def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = tempReader(led, sensor)  # Constructor might create tasks
    uasyncio.create_task(my_class.blink())  # Or you might do this
    uasyncio.create_task(my_class.temp())
    await my_class.cancel()  # Non-terminating method

try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state
main()
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
It works without a hitch. However, when I have the class in another file, like so:
main.py
Code: Select all
from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc
from my_app import tempReader

led = Pin(2, Pin.OUT)
sensor = dht.DHT22(Pin(14))

def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = tempReader(led, sensor)  # Constructor might create tasks
    uasyncio.create_task(my_class.blink())  # Or you might do this
    uasyncio.create_task(my_class.temp())
    await my_class.cancel()  # Non-terminating method

try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state
main()
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
my_app.py
Code: Select all
from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc

class tempReader():
    sleep = 1  #static instance variable for testing

    def __init__(self, led, sensor):
        self.led = led
        self.sensor = sensor

    async def blink(self):
        while True:
            self.led.on()
            await uasyncio.sleep(tempReader.sleep)
            self.led.off()
            await uasyncio.sleep(tempReader.sleep)

    async def temp(self):
        while True:
            self.sensor.measure()
            temp = self.sensor.temperature()
            print('Temperature: %3.1f C' % temp)
            await uasyncio.sleep(2)

    async def cancel(self):
        await uasyncio.sleep(10)  #stops the loop after 10 seconds
I get this error:
TypeError: function takes 2 positional arguments but 3 were given
It seems that the second class method is not imported correctly to main.py. I'm under the impression that the sensor object has to go into the class instantiation process, right?
Do you know what might be causing this?
Have a great weekend!
Matthias
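For reference, Python reports this kind of TypeError when a call passes more positional arguments than the function accepts, and for a method the implicit `self` is included in both counts. The sketch below reproduces that situation with a hypothetical older `tempReader` whose constructor takes only `led`. That older class is an assumption made for illustration, not a diagnosis of the code above, and the exact wording of the message differs between MicroPython and CPython.

```python
class tempReader:
    # Hypothetical older version of the class: no sensor parameter yet.
    def __init__(self, led):
        self.led = led

def try_construct(*args):
    """Attempt to build a tempReader and report what happened."""
    try:
        tempReader(*args)
        return "ok"
    except TypeError as exc:
        return "TypeError: " + str(exc)

print(try_construct("led"))            # matches the signature
print(try_construct("led", "sensor"))  # one positional argument too many
```

Counting `self`, the second call supplies three positional arguments to a function that accepts two, which is the shape of the message quoted above.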
- pythoncoder
Re: uasyncio - How detect the end task in another task.
I can't spot the cause. It would help if you told us the line number where the error occurs.
It's worth noting that the following code will only be executed after you interrupt the program or after it fails with an exception:
Code: Select all
main()
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
This is because execution passes to the scheduler with
Code: Select all
uasyncio.run(main())
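The ordering can be seen in a small sketch. Here `main()` is made to return after a short sleep, which is an assumption so that the demo ends; with a non-terminating `main()`, the line after `run()` would not be reached until the program is interrupted or fails. The `events` list is a hypothetical helper that only records the order.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # CPython fallback

events = []  # records the order in which things happen

async def main():
    events.append("main started")
    await asyncio.sleep(0.05)  # stands in for the application's work
    events.append("main finished")

asyncio.run(main())  # blocks here until main() returns
events.append("after run")  # reached only once run() has returned

print(events)  # ['main started', 'main finished', 'after run']
```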
Peter Hinch
Re: uasyncio - How detect the end task in another task.
Gotcha, I removed the dangling main() function call
Also, the above error has resolved itself and now it's running smoothly, which means I can now move on to handling the rotary encoder or displaying the temperature.
Re: uasyncio - How detect the end task in another task.
Hey Pete,
I tried installing a daily build on an ESP8266 today to get the latest version of uasyncio, but that meant urequests was no longer available; it only seems to be included in stable versions. Are you aware of any ESP8266 builds that have all the standard libraries, including urequests, AND the latest version of uasyncio?
Thanks!
- pythoncoder
Re: uasyncio - How detect the end task in another task.
You'll need to install it with upip. I'm not familiar with urequests so I suggest you read this to figure out which version you need.
Peter Hinch
Re: uasyncio - How detect the end task in another task.
Hey Pete,
Thanks for the ESP8266 tip. I've been working some more on my temp-reader project and am testing your aswitch.py functions to be able to give the app potentially more functionality.
I've also written a class method, which now shows the current temperature reliably on the OLED. Then, I went into button testing with three external LEDs.
All button functions work: the double click and long click trigger the right functions. However, even though I have set suppress = True in the Pushbutton class and have registered a release function, the press_func still fires and runs the pulse function. I'm sure I'm missing something that would actually suppress it. Do you see why that might be happening?
Code: Select all
from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc
import ssd1306
from aswitch import Switch, Pushbutton
from my_app import tempReader

led = Pin(2, Pin.OUT)
led2 = Pin(19, Pin.OUT)
led3 = Pin(32, Pin.OUT)
led4 = Pin(33, Pin.OUT)
sw = Pin(18, Pin.IN, Pin.PULL_UP)
sensor = dht.DHT22(Pin(14))
i2c = I2C(-1, scl=Pin(22), sda=Pin(21))
oled_width = 128
oled_height = 64
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)

async def pulse(led, ms):
    for i in range(5):
        led.on()
        await uasyncio.sleep_ms(ms)
        led.off()
        await uasyncio.sleep_ms(ms)

async def blink2(led):
    for i in range(5):
        led.on()
        await uasyncio.sleep_ms(100)
        led.off()
        await uasyncio.sleep_ms(100)

async def blink3(led):
    for i in range(10):
        led.on()
        await uasyncio.sleep_ms(200)
        led.off()
        await uasyncio.sleep_ms(200)

async def nothing():
    await uasyncio.sleep_ms(2)

def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = uasyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = tempReader(led, sensor, oled)  # Constructor might create tasks
    switch = Pushbutton(sw, suppress = True)
    switch.press_func(pulse, (led3, 1000))
    switch.release_func(nothing, ())  #check to see if there is a long or double func
    switch.double_func(blink2, (led2,))
    switch.long_func(blink3, (led4,))
    uasyncio.create_task(my_class.blink())  # Or you might do this
    uasyncio.create_task(my_class.temp())
    uasyncio.create_task(my_class.show())
    await my_class.cancel()  # Non-terminating method

try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())