Let me start off by saying I am fairly new to Python and MicroPython. I'm enjoying the learning process though and have my first working device, using a Wemos D1 Mini with a reed switch to send the status of my garage door to MQTT. I'm using the asyncio and aswitch modules, compiled into a custom firmware, in the code below to update an MQTT topic whenever the reed switch is triggered, or every 5 minutes.
The code seems to be working for me. I would like to hear some review and suggestions so I can continue to learn. Thanks.
from config import SERVER, STATE_TOPIC
from umqtt.simple import MQTTClient
import uasyncio as asyncio
from aswitch import Switch
import ubinascii
import time  # needed for time.sleep() in the connection error handler
from machine import Pin
import machine

CLIENT_ID = ubinascii.hexlify(machine.unique_id())

# function to send mqtt message status
def mqtt_msg(update):
    client.publish(STATE_TOPIC, update)

# coro to send status of switch every 5 minutes
async def timed_update():
    while True:
        await asyncio.sleep(300)
        mqtt_msg(sw_status)

# coro initiated by test_sw function, when status of switch changes send mqtt update message
async def sw_coro(coro_sw_status):
    global sw_status
    sw_status = coro_sw_status
    mqtt_msg(sw_status)
    await asyncio.sleep(0)

# function to get status of reed switch by checking pin value of D5 on wemos d1 mini
# using switch class, then sets and sends initial state to mqtt
# then initiates coros for switch change to open and close
def test_sw():
    global sw_status
    pin = Pin(14, Pin.IN, Pin.PULL_UP)
    sw = Switch(pin)
    if sw.switchstate == 0:
        sw_status = 'closed'
    else:
        sw_status = 'open'
    mqtt_msg(sw_status)
    sw.close_func(sw_coro, ('closed',))
    sw.open_func(sw_coro, ('open',))

# setup connection to MQTT, reset board if no connection
client = MQTTClient(CLIENT_ID, SERVER)
try:
    client.connect()
    print("connected to mqtt")
except OSError:
    print("no connection to mqtt, resetting in 20 seconds")
    time.sleep(20)
    machine.reset()

# run function to test switch, set initial switch state and initiate switch coros
test_sw()

# initiate event loop using asyncio and create timed coro
loop = asyncio.get_event_loop()
loop.create_task(timed_update())
loop.run_forever()
For Your Review - Reed Switch Door Status
- pythoncoder
- Posts: 5956
- Joined: Fri Jul 18, 2014 8:01 am
- Location: UK
- Contact:
Re: For Your Review - Reed Switch Door Status
Great to see that you're using uasyncio - the proper way to write firmware.
It's hard to argue with code that works. It could be slightly simplified by using a callback rather than a coroutine for the Switch instance, but this is just a matter of style.
Code:
from config import SERVER, STATE_TOPIC
from umqtt.simple import MQTTClient
import uasyncio as asyncio
from aswitch import Switch
import ubinascii
import time  # needed for time.sleep() in the connection error handler
from machine import Pin
import machine

CLIENT_ID = ubinascii.hexlify(machine.unique_id())

# function to send mqtt message status
def mqtt_msg(update):
    client.publish(STATE_TOPIC, update)

# coro to send status of switch every 5 minutes
async def timed_update():
    while True:
        await asyncio.sleep(300)
        mqtt_msg(sw_status)

# callback registered by test_sw function, when status of switch changes send mqtt update message
def sw_callback(new_sw_status):
    global sw_status
    sw_status = new_sw_status
    mqtt_msg(sw_status)

# function to get status of reed switch by checking pin value of D5 on wemos d1 mini
# using switch class, then sets and sends initial state to mqtt
# then registers callbacks for switch change to open and close
def test_sw():
    global sw_status
    pin = Pin(14, Pin.IN, Pin.PULL_UP)
    sw = Switch(pin)
    if sw.switchstate == 0:
        sw_status = 'closed'
    else:
        sw_status = 'open'
    mqtt_msg(sw_status)
    sw.close_func(sw_callback, ('closed',))
    sw.open_func(sw_callback, ('open',))

# setup connection to MQTT, reset board if no connection
client = MQTTClient(CLIENT_ID, SERVER)
try:
    client.connect()
    print("connected to mqtt")
except OSError:
    print("no connection to mqtt, resetting in 20 seconds")
    time.sleep(20)
    machine.reset()

# run function to test switch, set initial switch state and register switch callbacks
test_sw()

# initiate event loop using asyncio and create timed coro
loop = asyncio.get_event_loop()
loop.create_task(timed_update())
loop.run_forever()
Peter Hinch
Index to my micropython libraries.
Re: For Your Review - Reed Switch Door Status
Thanks Peter. I was wondering about the option of a callback.
So help me understand. The way I had it set up as a coro, sw_coro was added to the event loop as a coroutine. Set up as a callback, it is not part of the event loop, but basically gets processed whenever the switch changes state? Both setups use the aswitch module, though, to get the benefit of its debounce handling, correct?
Thank you for your assistance.
- pythoncoder
Re: For Your Review - Reed Switch Door Status
Yes, both use debouncing: the Switch instance runs a task which handles this. When the switch changes state the callback runs, performs the publish, and returns. Using a coroutine achieves exactly the same thing.
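To illustrate the point about the Switch task, here is a rough sketch of how a debouncing task might dispatch either a plain callback or a coroutine. It is not the actual aswitch source: SketchSwitch, on_state and the simulated pin are hypothetical names made up for this example, and it assumes a uasyncio V3 style API (asyncio.create_task, asyncio.run), falling back to CPython's asyncio so it can be tried on a PC.

```python
try:
    import uasyncio as asyncio  # MicroPython (assumes uasyncio V3)
except ImportError:
    import asyncio  # desktop Python, handy for experimenting


class SketchSwitch:
    """Hypothetical, simplified stand-in for a debounced switch class."""

    def __init__(self, read_pin, debounce_ms=50):
        self._read = read_pin            # callable returning 0 or 1
        self._debounce_ms = debounce_ms
        self._state = read_pin()         # initial level
        self._handlers = {}              # level -> (func, args)

    def on_state(self, level, func, args=()):
        self._handlers[level] = (func, args)

    async def run(self):
        while True:
            level = self._read()
            if level != self._state:
                self._state = level
                func, args = self._handlers.get(level, (None, ()))
                if func is not None:
                    res = func(*args)    # a plain callback has now run and returned
                    if hasattr(res, "send"):
                        # func was a coroutine function: schedule the coroutine
                        asyncio.create_task(res)
            # Wait out the debounce period before sampling the pin again
            await asyncio.sleep(self._debounce_ms / 1000)


async def demo():
    events = []
    level = [1]                          # simulated pin, 1 = open

    def on_closed(status):               # plain callback
        events.append(("callback", status))

    async def on_open(status):           # coroutine handler
        events.append(("coroutine", status))

    sw = SketchSwitch(lambda: level[0], debounce_ms=10)
    sw.on_state(0, on_closed, ("closed",))
    sw.on_state(1, on_open, ("open",))
    task = asyncio.create_task(sw.run())
    await asyncio.sleep(0.03)
    level[0] = 0                         # door closes
    await asyncio.sleep(0.05)
    level[0] = 1                         # door opens
    await asyncio.sleep(0.05)
    task.cancel()
    return events


events = asyncio.run(demo())
print(events)
```

Either way the handler only runs when the polling task notices a change, so for a short, non-awaiting handler such as a single publish the two styles behave the same.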
It's worth noting that umqtt is synchronous code. In particular, if you publish with qos==1 it will block until it receives a response from the broker. This can take a long time; under error conditions it can block forever. This fact would remain if you used a coroutine: in cooperative scheduling if something blocks, everything blocks.
I wrote an asynchronous MQTT client which addresses this and other issues.
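The "if something blocks, everything blocks" point can be seen with a small experiment on desktop Python. This is only a sketch: time.sleep stands in for a blocking network call such as a publish waiting on the broker, and the names ticker, blocking_publish, friendly_publish and measure are invented for the example. It records how long a background task is starved while the "publish" runs.

```python
import asyncio
import time


async def ticker(ticks, period=0.01):
    # Background task that should run roughly every 10 ms
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(period)


async def blocking_publish():
    # Stand-in for a synchronous network call: never yields to the scheduler
    time.sleep(0.2)


async def friendly_publish():
    # Same duration, but yields so other tasks keep running
    await asyncio.sleep(0.2)


async def measure(publish):
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0.05)
    await publish()
    await asyncio.sleep(0.05)
    task.cancel()
    # Longest interval during which the ticker did not get to run
    return max(b - a for a, b in zip(ticks, ticks[1:]))


blocked_gap = asyncio.run(measure(blocking_publish))
friendly_gap = asyncio.run(measure(friendly_publish))
print("longest gap with blocking call: %.3f s" % blocked_gap)
print("longest gap with awaiting call: %.3f s" % friendly_gap)
```

Wrapping the blocking call in a coroutine does not help: the ticker is still frozen for the full duration of the call, which is what would happen to the Switch task and timed_update while a publish is stuck.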
Peter Hinch
Index to my micropython libraries.
Re: For Your Review - Reed Switch Door Status
If you post this on GitHub, I'll help you. I'm working on door sensors.
rca wrote: Sun Jul 29, 2018 5:01 am
Let me start off by saying I am fairly new to Python and MicroPython. I'm enjoying the learning process though and have my first working device, using a Wemos D1 Mini with a reed switch to send the status of my garage door to MQTT. I'm using the asyncio and aswitch modules, compiled into a custom firmware, in the code below to update an MQTT topic whenever the reed switch is triggered, or every 5 minutes.