For Your Review - Reed Switch Door Status

All ESP8266 boards running MicroPython.
Official boards are the Adafruit Huzzah and Feather boards.
Target audience: MicroPython users with an ESP8266 board.
Post Reply
rca
Posts: 3
Joined: Sat Jul 14, 2018 6:20 pm

For Your Review - Reed Switch Door Status

Post by rca » Sun Jul 29, 2018 5:01 am

Let me start off by saying I am fairly new to Python and MicroPython. I'm enjoying the learning process though and have my first working device, using a Wemos D1 Mini with a reed switch to send the status of my garage door to MQTT. I'm using the asyncio and aswitch modules compiled in a customer firmware in the code below to update a MQTT topic whenever the reed switch is triggered, or every 5 minutes.

The code seems to be working for me. I would like to hear some review and suggestions so I can continue to learn. Thanks.

from config import SERVER, STATE_TOPIC
from umqtt.simple import MQTTClient
import uasyncio as asyncio
from aswitch import Switch
import ubinascii
from machine import Pin
import machine

CLIENT_ID = ubinascii.hexlify(machine.unique_id())

#function to send mqtt message status
def mqtt_msg(update):
client.publish(STATE_TOPIC,update)

#coro to send status of switch every 5 mintues
async def timed_update():
while True:
await asyncio.sleep(300)
mqtt_msg(sw_status)

#coro initiated by test_sw function, when status of switch changes send mqtt update message
async def sw_coro(coro_sw_status):
global sw_status
sw_status = coro_sw_status
mqtt_msg(sw_status)
await asyncio.sleep(0)

#function to get status of reed switch by checking pin value of D5 on wemos d1 mini
#using switch class, then sets and sends initial state to mqtt
#then initiates coros for switch change to open and close
def test_sw():
global sw_status
pin = Pin(14, Pin.IN, Pin.PULL_UP)
sw = Switch(pin)
if sw.switchstate == 0:
sw_status = 'closed'
else:
sw_status = 'open'
mqtt_msg(sw_status)
sw.close_func(sw_coro, ('closed',))
sw.open_func(sw_coro, ('open',))


# setup connection to MQTT, reset board if no connection
client = MQTTClient(CLIENT_ID,SERVER)
try:
client.connect()
print("connected to mqtt")
except OSError:
print("no connection to mqtt, resetting in 20 seconds")
time.sleep(20)
machine.reset()

#run function to test switch, set initial switch state and initiate switch coros
test_sw()

#initiate event loop using asyncio and create timed coro
loop = asyncio.get_event_loop()
loop.create_task(timed_update())
loop.run_forever()

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: For Your Review - Reed Switch Door Status

Post by pythoncoder » Mon Jul 30, 2018 7:23 am

Great to see that you're using uasyncio - the proper way to write firmware ;)

It's hard to argue with code that works. It could be slightly simplified by using a callback rather than a coroutine for the Switch instance, but this is just a matter of style.

Code: Select all

from config import SERVER, STATE_TOPIC
from umqtt.simple import MQTTClient
import uasyncio as asyncio
from aswitch import Switch
import ubinascii
from machine import Pin
import machine

CLIENT_ID = ubinascii.hexlify(machine.unique_id())

#function to send mqtt message status
def mqtt_msg(update):
	client.publish(STATE_TOPIC,update)

#coro to send status of switch every 5 mintues	
async def timed_update():
	while True:
		await asyncio.sleep(300)
		mqtt_msg(sw_status)
	
# callback initiated by test_sw function, when status of switch changes send mqtt update message	
def sw_callback(coro_sw_status):
	global sw_status
	sw_status = coro_sw_status
	mqtt_msg(sw_status)

#function to get status of reed switch by checking pin value of D5 on wemos d1 mini
#using switch class, then sets and sends initial state to mqtt
#then initiates coros for switch change to open and close	
def test_sw():
	global sw_status
	pin = Pin(14, Pin.IN, Pin.PULL_UP)
	sw = Switch(pin)
	if sw.switchstate == 0:
		sw_status = 'closed'
	else:
		sw_status = 'open'
	mqtt_msg(sw_status)
	sw.close_func(sw_callback, ('closed',))
	sw.open_func(sw_callback, ('open',))

	
