Re: uasyncio - How detect the end task in another task.
Posted: Tue Jul 28, 2020 2:07 pm
Will do and re-write according to the tutorial!
Please see the new forum at
https://forum.micropython.org/
Code: Select all
from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc
from my_app import temp_reader
# LED driven from pin 2, configured as an output; handed to temp_reader in main().
led = Pin(2, Pin.OUT)
def set_global_exception():
    """Install an event-loop exception handler that prints the exception and exits.

    Debugging aid: the handler prints the traceback of the exception found in
    the context dict it receives, then calls ``sys.exit()``.
    """
    import sys

    def _report_and_exit(_loop, ctx):
        # The handler receives the loop and a context dict; only the
        # "exception" entry of the dict is used.
        sys.print_exception(ctx["exception"])
        sys.exit()

    uasyncio.get_event_loop().set_exception_handler(_report_and_exit)
async def main():
    """Entry coroutine: install the debug handler, start blinking, wait on cancel()."""
    set_global_exception()  # debug aid: print unhandled task exceptions and exit

    reader = temp_reader(led)  # the constructor may itself create tasks
    uasyncio.create_task(reader.blink())  # blink runs as a background task

    # main() lasts exactly as long as cancel() takes to return.
    await reader.cancel()
# Run the application until main() returns; always reset the event loop
# afterwards so a later run starts from a clean state.
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state

# main() is a coroutine function and is started only through uasyncio.run()
# above: a bare main() call here would just create a coroutine object that is
# never scheduled, so that stray call has been removed.
gc.collect()
# Set the GC allocation threshold to a quarter of the free heap plus the
# amount currently allocated.
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
Code: Select all
# Fragment: 'await' is only valid inside an 'async def', so this line belongs in a coroutine body.
await my_class.temp_measure() # Non-terminating method
Code: Select all
from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc
# LED driven from pin 2 (output) and a DHT22 sensor object bound to pin 14;
# both are passed to tempReader in main().
led = Pin(2, Pin.OUT)
sensor = dht.DHT22(Pin(14))
class tempReader:
    """Pair an LED with a temperature sensor and expose coroutines that use them."""

    # Class attribute (shared by every instance): seconds the LED stays in
    # each state while blinking. Kept at class level for testing.
    sleep = 1

    def __init__(self, led, sensor):
        """Store the LED and sensor objects the coroutines operate on."""
        self.sensor = sensor
        self.led = led

    async def blink(self):
        """Switch the LED on and off forever, pausing ``tempReader.sleep`` seconds after each change."""
        while True:
            self.led.on()
            await uasyncio.sleep(tempReader.sleep)
            self.led.off()
            await uasyncio.sleep(tempReader.sleep)

    async def temp(self):
        """Forever: trigger a measurement, print the temperature, then pause 2 seconds."""
        while True:
            self.sensor.measure()
            reading = self.sensor.temperature()
            print('Temperature: %3.1f C' % reading)
            await uasyncio.sleep(2)

    async def cancel(self):
        """Return after a 10 second sleep; main() awaits this to bound its run time."""
        await uasyncio.sleep(10)
def set_global_exception():
    """Install an event-loop exception handler that prints the exception and exits.

    Debugging aid: the handler prints the traceback of the exception found in
    the context dict it receives, then calls ``sys.exit()``.
    """
    import sys

    def _report_and_exit(_loop, ctx):
        # The handler receives the loop and a context dict; only the
        # "exception" entry of the dict is used.
        sys.print_exception(ctx["exception"])
        sys.exit()

    uasyncio.get_event_loop().set_exception_handler(_report_and_exit)
async def main():
    """Entry coroutine: start the blink and temperature tasks, then wait on cancel()."""
    set_global_exception()  # debug aid: print unhandled task exceptions and exit

    reader = tempReader(led, sensor)  # the constructor may itself create tasks
    uasyncio.create_task(reader.blink())  # LED blinking runs in the background
    uasyncio.create_task(reader.temp())  # so does the periodic temperature print

    # cancel() returns after its 10 second sleep, which ends main() and
    # therefore the uasyncio.run() call that started it.
    await reader.cancel()
