Code: Select all
# Triel class to get and put pulse trains on a GPIO pin
# using PIO. The pulse duration is set or returned as
# the multiple of a basic tick, defined by the PIO clock,
# which gives a lot of flexibilty. Since the duration values
# used can be 32 bit integers, that gices a wirde range of
# duration and resolution. The maximum frequency for timing
# pulses is machine.freq()/2, for sending pulses it's
# machine.freq(). So at 125MHz MCU clock
# for timing input pulses, the resolution can be 16ns
# for a pulse range of ~100ns - ~68 seconds, for sending
# the resolution is 8 ns and the range ~60ns to ~34 seconds.
# At lower frequencies for the PIO, resolution and range
# scale accordingly.
import machine
import rp2
import time
import array
class Pulses:
def __init__(self, get_pin=None, put_pin=None, sm_freq=1_000_000):
self.get_done = False
self.sm_get_index = '0'
if get_pin is not None:
if (sm_freq * 2) > machine.freq():
raise (ValueError, "frequency too high")
self.sm_get_freq = sm_freq * 2
self.sm_get = rp2.StateMachine(int(self.sm_get_index), self.sm_get_pulses,
freq=self.sm_get_freq, jmp_pin=get_pin, in_base=get_pin)
else:
self.sm_get = None
self.put_pin = put_pin
self.put_done = False
self.sm_put_index = '4'
if put_pin is not None:
if (sm_freq) > machine.freq():
raise (ValueError, "frequency too high")
self.sm_put_freq = sm_freq
self.sm_put = rp2.StateMachine(int(self.sm_put_index), self.sm_put_pulses,
freq=self.sm_put_freq, set_base=put_pin)
else:
self.sm_put = None
@staticmethod
@rp2.asm_pio(
in_shiftdir=rp2.PIO.SHIFT_LEFT,
autopull=False,
autopush=False,
)
def sm_get_pulses():
pull() # get a value and start
mov(x, pins) # wait for a transition
label("trigger")
mov(y, pins)
jmp(x_not_y, "start")
jmp("trigger")
label("start") # got a trigger, go
in_(pins, 1) # signal the start level
push(block)
mov(y, osr) # get number of items from osr
pull() # pull max_time to the osr
label("again") # and loop for the values
jmp(y_dec, "get_pulse") # and go for another loop
jmp("end")
label("get_pulse")
jmp(pin, "high") # have a high level
mov(x, osr) # preload with the max value
label("count_low") # timing a low pulse
jmp(pin, "issue") #
jmp(x_dec, "count_low") # count cycles
# get's here if the pulse is longer than max_time
jmp("issue") # could as well jmp("end")
label("high") # timing a high pulse
mov(x, osr) # preload with the max value
label("count_high")
jmp(pin, "still_high")
jmp("issue")
label("still_high")
jmp(x_dec, "count_high") # count cycles
# get's here if the pulse is longer than max_time
# could as well go to label end
label("issue") # report the result
mov(isr, x)
push(block)
nop()
jmp("again")
label("end")
nop() # must have a statement
@staticmethod
@rp2.asm_pio(
set_init=rp2.PIO.OUT_HIGH,
autopull=False,
)
def sm_put_pulses():
pull() # get the number of pulses
mov(y, osr)
set(pindirs, 1)
pull() # get start level
mov(x, osr)
jmp(x_dec, "hi_pulse") # start with 1
# create the low pulse
label("low_pulse")
jmp(y_dec, "next_low") # finished?
jmp("end") # yes, tell mother
label("next_low") # no, the go
pull() # get the duration
mov(x, osr)
jmp(x_dec,"set_low") # Zero length?
jmp("hi_pulse") # yes, next pulse
label("set_low") # finally go
set(pins, 0)
label("low")
jmp(x_dec, "low")
# create the high pulse
label("hi_pulse")
jmp(y_dec, "next_hi") # finished ?
jmp("end") # yes, tell mother
label("next_hi") # no, then next check
pull() # get the duration
mov(x, osr)
jmp(x_dec, "set_high") # Is it zero?
jmp("low_pulse") # yes, next pulse
label("set_high") # no, now Pulse
set(pins, 1)
label("high")
jmp(x_dec, "high")
jmp("low_pulse") # now another low pulse
label("end")
irq(noblock, rel(0)) # wave finished!
def irq_finished(self, sm):
if repr(sm)[13] == self.sm_put_index: # quite hacky
self.put_done = True
else:
self.get_done = True
def get_pulses(self, buffer, timeout):
if self.sm_get is None:
raise(ValueError, "get_pulses is not enabled")
self.get_done = False
# self.sm_get.put(len(buffer)) # set number of pulses
self.sm_get.put(len(buffer) + 1) # set number of pulses
self.sm_get.put(timeout) # set the timeout
self.sm_get.active(1)
start_state = self.sm_get.get() # get the start state
self.sm_get.get(buffer) # get data
self.sm_get.active(0)
for i in range(len(buffer)): # scale the results
buffer[i] = timeout - buffer[i] + 4
return start_state
def put_pulses(self, buffer, start_level=1):
if self.sm_put is None:
raise(ValueError, "put_pulses is not enabled")
self.put_done = False
print(buffer)
# compensate handling time
for i in range(len(buffer)):
buffer[i] = max(0, buffer[i] - 5)
self.sm_put.irq(self.irq_finished)
self.sm_put.active(1)
self.sm_put.put(len(buffer)) # tell the size
self.sm_put.put(start_level != 0) # tell the start level
self.sm_put.put(buffer) # send the pulse train
while self.put_done is False: # and wait for getting is done
time.sleep_ms(1)
self.sm_put.active(0)
#
# Instantiate the class
#
pulses = Pulses(machine.Pin(10, machine.Pin.IN), machine.Pin(11, machine.Pin.OUT), sm_freq=1_000_000)
#
# two test functions
#
def get(samples=10, timeout=100_000):
global pulses
ar = array.array("I", bytearray(samples * 4))
start = pulses.get_pulses(ar, timeout)
print("Start state: ", start)
print(pulses.get_done, ar)
def put(pattern="10 20 30 40", start=1):
global pulses
v = [int(i) for i in pattern.strip().split()]
ar = array.array("I", v)
pulses.put_pulses(ar, start)