PIO program to determine minimum, maximum and mean of pulses

RP2040 based microcontroller boards running MicroPython.
Target audience: MicroPython users with an RP2040 boards.
This does not include conventional Linux-based Raspberry Pi boards.
Post Reply
rkompass
Posts: 66
Joined: Fri Sep 17, 2021 8:25 pm

PIO program to determine minimum, maximum and mean of pulses

Post by rkompass » Mon Jul 25, 2022 11:39 am

For the problem reported in viewtopic.php?f=6&t=12721 I wrote a class that employs
3 PIO state machines to determine minimum, maximum, count and mean of a series of pulses on a pin.
I used it to measure RPI2040 ISR latencies and reported there.
As I am not using github actively atm, I post my code here:

Code: Select all

#
#  ------------ Class PulseStat for RPi2040 ----------------------
#
#    Created 2022 by Raul Kompass  (rkompass).
#
#    License: The MIT License (MIT)
#
#
#
# Records minimum, maximum, mean of durations and count of a number of pulses defined by 1 or 2 pins.
# Employs three successive rp2 PIO state machines.
# These three state machines occupy up to 30 of the 32 available instructions, if on the same PIO block.
# 
# sm_id recommended to be 0, 1, 4 or 5.
# Pins should be machine.Pin instances.
#
# 
#  ------------  Usage:  -----------
#  
# pstat = PulseStat((pinA, pinB))  # pinA triggers duration measurement start (always with positive flank)
#                                  # pinB triggers stop (with positive flank as default for 2 different pins)
#                                  # optional stop_pol=1 is default here
#
# pstat = PulseStat((pinA, pinB), stop_pol=0)  # as above, but pinB negative flank is stop signal
#
# pstat = PulseStat(pin)           # pin positive flank is start, negative flank is stop signal
#                                  # optional stop_pol=0 is default here, stop_pol=1 not allowed
#
# pstat = PulseStat(pin, sm_id=1)  # use StateMachines 1, 2 and 3 in first PIO block
#
# pstat.start()                    # resets state machines and starts measurements, starting flanks trigger from now on
#
# pstat.stop()                     # stop measurements
# 
# (mi, ma, mean, n) = pstat.get_stats()                                     # gets measurements in nanoseconds
# print('Latency min:', mi, ' max:', ma, ' mean:', mean, 'ns; ', 'n:', n)   # and prints results
#
#    --------  Precision  ------
#
#  Every 3 PIO clock-cycles the count is increased -> precision is <= 24 ns.
#
#  Test results for single pulses, generated by PIO:
#  pulse-duration(ns)    min      max     mean    as measured with PulseStat
#          8              0       24       0     <- differs from rule of 3
#         16             24       24      24
#         24             24       24      24
#         32             24       24      24
#         40             48       48      48
#         48             48       48      48
#         56             48       48      48
#         64             72       72      72
#         72             72       72      72
#         80             72       72      72
#         88             96       96      96
#         ..             ..       ..      ..
#
#  Hardware precision: With the Pulse-durations (left column) generated on pin 7 (output mode) of RPI-Pico and
#     PulseStat listening on pin 9 (input mode) for start and pin 7 for stop signal, the same counts are reported.
#     So the pin hardware seems to be fast enough to put such brief signals through. 
#
#    --------  Limits  ------
#
#  min and max are limited to unsigned 32 bits (in 3*PIO_clock periods) i.e. 103,079 seconds.
#  If max value exceeded 2**32 an exception is raised in .get_stats().
#  mean accumulates the sum of durations in 64 bits. The overflow of 32 bits is hopefully handled
#     correctly (not tested yet).
#  The three state machines are not started and stopped simultaneously but one after the other.
#  There is no micropython function for that.
#
#    --------  ToDo  ------
#
#  Test 32 bit excess in mean.
#  Do synchronous starting and stopping of state machines of by register tweaking.
#  Add adaptation for negative starting flanks.
#  Perhaps adapt for different PIO clocks.
#

