
FIFO buffer for interrupt timestamps

Posted: Sun Feb 14, 2021 5:09 pm
by Tinus
Hi there,

I just posted a question on the Raspberry Pi MicroPython forum but have since found the answer:
apparently ucollections doesn't support deque on the Raspberry Pi Pico.

Can anybody tell me how to create a FIFO buffer that I can fill with interrupt timestamps?

I am trying to hook my Pi up to an ergometer to calculate speed, work, distance, etc.
The machine has an RPM sensor, and I am trying to read the time difference between pulses.

My thinking was to keep the interrupt handler as fast as possible, so I would store the timestamp in a buffer and read that buffer in a slow loop to update the display.
This whole programming without documentation thing is quite challenging.
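
For the speed calculation mentioned above, the time between two pulses converts to a rotation rate with simple arithmetic. A minimal sketch, assuming the intervals are measured in microseconds (as utime.ticks_us() would give) and that the sensor emits one pulse per revolution unless told otherwise; the name interval_to_rpm and the pulses_per_rev parameter are made up for illustration:

```python
def interval_to_rpm(dt_us, pulses_per_rev=1):
    """Convert the time between two pulses (in microseconds) to RPM.

    pulses_per_rev is an assumption: the number of pulses the sensor
    emits for one full revolution.
    """
    if dt_us <= 0:
        return 0.0  # no valid interval measured yet
    # 60 s/min * 1,000,000 us/s, divided by the time for one revolution
    return 60 * 1000000 / (dt_us * pulses_per_rev)


rpm = interval_to_rpm(500000)  # one pulse every 0.5 s
```

With one pulse every half second this works out to 120 RPM. Work and distance would need machine-specific constants that are not given here.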

Re: FIFO buffer for interrupt timestamps

Posted: Sun Feb 14, 2021 5:41 pm
by Roberthh
You can implement it with a circular buffer, which is either global or a member of a class that also contains the IRQ handler function.
Circular buffers are easy to implement and a common design. You'll find a lot about them on the Internet.
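
As one possible illustration of the idea (a sketch, not a reference implementation), a circular buffer can be a preallocated array plus two indices that wrap around. The class and method names below are hypothetical. It keeps one slot unused so that "full" and "empty" can be told apart, and it drops new values when full:

```python
import array


class RingBuffer:
    """Fixed-size FIFO of integers backed by a preallocated array."""

    def __init__(self, size):
        self.buf = array.array('i', [0] * size)
        self.size = size
        self.head = 0  # index of the next slot to write
        self.tail = 0  # index of the next slot to read

    def put(self, value):
        nxt = (self.head + 1) % self.size
        if nxt == self.tail:
            return False  # full: drop the new value
        self.buf[self.head] = value
        self.head = nxt
        return True

    def get(self):
        if self.tail == self.head:
            return None  # empty
        value = self.buf[self.tail]
        self.tail = (self.tail + 1) % self.size
        return value
```

Because one slot stays empty, a buffer created with size 4 holds at most 3 values. Only put() moves head and only get() moves tail, which suits the pattern of an interrupt handler writing and a main loop reading.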

Re: FIFO buffer for interrupt timestamps

Posted: Mon Feb 15, 2021 1:35 pm
by Tinus
Thank you very much.

This is my first project that has wires coming out of it and I keep running into problems because I know what I want but I just don't know the words for it.

Circular buffer sounds exactly right :)

Re: FIFO buffer for interrupt timestamps

Posted: Mon Feb 15, 2021 1:41 pm
by pythoncoder

Re: FIFO buffer for interrupt timestamps

Posted: Mon Feb 15, 2021 6:04 pm
by Tinus
I think I have successfully implemented a ring-buffer.
Thank you for the help.


import machine
import utime
import array

class MySensor:
  sensorGPIOPin = 4
  bounce = 5000
  pulse = 0
  myRB = array.array('i', [0] * 40)   # 40-slot ring buffer of pulse intervals (microseconds)
  rb_len = 0
  w_cursor = 0
  r_cursor = 0
  
  sensor = None
  oldTime = 0

  def init_GPIO(self):          # initialize GPIO
    self.sensor = machine.Pin(self.sensorGPIOPin, machine.Pin.IN, machine.Pin.PULL_DOWN)

  def init_interrupt(self):
    self.oldTime = utime.ticks_us()
    self.sensor.irq(trigger=machine.Pin.IRQ_FALLING, handler=self.mark_time)

  def mark_time(self,channel):        # callback function
    self.pulse+=1                # increase pulse by 1 whenever interrupt occurred
    newTime = utime.ticks_us()
    dT = utime.ticks_diff(newTime,self.oldTime)
    if dT > self.bounce:          # ignore pulses closer together than the debounce time
      self.write(dT)
      self.oldTime = newTime


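  def write(self,time):
    nxt = self.w_cursor + 1
    if nxt >= self.rb_len:
      nxt = 0
    if nxt == self.r_cursor:      # buffer full: drop this sample instead of overwriting unread data
      return
    self.myRB[self.w_cursor] = time
    self.w_cursor = nxt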
  
  def read(self):
    if self.r_cursor != self.w_cursor:
      val = self.myRB[self.r_cursor]
      self.r_cursor += 1
      if self.r_cursor >= self.rb_len:
        self.r_cursor = 0
    else:
      val = False
    return val
    
  def __init__(self, pinNr):
    self.rb_len = len(self.myRB)
    self.sensorGPIOPin = pinNr
    self.init_GPIO()
    self.init_interrupt()
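
The slow loop that updates the display could empty the buffer on each pass. A rough sketch of that step, assuming a read function that returns False once nothing is left, as read() above does; drain and fake_read are hypothetical names, and fake_read only stands in for the sensor so the snippet runs without hardware:

```python
def drain(read):
    """Collect every value currently available from read().

    read is any callable that returns False when the buffer is empty.
    """
    values = []
    while True:
        val = read()
        if val is False:
            break
        values.append(val)
    return values


# Stand-in for the sensor's read method (intervals in microseconds).
pending = [250000, 240000, 260000]

def fake_read():
    return pending.pop(0) if pending else False

intervals = drain(fake_read)
if intervals:
    average_us = sum(intervals) // len(intervals)  # smoothed pulse interval
```

On the Pico the call would presumably be drain(sensor.read) for an instance of MySensor, followed by whatever display update is needed.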