Hi Peter,
thanks for this! I imagined millisecond rate queue clearance would be more realistic. I'm not sure what's happening with my code, but I get the same effect with this simplified version. Maybe it's my use of asyncio that's wrong... Anyway, I liked your encoder driver much better than mine, so I adapted it as below..
I'm using a mechanical encoder and an ESP32.
The 'debouncer' just ignores events occurring within 500ms of the last one admitted. Without it I get a lot of spurious triggerings, and I want to limit the rate anyway because I only need a few events, not thousands, and I don't need to keep track of them.
What's weird is that disabling it generates the same number of queue full exceptions, even though there are many more events being sheduled.
Typically the number of queue overflow exceptions increases the longer I test (by twiddling the encoder back and forth), but sometimes I get them before the main loop even registers an event...
Code: Select all
# encoder_portable.py
# BASED ON Copyright (c) 2017-2021 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
from machine import Pin
import micropython
import utime
last_value_change_ms = 0
def debounce(db = 500):
global last_value_change_ms
cur_time = utime.ticks_ms()
diff = cur_time - last_value_change_ms
bounce = diff < db
if not bounce:
last_value_change_ms = utime.ticks_ms()
return bounce
class Encoder:
def __init__(self, pin_x, pin_y, handler, scale=1):
self.scale = scale
self.handler = handler
self.forward = True
self.pin_x = pin_x
self.pin_y = pin_y
self._pos = 0
try:
self.x_interrupt = pin_x.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=self.x_callback, hard=True)
self.y_interrupt = pin_y.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=self.y_callback, hard=True)
except TypeError:
self.x_interrupt = pin_x.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=self.x_callback)
self.y_interrupt = pin_y.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=self.y_callback)
def x_callback(self, pin):
if debounce():
return
self.forward = pin() ^ self.pin_y()
self._pos += 1 if self.forward else -1
micropython.schedule(self.handler, self.forward)
def y_callback(self, pin):
if debounce():
return
self.forward = self.pin_x() ^ pin() ^ 1
self._pos += 1 if self.forward else -1
micropython.schedule(self.handler, self.forward)
def position(self, value=None):
if value is not None:
self._pos = round(value / self.scale) # # Improvement provided by @IhorNehrutsa
return self._pos * self.scale
def value(self):
return self._pos
### =====================================
from machine import Pin, Timer, ADC
import uasyncio as asyncio
from ucollections import namedtuple
import micropython
import utime
class EncoderEvent(bool):
pass
def get_encoder_handler(events, flag):
def handler(fwd):
events.insert(0, EncoderEvent(fwd))
flag.set()
return handler
async def reactor():
p16 = Pin(16, Pin.IN, Pin.PULL_UP)
p17 = Pin(17, Pin.IN, Pin.PULL_UP)
events = []
flag = asyncio.ThreadSafeFlag()
encoder = Encoder(p16, p17, get_encoder_handler(events, flag))
while True:
if len(events) == 0:
await flag.wait()
while len(events):
event = events.pop()
if type(event) == EncoderEvent:
print('got Encoder: {}'.format(event))
def run():
import uasyncio as asyncio
asyncio.run(reactor())
This is what I get when I twiddle the knob:
Code: Select all
got Encoder: False
got Encoder: False
Traceback (most recent call last):
File "encoder.py", line 48, in x_callback
RuntimeError: schedule queue full
got Encoder: True
Traceback (most recent call last):
File "encoder.py", line 48, in x_callback
RuntimeError: schedule queue full
Traceback (most recent call last):
File "encoder.py", line 55, in y_callback
RuntimeError: schedule queue full
got Encoder: False
got Encoder: False
got Encoder: True
got Encoder: True
got Encoder: False
Traceback (most recent call last):
File "encoder.py", line 55, in y_callback
RuntimeError: schedule queue full
got Encoder: False
Traceback (most recent call last):
File "encoder.py", line 55, in y_callback
RuntimeError: schedule queue full
got Encoder: True
Traceback (most recent call last):
File "encoder.py", line 48, in x_callback
RuntimeError: schedule queue full
Traceback (most recent call last):
File "encoder.py", line 48, in x_callback
RuntimeError: schedule queue full
got Encoder: False
got Encoder: False
got Encoder: True