Hi,
I have a rotary encoder (functioning as a volume control) with an interrupt handler that schedules a user-mode handler.
The rate of interrupts causes the schedule queue to overflow, even with a debounce time of several hundred ms, so it seems the queue can only clear about one interrupt per second. The interrupts only occur in bursts when the knob is turned, not continuously.
It may be that the main thread is too busy doing other stuff, but I'm not keen to add a thread just for this. Is there any way to increase the queue length from Python? I haven't looked at the MicroPython sources, but I'm guessing there's a #define in there somewhere if I recompile the interpreter.
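For what it's worth, from a quick look at the MicroPython source the queue depth does appear to be a compile-time constant in py/mpconfig.h (treat the name and default below as something to verify against your own tree rather than gospel):

```c
/* py/mpconfig.h -- depth of the micropython.schedule() queue.
   Override it in your port's mpconfigport.h and rebuild the firmware. */
#ifndef MICROPY_SCHEDULER_DEPTH
#define MICROPY_SCHEDULER_DEPTH (8)
#endif
```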
Cheers
micropython.schedule() queue length
- pythoncoder
- Posts: 5956
- Joined: Fri Jul 18, 2014 8:01 am
- Location: UK
- Contact:
Re: micropython.schedule() queue length
It would help if you told us what hardware you're using. Since you're using .schedule() I assume your hardware supports hard IRQs, in which case you might like to look at the work I've done. The interface with uasyncio works well, and I have tested an audio player application with an encoder as a volume control.
To answer your question, I'd expect .schedule() to run the scheduled callback with a latency measured in single-digit ms. It's normally fast, only venturing into multiple ms if it happens to collide with a GC. So I think something is very amiss in your application if you're seeing latency of one second. It's also unclear how increasing the queue length would help.
Peter Hinch
Index to my micropython libraries.
Re: micropython.schedule() queue length
Hi Peter,
thanks for this! I imagined millisecond-rate queue clearance would be more realistic. I'm not sure what's happening with my code, but I get the same effect with this simplified version. Maybe it's my use of asyncio that's wrong... Anyway, I liked your encoder driver much better than mine, so I adapted it as below.
I'm using a mechanical encoder and an ESP32.
The 'debouncer' just ignores events occurring within 500ms of the last one admitted. Without it I get a lot of spurious triggerings, and I want to limit the rate anyway because I only need a few events, not thousands, and I don't need to keep track of them.
What's weird is that disabling it generates the same number of queue-full exceptions, even though many more events are being scheduled.
Typically the number of queue overflow exceptions increases the longer I test (by twiddling the encoder back and forth), but sometimes I get them before the main loop even registers an event...
This is what I get when I twiddle the knob:

Code:
# encoder_portable.py
# BASED ON Copyright (c) 2017-2021 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file

from machine import Pin
import micropython
import utime

last_value_change_ms = 0

def debounce(db=500):
    # Return True ("bounced") if fewer than db ms have passed since the
    # last admitted event; otherwise admit this event and record its time.
    global last_value_change_ms
    cur_time = utime.ticks_ms()
    # ticks_diff() is wraparound-safe; plain subtraction is not.
    diff = utime.ticks_diff(cur_time, last_value_change_ms)
    bounce = diff < db
    if not bounce:
        last_value_change_ms = cur_time
    return bounce

class Encoder:
    def __init__(self, pin_x, pin_y, handler, scale=1):
        self.scale = scale
        self.handler = handler
        self.forward = True
        self.pin_x = pin_x
        self.pin_y = pin_y
        self._pos = 0
        try:
            self.x_interrupt = pin_x.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=self.x_callback, hard=True)
            self.y_interrupt = pin_y.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=self.y_callback, hard=True)
        except TypeError:  # port does not support hard IRQs
            self.x_interrupt = pin_x.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=self.x_callback)
            self.y_interrupt = pin_y.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=self.y_callback)

    def x_callback(self, pin):
        if debounce():
            return
        self.forward = pin() ^ self.pin_y()
        self._pos += 1 if self.forward else -1
        micropython.schedule(self.handler, self.forward)

    def y_callback(self, pin):
        if debounce():
            return
        self.forward = self.pin_x() ^ pin() ^ 1
        self._pos += 1 if self.forward else -1
        micropython.schedule(self.handler, self.forward)

    def position(self, value=None):
        if value is not None:
            self._pos = round(value / self.scale)  # Improvement provided by @IhorNehrutsa
        return self._pos * self.scale

    def value(self):
        return self._pos

### =====================================

from machine import Pin, Timer, ADC
import uasyncio as asyncio
from ucollections import namedtuple
import micropython
import utime

class EncoderEvent(bool):
    pass

def get_encoder_handler(events, flag):
    # The scheduled handler queues an event and wakes the reactor task.
    def handler(fwd):
        events.insert(0, EncoderEvent(fwd))
        flag.set()
    return handler

async def reactor():
    p16 = Pin(16, Pin.IN, Pin.PULL_UP)
    p17 = Pin(17, Pin.IN, Pin.PULL_UP)
    events = []
    flag = asyncio.ThreadSafeFlag()
    encoder = Encoder(p16, p17, get_encoder_handler(events, flag))
    while True:
        if len(events) == 0:
            await flag.wait()
        while len(events):
            event = events.pop()
            if type(event) == EncoderEvent:
                print('got Encoder: {}'.format(event))

def run():
    asyncio.run(reactor())
Code:
got Encoder: False
got Encoder: False
Traceback (most recent call last):
File "encoder.py", line 48, in x_callback
RuntimeError: schedule queue full
got Encoder: True
Traceback (most recent call last):
File "encoder.py", line 48, in x_callback
RuntimeError: schedule queue full
Traceback (most recent call last):
File "encoder.py", line 55, in y_callback
RuntimeError: schedule queue full
got Encoder: False
got Encoder: False
got Encoder: True
got Encoder: True
got Encoder: False
Traceback (most recent call last):
File "encoder.py", line 55, in y_callback
RuntimeError: schedule queue full
got Encoder: False
Traceback (most recent call last):
File "encoder.py", line 55, in y_callback
RuntimeError: schedule queue full
got Encoder: True
Traceback (most recent call last):
File "encoder.py", line 48, in x_callback
RuntimeError: schedule queue full
Traceback (most recent call last):
File "encoder.py", line 48, in x_callback
RuntimeError: schedule queue full
got Encoder: False
got Encoder: False
got Encoder: True
- pythoncoder
Re: micropython.schedule() queue length
There is no point in using micropython.schedule() on an ESP32, as the MicroPython port supports only soft IRQs: the callback already runs in the context of the VM, so there is nothing to defer.
Your modifications to my code have me baffled. The algorithm performs debouncing automatically. I suggest you look at my async encoder class. This has been tested on ESP32.
Peter Hinch
Index to my micropython libraries.
Re: micropython.schedule() queue length
Hmmm. So the soft IRQs are called in the main thread? I didn't realise that!
Btw, I hope you don't think I was trying to improve on your code! I just hacked it to demonstrate my approach.
I'm not sure how your code debounces, but maybe it's the encoder I'm using. Without a delay I get ~16 extra steps every second 'click'. My debounce code and setup with a 200ms delay works perfectly now that I've removed schedule(), so onwards and upwards.
Thanks for putting me right on the IRQs! Does the same apply to machine.Timer?
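For the record, the change amounted to calling the handler straight from the pin callback instead of going through schedule(), since on a soft-IRQ port the callback already runs in the VM. A sketch of the pattern (board-specific bits stubbed out so it runs anywhere; Flag is a stand-in for uasyncio.ThreadSafeFlag, and the names are mine, not from any driver):

```python
# The ISR callback appends an event and sets a flag directly -- no
# micropython.schedule() -- because soft-IRQ callbacks already execute
# in the MicroPython VM. Flag stands in for uasyncio.ThreadSafeFlag so
# this sketch also runs under plain CPython.
class Flag:
    def __init__(self):
        self._set = False
    def set(self):
        self._set = True

def make_handler(events, flag):
    def handler(fwd):
        events.append(bool(fwd))  # called directly from the pin IRQ
        flag.set()                # wakes the task awaiting the flag
    return handler

events, flag = [], Flag()
handler = make_handler(events, flag)
handler(1)  # simulate an IRQ: forward step
handler(0)  # simulate an IRQ: reverse step
print(events)     # [True, False]
print(flag._set)  # True
```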
- pythoncoder
Re: micropython.schedule() queue length
ESP32 and ESP8266 support only soft IRQs. This means that if an interrupt occurs while a GC is in progress, the MicroPython service routine is delayed until the GC is complete. The ISR then runs to completion with the guarantee that a GC will not occur while it is running. This means you can do allocations in the ISR.
Debouncing is handled because every edge is accounted for. If a bounce occurs, the count changes on every edge of the bounce; but since every high-going edge has a matching low-going edge, the count never changes by more than one increment. That is the theory. In practice, without hardware conditioning, errors can occur if edges appear faster than the ISR can handle them. With switch-based encoders there is usually user feedback and absolute precision is not required: I've used the uasyncio version extensively with my micro-gui, and in normal usage I've never spotted an error. But in carefully designed tests with an optical encoder I have seen gradual creep. For precision applications such as machine tools you need at least two stages of hardware synchronisation to a clock.
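To put the same idea in code, here is a host-Python simulation of the edge-counting rule (names are mine, for illustration only): on an x edge the direction is x ^ y, on a y edge it is x ^ y ^ 1, and the position moves one step per edge, so a bounce self-cancels.

```python
# Simulation of the edge-counting quadrature rule: every edge moves the
# count by one step, so each spurious edge pair from a bounce cancels out.
class QuadSim:
    def __init__(self):
        self.x = self.y = self.pos = 0

    def edge_x(self, level):
        self.x = level
        self.pos += 1 if (self.x ^ self.y) else -1

    def edge_y(self, level):
        self.y = level
        self.pos += 1 if (self.x ^ self.y ^ 1) else -1

# One clean forward cycle: four edges, four increments.
d = QuadSim()
for edge, level in [('x', 1), ('y', 1), ('x', 0), ('y', 0)]:
    (d.edge_x if edge == 'x' else d.edge_y)(level)
print(d.pos)  # 4

# Now make the x line bounce: each spurious pair of edges cancels,
# so the count never drifts by more than one step.
d.edge_x(1)          # genuine edge, pos -> 5
for _ in range(3):   # three full bounces on x
    d.edge_x(0)      # -1
    d.edge_x(1)      # +1
print(d.pos)  # still 5
```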
So I'm puzzled by your observation. It would be interesting to put a scope on the encoder signals.
[EDIT]
What you're observing seems far too deterministic to be contact bounce. I suspect something else is going on.
Peter Hinch
Index to my micropython libraries.
Index to my micropython libraries.
Re: micropython.schedule() queue length
Yes, my first driver only responded to edges on one pin and checked the value of the other in the handler, but I can see that your driver needs the level shifts on both pins to be in sync (barring coincidences). I'm not sure what's going on, but I don't have time to investigate right now, so I'm going to go with my solution for now.

pythoncoder wrote: ↑Tue Nov 02, 2021 8:29 am
Debouncing is handled because every edge is accounted for. If a bounce occurs, on every edge of the bounce the count changes.
...
What you're observing seems far too deterministic to be contact bounce. I suspect something else is going on.
I've got pieces of an old trackball mouse on my desk with optical encoders, so when I get time I'm going to have a play with that. Also, I'm definitely going to try your asyncio drivers, but not right now as my stuff is working and I need to move on.
Thanks again for your help. Appreciated as always!