# Run the application until main() returns; always reset the event loop
# afterwards so a later run starts from a clean state.
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state

# main() is a coroutine function and is started only through uasyncio.run()
# above: a bare main() call here would just create a coroutine object that is
# never scheduled, so that stray call has been removed.
gc.collect()
# Set the GC allocation threshold to a quarter of the free heap plus the
# amount currently allocated.
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
Code: Select all
from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc
from my_app import tempReader
# LED driven from pin 2 (output) and a DHT22 sensor object bound to pin 14;
# both are passed to tempReader in main().
led = Pin(2, Pin.OUT)
sensor = dht.DHT22(Pin(14))
def set_global_exception():
    """Install an event-loop exception handler that prints the exception and exits.

    Debugging aid: the handler prints the traceback of the exception found in
    the context dict it receives, then calls ``sys.exit()``.
    """
    import sys

    def _report_and_exit(_loop, ctx):
        # The handler receives the loop and a context dict; only the
        # "exception" entry of the dict is used.
        sys.print_exception(ctx["exception"])
        sys.exit()

    uasyncio.get_event_loop().set_exception_handler(_report_and_exit)
async def main():
    """Entry coroutine: start the blink and temperature tasks, then wait on cancel()."""
    set_global_exception()  # debug aid: print unhandled task exceptions and exit

    reader = tempReader(led, sensor)  # the constructor may itself create tasks
    uasyncio.create_task(reader.blink())  # LED blinking runs in the background
    uasyncio.create_task(reader.temp())  # so does the periodic temperature print

    # main() lasts exactly as long as cancel() takes to return.
    await reader.cancel()
# Run the application until main() returns; always reset the event loop
# afterwards so a later run starts from a clean state.
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state

# main() is a coroutine function and is started only through uasyncio.run()
# above: a bare main() call here would just create a coroutine object that is
# never scheduled, so that stray call has been removed.
gc.collect()
# Set the GC allocation threshold to a quarter of the free heap plus the
# amount currently allocated.
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
Code: Select all
from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc
class tempReader:
    """Pair an LED with a temperature sensor and expose coroutines that use them."""

    # Class attribute (shared by every instance): seconds the LED stays in
    # each state while blinking. Kept at class level for testing.
    sleep = 1

    def __init__(self, led, sensor):
        """Store the LED and sensor objects the coroutines operate on."""
        self.sensor = sensor
        self.led = led

    async def blink(self):
        """Switch the LED on and off forever, pausing ``tempReader.sleep`` seconds after each change."""
        while True:
            self.led.on()
            await uasyncio.sleep(tempReader.sleep)
            self.led.off()
            await uasyncio.sleep(tempReader.sleep)

    async def temp(self):
        """Forever: trigger a measurement, print the temperature, then pause 2 seconds."""
        while True:
            self.sensor.measure()
            reading = self.sensor.temperature()
            print('Temperature: %3.1f C' % reading)
            await uasyncio.sleep(2)

    async def cancel(self):
        """Return after a 10 second sleep; callers await this to bound their run time."""
        await uasyncio.sleep(10)