from rp2 import PIO, StateMachine, asm_pio
from math import nan

class PulseStat:

    def __init__(self, pin, stop_pol=None, sm_id=4):
        
        if not isinstance(pin, (tuple, list)):     # if we have a single pin, use it for start and stop
            pin = (pin, pin)
            two_pins = False
        else:
            two_pins = True
        if not two_pins:
            if stop_pol:
                raise ValueError('single start/stop pin requires negative stop polarity')
        else:
            if stop_pol is None:                   # for two pins positive stop polarity is the default
                stop_pol = 1

        if stop_pol:      # Note: We have always a positive start signal. Changing that requires changing polarities of the wait(x, pin, 0) instructions. Programmatically??
            self.sm_pulse_min  = self.sm_pulse_min_ps   # positive stop signal
            self.sm_pulse_max  = self.sm_pulse_max_ps
            self.sm_pulse_mean = self.sm_pulse_mean_ps
        else:
            self.sm_pulse_min  = self.sm_pulse_min_ns   # negative stop signal
            self.sm_pulse_max  = self.sm_pulse_max_ns
            self.sm_pulse_mean = self.sm_pulse_mean_ns
            
        self.sm_min  = StateMachine(sm_id,   self.sm_pulse_min,  freq=125_000_000, in_base=pin[0], jmp_pin=pin[1])
        self.sm_max  = StateMachine(sm_id+1, self.sm_pulse_max,  freq=125_000_000, in_base=pin[0], jmp_pin=pin[1])
        self.sm_mean = StateMachine(sm_id+2, self.sm_pulse_mean, freq=125_000_000, in_base=pin[0], jmp_pin=pin[1])
        self.sm_min.restart()                      # we do not start the StateMachine yet
        self.sm_max.restart()
        self.sm_mean.restart()
        for _ in range(self.sm_min.rx_fifo()):     # but we clear the receive FIFOs
            self.sm_min.get()
        for _ in range(self.sm_max.rx_fifo()):
            self.sm_max.get()
        for _ in range(self.sm_mean.rx_fifo()):
            self.sm_mean.get()

    @staticmethod
    @asm_pio()
    def sm_pulse_min_ps():    # 8 instructions, minimal duration is accumulated in X; version for positive stop signal
        # set(x, 0)      # in prep. function; X: min of durations; at start max value 0xffffffff inverted
        wait(0, pin, 0)       # in case start pin is still high, esp. after stop signal, wait for low
        set(y, 0)
        mov(y, invert(y))     # prepare Y for upcounting by counting ~Y down
        wait(1, pin, 0)       # now being low, wait for high, which starts the measurement
        label('loop')        
        jmp(pin, 'update')    # if pin is low continue counting Y down, else (stop signal) update X
        jmp(y_dec, 'here')    # increment ~Y, continue first loop
        label('here')
        jmp(x_not_y, 'loop')  # we did not reach the old minimum yet, continue increasing Y
        label('update')
        mov(x, y)             # stop signal: duration was less then present X (mininum): update X

    @staticmethod
    @asm_pio()
    def sm_pulse_min_ns():    # 9 instructions, minimal duration is accumulated in X; version for negative stop signal
        # set(x, 0)      # in prep. function; X: min of durations; at start max value 0xffffffff inverted
        label('start')
        wait(0, pin, 0)       # in case start pin is still high, esp. after stop signal, wait for low
        set(y, 0)
        mov(y, invert(y))     # prepare Y for upcounting by counting ~Y down
        wait(1, pin, 0)       # now being low, wait for high, which starts the measurement
        label('loop')        
        jmp(pin, 'inner')     # in case pin is high, continue counting Y down
        mov(x, y)             # pin is low, latency was less then present X (mininum) update X
        jmp('start')
        label('inner')
        jmp(y_dec, 'here')    # increment ~Y, continue first loop
        label('here')
        jmp(x_not_y, 'loop')  # we did not reach the old minimum yet, continue increasing Y

    @staticmethod
    @asm_pio()
    def sm_pulse_max_ps():    # 9 instructions (7 ign. overflow); max. duration is accumulated in X; version for positive stop signal
        # set(x, 0)           # in setup now
        # mov(x, invert(x))   # X = 0 in inverted form
        label('start')
        wait(0, pin, 0)       # in case start pin is still high, esp. after stop signal, wait for low
        mov(y, invert(x))     # Y = ~X, to be later decremented down to 0
        wait(1, pin, 0)       # now being low, wait for high, which starts the measurement
        label('loop1')
        jmp(pin, 'start') [1] # if stop pin high, duration was less then present max: nothing to do, wait for next pulse
        jmp(y_dec, 'loop1')   # stop pin was still low, cont. incrementing ~Y, cont. first loop
        label('loop2')        # Y==0: our duration is as long now as stored maximum 
        jmp(pin, 'start') [1] # if stop signal: thats it: wait for next pulse
        jmp(x_dec, 'loop2')   # no stop: we count max duration up by decrementing ~X, loop then
        mov(isr, x)           # max duration > 0xffffffff
        push(block)           # we send this value blockingly to FIFO

    @staticmethod
    @asm_pio()
    def sm_pulse_max_ns():    # 11 instructions (9 ignoring overflow); maximal duration is accumulated in X; version for negative stop signal
        # set(x, 0)           # in setup now, 
        # mov(x, invert(x))   # X = 0 in inverted form
        label('start')
        wait(0, pin, 0)       # in case start pin is still high, esp. after stop signal, wait for low
        mov(y, invert(x))     # Y = ~X, to be later decremented down to 0
        wait(1, pin, 0)       # now being low, wait for high, which starts the measurement
        label('loop1')
        jmp(pin, 'inner1') [1]   # in case pin is high, continue counting Y down
        jmp('start')          # pin is low, latency was less then present max: nothing to do, wait for next pulse
        label('inner1')
        jmp(y_dec, 'loop1')   # decrement Y, continue first loop
        label('loop2')        # Y==0: our pulse was high now for past maximum duration, 
        jmp(pin, 'inner2') [1]   # in case pin is still high, we count max duration up
        jmp('start')          # pulse is over: wait for next pulse
        label('inner2')
        jmp(x_dec, 'loop2')   # count max duration up by counting ~X down
        mov(isr, x)           # max duration >= 0xffffffff
        push(block)           # we send this value blockingly to FIFO

    @staticmethod
    @asm_pio()
    def sm_pulse_mean_ps():   # 10 instructions; sum of durations is accumulated in X and OSR, count in Y; version for positive stop signal
        # set(x, 0)           # in setup now,  OSR, X sum of durations
        # mov(x, invert(x))   #  
        # mov(osr, x)         # X, OSR and Y in inverted form set to 0
        # mov(y, x)           # count
        # mov(isr, x)         # value for X reset
        label('start')
        wait(0, pin, 0)       # in case start pin is still high, esp. after stop signal, wait for low
        wait(1, pin, 0)       # now being low, wait for high, which starts the measurement
        label('loop')
        jmp(pin, 'over') [1]  # stop signal? duration over        
        jmp(x_dec, 'loop')    # not over: increment ~X, cont. loop
        mov(x, osr)           # ~X=0 i.e. X = 0xffffffff = (1<<32)-1, we increment OSR now
        jmp(x_dec, 'here')
        label('here')
        mov(osr, x)           # !! every increment of OSR consumes 5 additional PIO instructions !!
        mov(x, isr)           # we have to multiply OSR value by ((1<<32)-1) *24+5*8 = (1<<32)*24+2*8
        jmp('loop')
        label('over')
        jmp(y_dec, 'start')   # duration over: increment inverted Y

    @staticmethod
    @asm_pio()
    def sm_pulse_mean_ns():   # 10 instructions; sum of durations is accumulated in X and OSR, count in Y; version for negative stop signal
        # set(x, 0)           # in setup now,  OSR, X sum of durations
        # mov(x, invert(x))   #  
        # mov(osr, x)         # X, OSR and Y in inverted form set to 0
        # mov(y, x)           # count
        # mov(isr, x)         # value fo X reset
        label('start')
        wait(0, pin, 0)       # in case start pin is still high, esp. after stop signal, wait for low
        wait(1, pin, 0)       # now being low, wait for high, which starts the measurement
        label('loop')
        jmp(pin, 'inner') [1] # in case pin is high, continue counting Y down
        jmp(y_dec, 'start')   # pin is low, pulse over: increment inverted Y
        label('inner')
        jmp(x_dec, 'loop')    # increment inverted X
        mov(x, osr)           #  ~X=0 i.e. X = 0xffffffff = (1<<32)-1, we increment OSR now
        jmp(x_dec, 'here')
        label('here')
        mov(osr, x)           # !! every increment of OSR consumes 5 additional PIO instructions !!
        mov(x, isr)           # we have to multiply OSR value by ((1<<32)-1) *24+5*8 = (1<<32)*24+2*8
        jmp('loop')

    def start(self):
        self.sm_min.active(0)
        self.sm_max.active(0)
        self.sm_mean.active(0)
        self.sm_min.restart()                    # clear StateMachine internals, goto first instruction
        self.sm_max.restart()
        self.sm_mean.restart()
        for _ in range(self.sm_min.rx_fifo()):   # clear FIFOs
            self.sm_min.get()
        for _ in range(self.sm_max.rx_fifo()):
            self.sm_max.get()
        for _ in range(self.sm_mean.rx_fifo()):
            self.sm_max.get()
        self.sm_min.exec("set(x, 0)")            # X: minimum of durations; at start max possible value 0xffffffff in inverted form
        self.sm_max.exec("set(x, 0)")            # X: maximum of durations
        self.sm_max.exec("mov(x, invert(x))")    #    to be counted up by counting ~X down
        self.sm_mean.exec("set(x, 0)")           #  OSR, X sum of durations
        self.sm_mean.exec("mov(x, invert(x))")   #      to be counted up by counting ~OSR, ~X down
        self.sm_mean.exec("mov(osr, x)")         # higher value of sum of durations
        self.sm_mean.exec("mov(y, x)")           # pulse count, to be counted up by counting ~Y down
        self.sm_mean.exec("mov(isr, x)")         # value to reset X from
        self.sm_min.active(1)                    #  start StateMachines
        self.sm_max.active(1)
        self.sm_mean.active(1)

    def stop(self):
        self.sm_min.active(0)
        self.sm_max.active(0)
        self.sm_mean.active(0)

    def get_stats(self):                         # also stops the StateMachine
        self.sm_min.active(0)
        self.sm_max.active(0)
        self.sm_mean.active(0)
        if self.sm_max.rx_fifo() > 0:
            raise ValueError('sm_pulse_max: max duration {} exceeded'.format (self.sm_max.get()))            
        self.sm_min.exec("mov(isr, invert(x))")
        self.sm_min.exec("push(noblock)")
        self.sm_max.exec("mov(isr, invert(x))")
        self.sm_max.exec("push(noblock)")
        self.sm_mean.exec("mov(isr, invert(y))")   # count: n; every count contributes 8ns to dsum
        self.sm_mean.exec("push(noblock)")
        self.sm_mean.exec("mov(isr, invert(x))")   # lower part of dsum
        self.sm_mean.exec("push(noblock)")
        self.sm_mean.exec("mov(isr, invert(osr))") # upper part of dsum, to be multiplied by (1<<32)*24+2*8, see above
        self.sm_mean.exec("push(noblock)")
        n = self.sm_mean.get()
        dsum =  self.sm_mean.get()*24 + self.sm_mean.get()*0x1800000010
        mean = dsum/n if n > 0 else nan
        return self.sm_min.get()*24, self.sm_max.get()*24+24, mean, n

Post Reply