3 PIO state machines to determine minimum, maximum, count and mean of a series of pulses on a pin.
I used it to measure RP2040 ISR latencies and reported the results there.
As I am not using github actively atm, I post my code here:
Code: Select all
#
# ------------ Class PulseStat for RPi2040 ----------------------
#
# Created 2022 by Raul Kompass (rkompass).
#
# License: The MIT License (MIT)
#
#
#
# Records minimum, maximum, mean of durations and count of a number of pulses defined by 1 or 2 pins.
# Employs three successive rp2 PIO state machines.
# These three state machines occupy up to 30 of the 32 available instructions, if on the same PIO block.
#
# sm_id recommended to be 0, 1, 4 or 5.
# Pins should be machine.Pin instances.
#
#
# ------------ Usage: -----------
#
# pstat = PulseStat((pinA, pinB)) # pinA triggers duration measurement start (always with positive flank)
# # pinB triggers stop (with positive flank as default for 2 different pins)
# # optional stop_pol=1 is default here
#
# pstat = PulseStat((pinA, pinB), stop_pol=0) # as above, but pinB negative flank is stop signal
#
# pstat = PulseStat(pin) # pin positive flank is start, negative flank is stop signal
# # optional stop_pol=0 is default here, stop_pol=1 not allowed
#
# pstat = PulseStat(pin, sm_id=1) # use StateMachines 1, 2 and 3 in first PIO block
#
# pstat.start() # resets state machines and starts measurements, starting flanks trigger from now on
#
# pstat.stop() # stop measurements
#
# (mi, ma, mean, n) = pstat.get_stats() # gets measurements in nanoseconds
# print('Latency min:', mi, ' max:', ma, ' mean:', mean, 'ns; ', 'n:', n) # and prints results
#
# -------- Precision ------
#
# Every 3 PIO clock-cycles the count is increased -> precision is <= 24 ns.
#
# Test results for single pulses, generated by PIO:
# pulse-duration(ns) min max mean as measured with PulseStat
# 8 0 24 0 <- differs from rule of 3
# 16 24 24 24
# 24 24 24 24
# 32 24 24 24
# 40 48 48 48
# 48 48 48 48
# 56 48 48 48
# 64 72 72 72
# 72 72 72 72
# 80 72 72 72
# 88 96 96 96
# .. .. .. ..
#
# Hardware precision: With the Pulse-durations (left column) generated on pin 7 (output mode) of RPI-Pico and
# PulseStat listening on pin 9 (input mode) for start and pin 7 for stop signal, the same counts are reported.
# So the pin hardware seems to be fast enough to put such brief signals through.
#
# -------- Limits ------
#
# min and max are limited to unsigned 32 bits (in 3*PIO_clock periods), i.e. about 103.079 seconds.
# If the max value exceeds 2**32, an exception is raised in .get_stats().
# mean accumulates the sum of durations in 64 bits. The overflow of 32 bits is hopefully handled
# correctly (not tested yet).
# The three state machines are not started and stopped simultaneously but one after the other.
# There is no micropython function for that.
#
# -------- ToDo ------
#
# Test 32 bit excess in mean.
# Do synchronous starting and stopping of state machines by register tweaking.
# Add adaptation for negative starting flanks.
# Perhaps adapt for different PIO clocks.
#
from rp2 import PIO, StateMachine, asm_pio
from math import nan
class PulseStat:
    """Record minimum, maximum, mean and count of pulse durations using PIO.

    Three successive rp2 PIO state machines (``sm_id``, ``sm_id + 1`` and
    ``sm_id + 2``) watch the same start/stop pins and accumulate the minimum,
    the maximum and the sum/count of the pulse durations respectively.
    Durations are counted in units of 3 PIO clock cycles (24 ns at the
    125 MHz clock configured here).

    The start signal is always a rising edge on the start pin.  The stop
    signal is either a rising edge (``stop_pol=1``) or a falling edge
    (``stop_pol=0``) on the stop pin.
    """

    def __init__(self, pin, stop_pol=None, sm_id=4):
        """Configure the three state machines without starting them.

        Args:
            pin: a single pin (used for both start and stop) or a tuple/list
                ``(start_pin, stop_pin)``.
            stop_pol: polarity of the stop edge.  Defaults to 1 (rising) for
                two pins; a single pin only supports the falling edge (0).
            sm_id: index of the first of the three state machines to use.

        Raises:
            ValueError: a single pin was combined with a truthy ``stop_pol``.
        """
        two_pins = isinstance(pin, (tuple, list))
        if not two_pins:
            # A single pin serves as start (rising) and stop (falling) signal.
            if stop_pol:
                raise ValueError('single start/stop pin requires negative stop polarity')
            pin = (pin, pin)
        elif stop_pol is None:
            # For two pins positive stop polarity is the default.
            stop_pol = 1

        # Note: the start signal is always positive.  Changing that would
        # require changing the polarities of the wait(x, pin, 0) instructions.
        if stop_pol:
            self.sm_pulse_min = self.sm_pulse_min_ps    # positive stop signal
            self.sm_pulse_max = self.sm_pulse_max_ps
            self.sm_pulse_mean = self.sm_pulse_mean_ps
        else:
            self.sm_pulse_min = self.sm_pulse_min_ns    # negative stop signal
            self.sm_pulse_max = self.sm_pulse_max_ns
            self.sm_pulse_mean = self.sm_pulse_mean_ns

        # in_base is the start pin (used by wait), jmp_pin is the stop pin.
        self.sm_min = StateMachine(sm_id, self.sm_pulse_min, freq=125_000_000,
                                   in_base=pin[0], jmp_pin=pin[1])
        self.sm_max = StateMachine(sm_id + 1, self.sm_pulse_max, freq=125_000_000,
                                   in_base=pin[0], jmp_pin=pin[1])
        self.sm_mean = StateMachine(sm_id + 2, self.sm_pulse_mean, freq=125_000_000,
                                    in_base=pin[0], jmp_pin=pin[1])

        # We do not start the state machines yet, but reset them and clear
        # their receive FIFOs.
        self.sm_min.restart()
        self.sm_max.restart()
        self.sm_mean.restart()
        self._clear_rx_fifos()

    def _clear_rx_fifos(self):
        """Discard every word waiting in the RX FIFO of each state machine."""
        for sm in (self.sm_min, self.sm_max, self.sm_mean):
            for _ in range(sm.rx_fifo()):
                sm.get()

    # 8 instructions; the minimal duration is accumulated in X.
    # Version for a positive stop signal.
    # X is preset to 0 by start(): the maximum value 0xffffffff in inverted form.
    @staticmethod
    @asm_pio()
    def sm_pulse_min_ps():
        wait(0, pin, 0)             # in case start pin is still high, esp. after stop signal, wait for low
        set(y, 0)
        mov(y, invert(y))           # prepare Y for upcounting by counting ~Y down
        wait(1, pin, 0)             # now being low, wait for high, which starts the measurement
        label('loop')
        jmp(pin, 'update')          # if stop pin is low continue counting Y down, else (stop signal) update X
        jmp(y_dec, 'here')          # increment ~Y, continue first loop
        label('here')
        jmp(x_not_y, 'loop')        # we did not reach the old minimum yet, continue increasing ~Y
        label('update')
        mov(x, y)                   # stop signal: duration was less than present X (minimum): update X

    # 9 instructions; the minimal duration is accumulated in X.
    # Version for a negative stop signal.
    # X is preset to 0 by start(): the maximum value 0xffffffff in inverted form.
    @staticmethod
    @asm_pio()
    def sm_pulse_min_ns():
        label('start')
        wait(0, pin, 0)             # in case start pin is still high, esp. after stop signal, wait for low
        set(y, 0)
        mov(y, invert(y))           # prepare Y for upcounting by counting ~Y down
        wait(1, pin, 0)             # now being low, wait for high, which starts the measurement
        label('loop')
        jmp(pin, 'inner')           # in case pin is high, continue counting Y down
        mov(x, y)                   # pin is low, duration was less than present X (minimum): update X
        jmp('start')
        label('inner')
        jmp(y_dec, 'here')          # increment ~Y, continue first loop
        label('here')
        jmp(x_not_y, 'loop')        # we did not reach the old minimum yet, continue increasing ~Y

    # 9 instructions (7 ignoring overflow); the maximal duration is accumulated in X.
    # Version for a positive stop signal.
    # X is preset by start() to 0 in inverted form.
    @staticmethod
    @asm_pio()
    def sm_pulse_max_ps():
        label('start')
        wait(0, pin, 0)             # in case start pin is still high, esp. after stop signal, wait for low
        mov(y, invert(x))           # Y = ~X, to be later decremented down to 0
        wait(1, pin, 0)             # now being low, wait for high, which starts the measurement
        label('loop1')
        jmp(pin, 'start')   [1]     # if stop pin high, duration was less than present max: wait for next pulse
        jmp(y_dec, 'loop1')         # stop pin was still low, continue decrementing Y, continue first loop
        label('loop2')              # Y==0: our duration is as long now as the stored maximum
        jmp(pin, 'start')   [1]     # if stop signal: that's it, wait for next pulse
        jmp(x_dec, 'loop2')         # no stop: we count max duration up by decrementing ~X, loop then
        mov(isr, x)                 # max duration > 0xffffffff
        push(block)                 # we send this value blockingly to the FIFO

    # 11 instructions (9 ignoring overflow); the maximal duration is accumulated in X.
    # Version for a negative stop signal.
    # X is preset by start() to 0 in inverted form.
    @staticmethod
    @asm_pio()
    def sm_pulse_max_ns():
        label('start')
        wait(0, pin, 0)             # in case start pin is still high, esp. after stop signal, wait for low
        mov(y, invert(x))           # Y = ~X, to be later decremented down to 0
        wait(1, pin, 0)             # now being low, wait for high, which starts the measurement
        label('loop1')
        jmp(pin, 'inner1')  [1]     # in case pin is high, continue counting Y down
        jmp('start')                # pin is low, duration was less than present max: wait for next pulse
        label('inner1')
        jmp(y_dec, 'loop1')         # decrement Y, continue first loop
        label('loop2')              # Y==0: our pulse was high now for the past maximum duration
        jmp(pin, 'inner2')  [1]     # in case pin is still high, we count max duration up
        jmp('start')                # pulse is over: wait for next pulse
        label('inner2')
        jmp(x_dec, 'loop2')         # count max duration up by counting ~X down
        mov(isr, x)                 # max duration >= 0xffffffff
        push(block)                 # we send this value blockingly to the FIFO

    # 10 instructions; the sum of durations is accumulated in X (low part) and
    # OSR (high part), the pulse count in Y.  Version for a positive stop signal.
    # start() presets X, OSR and Y to 0 in inverted form and stores the X reset
    # value in ISR.
    @staticmethod
    @asm_pio()
    def sm_pulse_mean_ps():
        label('start')
        wait(0, pin, 0)             # in case start pin is still high, esp. after stop signal, wait for low
        wait(1, pin, 0)             # now being low, wait for high, which starts the measurement
        label('loop')
        jmp(pin, 'over')    [1]     # stop signal? duration over
        jmp(x_dec, 'loop')          # not over: increment ~X, continue loop
        mov(x, osr)                 # ~X=0 i.e. X = 0xffffffff = (1<<32)-1, we increment OSR now
        jmp(x_dec, 'here')
        label('here')
        mov(osr, x)                 # !! every increment of OSR consumes 5 additional PIO instructions !!
        mov(x, isr)                 # we have to multiply OSR value by ((1<<32)-1)*24+5*8 = (1<<32)*24+2*8
        jmp('loop')
        label('over')
        jmp(y_dec, 'start')         # duration over: increment inverted Y

    # 10 instructions; the sum of durations is accumulated in X (low part) and
    # OSR (high part), the pulse count in Y.  Version for a negative stop signal.
    # start() presets X, OSR and Y to 0 in inverted form and stores the X reset
    # value in ISR.
    @staticmethod
    @asm_pio()
    def sm_pulse_mean_ns():
        label('start')
        wait(0, pin, 0)             # in case start pin is still high, esp. after stop signal, wait for low
        wait(1, pin, 0)             # now being low, wait for high, which starts the measurement
        label('loop')
        jmp(pin, 'inner')   [1]     # in case pin is high, continue counting
        jmp(y_dec, 'start')         # pin is low, pulse over: increment inverted Y
        label('inner')
        jmp(x_dec, 'loop')          # increment inverted X
        mov(x, osr)                 # ~X=0 i.e. X = 0xffffffff = (1<<32)-1, we increment OSR now
        jmp(x_dec, 'here')
        label('here')
        mov(osr, x)                 # !! every increment of OSR consumes 5 additional PIO instructions !!
        mov(x, isr)                 # we have to multiply OSR value by ((1<<32)-1)*24+5*8 = (1<<32)*24+2*8
        jmp('loop')

    def start(self):
        """Reset all three state machines and start measuring.

        The state machines are activated one after the other, not
        simultaneously.
        """
        self.stop()

        # Clear state machine internals and go to the first instruction.
        self.sm_min.restart()
        self.sm_max.restart()
        self.sm_mean.restart()

        # Drain each state machine's own RX FIFO so that get_stats() only
        # reads words pushed for the upcoming measurement.
        self._clear_rx_fifos()

        # X: minimum of durations; at start the max possible value 0xffffffff
        # in inverted form.
        self.sm_min.exec("set(x, 0)")

        # X: maximum of durations, counted up by counting ~X down.
        self.sm_max.exec("set(x, 0)")
        self.sm_max.exec("mov(x, invert(x))")

        # OSR, X: sum of durations, counted up by counting ~OSR, ~X down.
        self.sm_mean.exec("set(x, 0)")
        self.sm_mean.exec("mov(x, invert(x))")
        self.sm_mean.exec("mov(osr, x)")    # higher part of the sum of durations
        self.sm_mean.exec("mov(y, x)")      # pulse count, counted up by counting ~Y down
        self.sm_mean.exec("mov(isr, x)")    # value to reset X from

        # Start the state machines.
        self.sm_min.active(1)
        self.sm_max.active(1)
        self.sm_mean.active(1)

    def stop(self):
        """Stop all three state machines (one after the other)."""
        self.sm_min.active(0)
        self.sm_max.active(0)
        self.sm_mean.active(0)

    def get_stats(self):
        """Stop the measurement and return ``(min, max, mean, n)``.

        Returns:
            A tuple of the minimum, maximum and mean pulse duration in
            nanoseconds and the number of pulses ``n``.  ``mean`` is ``nan``
            if no pulse was counted.

        Raises:
            ValueError: the max state machine pushed a word to its FIFO,
                i.e. a duration exceeded the 32 bit counter.
        """
        self.stop()

        if self.sm_max.rx_fifo() > 0:
            raise ValueError('sm_pulse_max: max duration {} exceeded'.format(self.sm_max.get()))

        # Let every state machine push its (inverted) scratch registers.
        self.sm_min.exec("mov(isr, invert(x))")
        self.sm_min.exec("push(noblock)")
        self.sm_max.exec("mov(isr, invert(x))")
        self.sm_max.exec("push(noblock)")
        # Count n.  The author's note says every count contributes 8 ns to
        # dsum; that contribution is not added below.
        self.sm_mean.exec("mov(isr, invert(y))")
        self.sm_mean.exec("push(noblock)")
        self.sm_mean.exec("mov(isr, invert(x))")    # lower part of dsum
        self.sm_mean.exec("push(noblock)")
        self.sm_mean.exec("mov(isr, invert(osr))")  # upper part of dsum
        self.sm_mean.exec("push(noblock)")

        # The words arrive in the order they were pushed: n, low, high.
        n = self.sm_mean.get()
        dsum_low = self.sm_mean.get()
        dsum_high = self.sm_mean.get()
        # Each unit of the upper part stands for (1<<32)*24 + 2*8 ns.
        dsum = dsum_low * 24 + dsum_high * 0x1800000010
        mean = dsum / n if n > 0 else nan

        min_ns = self.sm_min.get() * 24
        max_ns = self.sm_max.get() * 24 + 24
        return min_ns, max_ns, mean, n