# setup connection to MQTT, reset board if no connection
client = MQTTClient(CLIENT_ID,SERVER)
try:
	client.connect()
	print("connected to mqtt")
except OSError:
	print("no connection to mqtt, resetting in 20 seconds")
	time.sleep(20)
	machine.reset()

#run function to test switch, set initial switch state and initiate switch coros
test_sw()	

#initiate event loop using asyncio and create timed coro	
loop = asyncio.get_event_loop()
loop.create_task(timed_update())
loop.run_forever()
Peter Hinch
Index to my micropython libraries.

rca
Posts: 3
Joined: Sat Jul 14, 2018 6:20 pm

Re: For Your Review - Reed Switch Door Status

Post by rca » Mon Jul 30, 2018 3:59 pm

Thanks Peter. I was wondering about the option of a callback.

So help me understand. The way I had it setup as a coro, it added the sw_coro to the event loop as a coroutine. Setup as a callback, the callback is not part of the event loop, but basically gets processed whenever the switch changes its state? Both setups use the aswitch module though, to get the benefits of the debounce handling, correct?

Thank you for your assistance.

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: For Your Review - Reed Switch Door Status

Post by pythoncoder » Tue Jul 31, 2018 8:36 am

Yes, both use debouncing: the Switch instance runs a task which handles this. When the switch changes state the callback runs, performs the publish, and returns. Using a coroutine achieves exactly the same thing.

It's worth noting that umqtt is synchronous code. In particular, if you publish with qos==1 it will block until it receives a response from the broker. This can take a long time; under error conditions it can block forever. This fact would remain if you used a coroutine: in cooperative scheduling if something blocks, everything blocks.

I wrote an asynchronous MQTT client which addresses this and other issues.
Peter Hinch
Index to my micropython libraries.

User avatar
tonyldo
Posts: 5
Joined: Wed Jun 20, 2018 5:04 pm
Location: Brazil

Re: For Your Review - Reed Switch Door Status

Post by tonyldo » Fri Aug 24, 2018 5:19 pm

rca wrote:
Sun Jul 29, 2018 5:01 am
Let me start off by saying I am fairly new to Python and MicroPython. I'm enjoying the learning process though and have my first working device, using a Wemos D1 Mini with a reed switch to send the status of my garage door to MQTT. I'm using the asyncio and aswitch modules compiled in a customer firmware in the code below to update a MQTT topic whenever the reed switch is triggered, or every 5 minutes.

The code seems to be working for me. I would like to hear some review and suggestions so I can continue to learn. Thanks.

from config import SERVER, STATE_TOPIC
from umqtt.simple import MQTTClient
import uasyncio as asyncio
from aswitch import Switch
import ubinascii
from machine import Pin
import machine

CLIENT_ID = ubinascii.hexlify(machine.unique_id())

#function to send mqtt message status
def mqtt_msg(update):
client.publish(STATE_TOPIC,update)

#coro to send status of switch every 5 mintues
async def timed_update():
while True:
await asyncio.sleep(300)
mqtt_msg(sw_status)

#coro initiated by test_sw function, when status of switch changes send mqtt update message
async def sw_coro(coro_sw_status):
global sw_status
sw_status = coro_sw_status
mqtt_msg(sw_status)
await asyncio.sleep(0)

#function to get status of reed switch by checking pin value of D5 on wemos d1 mini
#using switch class, then sets and sends initial state to mqtt
#then initiates coros for switch change to open and close
def test_sw():
global sw_status
pin = Pin(14, Pin.IN, Pin.PULL_UP)
sw = Switch(pin)
if sw.switchstate == 0:
sw_status = 'closed'
else:
sw_status = 'open'
mqtt_msg(sw_status)
sw.close_func(sw_coro, ('closed',))
sw.open_func(sw_coro, ('open',))


# setup connection to MQTT, reset board if no connection
client = MQTTClient(CLIENT_ID,SERVER)
try:
client.connect()
print("connected to mqtt")
except OSError:
print("no connection to mqtt, resetting in 20 seconds")
time.sleep(20)
machine.reset()

#run function to test switch, set initial switch state and initiate switch coros
test_sw()

#initiate event loop using asyncio and create timed coro
loop = asyncio.get_event_loop()
loop.create_task(timed_update())
loop.run_forever()
If u post this on github, i´ll help u... I ´m working in door sensors...

Post Reply