For Your Review - Reed Switch Door Status
Posted: Sun Jul 29, 2018 5:01 am
Let me start off by saying I am fairly new to Python and MicroPython. I'm enjoying the learning process though and have my first working device, using a Wemos D1 Mini with a reed switch to send the status of my garage door to MQTT. I'm using the asyncio and aswitch modules compiled in a customer firmware in the code below to update a MQTT topic whenever the reed switch is triggered, or every 5 minutes.
The code seems to be working for me. I would like to hear some review and suggestions so I can continue to learn. Thanks.
from config import SERVER, STATE_TOPIC
from umqtt.simple import MQTTClient
import uasyncio as asyncio
from aswitch import Switch
import ubinascii
from machine import Pin
import machine
CLIENT_ID = ubinascii.hexlify(machine.unique_id())
#function to send mqtt message status
def mqtt_msg(update):
client.publish(STATE_TOPIC,update)
#coro to send status of switch every 5 mintues
async def timed_update():
while True:
await asyncio.sleep(300)
mqtt_msg(sw_status)
#coro initiated by test_sw function, when status of switch changes send mqtt update message
async def sw_coro(coro_sw_status):
global sw_status
sw_status = coro_sw_status
mqtt_msg(sw_status)
await asyncio.sleep(0)
#function to get status of reed switch by checking pin value of D5 on wemos d1 mini
#using switch class, then sets and sends initial state to mqtt
#then initiates coros for switch change to open and close
def test_sw():
global sw_status
pin = Pin(14, Pin.IN, Pin.PULL_UP)
sw = Switch(pin)
if sw.switchstate == 0:
sw_status = 'closed'
else:
sw_status = 'open'
mqtt_msg(sw_status)
sw.close_func(sw_coro, ('closed',))
sw.open_func(sw_coro, ('open',))
# setup connection to MQTT, reset board if no connection
client = MQTTClient(CLIENT_ID,SERVER)
try:
client.connect()
print("connected to mqtt")
except OSError:
print("no connection to mqtt, resetting in 20 seconds")
time.sleep(20)
machine.reset()
#run function to test switch, set initial switch state and initiate switch coros
test_sw()
#initiate event loop using asyncio and create timed coro
loop = asyncio.get_event_loop()
loop.create_task(timed_update())
loop.run_forever()
The code seems to be working for me. I would like to hear some review and suggestions so I can continue to learn. Thanks.
from config import SERVER, STATE_TOPIC
from umqtt.simple import MQTTClient
import uasyncio as asyncio
from aswitch import Switch
import ubinascii
from machine import Pin
import machine
CLIENT_ID = ubinascii.hexlify(machine.unique_id())
#function to send mqtt message status
def mqtt_msg(update):
client.publish(STATE_TOPIC,update)
#coro to send status of switch every 5 mintues
async def timed_update():
while True:
await asyncio.sleep(300)
mqtt_msg(sw_status)
#coro initiated by test_sw function, when status of switch changes send mqtt update message
async def sw_coro(coro_sw_status):
global sw_status
sw_status = coro_sw_status
mqtt_msg(sw_status)
await asyncio.sleep(0)
#function to get status of reed switch by checking pin value of D5 on wemos d1 mini
#using switch class, then sets and sends initial state to mqtt
#then initiates coros for switch change to open and close
def test_sw():
global sw_status
pin = Pin(14, Pin.IN, Pin.PULL_UP)
sw = Switch(pin)
if sw.switchstate == 0:
sw_status = 'closed'
else:
sw_status = 'open'
mqtt_msg(sw_status)
sw.close_func(sw_coro, ('closed',))
sw.open_func(sw_coro, ('open',))
# setup connection to MQTT, reset board if no connection
client = MQTTClient(CLIENT_ID,SERVER)
try:
client.connect()
print("connected to mqtt")
except OSError:
print("no connection to mqtt, resetting in 20 seconds")
time.sleep(20)
machine.reset()
#run function to test switch, set initial switch state and initiate switch coros
test_sw()
#initiate event loop using asyncio and create timed coro
loop = asyncio.get_event_loop()
loop.create_task(timed_update())
loop.run_forever()