It seems that the second class method is not imported correctly into main.py. I'm under the impression that the sensor object has to be passed in when the class is instantiated, right? The error I get is: TypeError: function takes 2 positional arguments but 3 were given
Code: Select all
# NOTE(review): if main is the 'async def main()' shown elsewhere in this thread, this bare
# call only creates a coroutine object and does not run it — confirm against the full script.
main()
# Collect garbage, then set the GC threshold to a quarter of the free heap
# plus the amount currently allocated.
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
Code: Select all
# Hand the coroutine returned by main() to uasyncio.run() so the event loop executes it.
uasyncio.run(main())
Code: Select all
from machine import Pin, Timer, I2C
import dht
import time
import uasyncio
import gc
import ssd1306
from aswitch import Switch, Pushbutton
from my_app import tempReader
# Four LED outputs: 'led' is handed to tempReader in main(); led2, led3 and
# led4 are driven by the push-button callbacks registered there.
led = Pin(2, Pin.OUT)
led2 = Pin(19, Pin.OUT)
led3 = Pin(32, Pin.OUT)
led4 = Pin(33, Pin.OUT)
# Input on pin 18 with the internal pull-up enabled; wrapped in a Pushbutton inside main().
sw = Pin(18, Pin.IN, Pin.PULL_UP)
# DHT22 sensor object bound to pin 14.
sensor = dht.DHT22(Pin(14))
# I2C bus on pins 22 (SCL) and 21 (SDA).
# NOTE(review): id -1 presumably selects the software I2C implementation on
# this firmware — confirm against the machine.I2C docs for the version in use.
i2c = I2C(-1, scl=Pin(22), sda=Pin(21))
# Width and height values passed to the SSD1306_I2C constructor.
oled_width = 128
oled_height = 64
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
async def pulse(led, ms):
    """Flash ``led`` five times, holding each on and each off state for ``ms`` milliseconds."""
    for _ in range(5):
        led.on()
        await uasyncio.sleep_ms(ms)
        led.off()
        await uasyncio.sleep_ms(ms)
async def blink2(led):
    """Flash ``led`` five times with 100 ms on and 100 ms off."""
    for _ in range(5):
        led.on()
        await uasyncio.sleep_ms(100)
        led.off()
        await uasyncio.sleep_ms(100)
async def blink3(led):
    """Flash ``led`` ten times with 200 ms on and 200 ms off."""
    for _ in range(10):
        led.on()
        await uasyncio.sleep_ms(200)
        led.off()
        await uasyncio.sleep_ms(200)
async def nothing():
    """Placeholder coroutine: sleep for 2 ms and return."""
    await uasyncio.sleep_ms(2)
def set_global_exception():
    """Install an event-loop exception handler that prints the exception and exits.

    Debugging aid: the handler prints the traceback of the exception found in
    the context dict it receives, then calls ``sys.exit()``.
    """
    import sys

    def _report_and_exit(_loop, ctx):
        # The handler receives the loop and a context dict; only the
        # "exception" entry of the dict is used.
        sys.print_exception(ctx["exception"])
        sys.exit()

    uasyncio.get_event_loop().set_exception_handler(_report_and_exit)
async def main():
    """Entry coroutine: wire up the push button, start the reader tasks, wait on cancel()."""
    set_global_exception()  # debug aid: print unhandled task exceptions and exit

    reader = tempReader(led, sensor, oled)  # the constructor may itself create tasks

    # Push button on the 'sw' pin; each event type launches one LED coroutine
    # with the argument tuple given alongside it.
    button = Pushbutton(sw, suppress=True)
    button.press_func(pulse, (led3, 1000))
    # NOTE(review): release is given a near-empty coroutine, apparently so a
    # release callback exists while long/double presses are detected —
    # confirm against the Pushbutton 'suppress' documentation.
    button.release_func(nothing, ())
    button.double_func(blink2, (led2,))
    button.long_func(blink3, (led4,))

    # Background tasks provided by the reader object.
    uasyncio.create_task(reader.blink())
    uasyncio.create_task(reader.temp())
    uasyncio.create_task(reader.show())

    # main() lasts exactly as long as cancel() takes to return.
    await reader.cancel()
# Run the application until main() returns; the event loop is reset in all
# cases so that a later run starts from a clean state.
try:
    uasyncio.run(main())
finally:
    uasyncio.new_event_loop()  # Clear retained state

# Collect garbage, then set the GC allocation threshold to a quarter of the
# free heap plus the amount currently allocated.
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())