get_pulses and put_pulses

RP2040 based microcontroller boards running MicroPython.
Target audience: MicroPython users with an RP2040 boards.
This does not include conventional Linux-based Raspberry Pi boards.
Post Reply
User avatar
Roberthh
Posts: 3667
Joined: Sat May 09, 2015 4:13 pm
Location: Rhineland, Europe

get_pulses and put_pulses

Post by Roberthh » Wed Mar 03, 2021 9:05 pm

Just as another PIO exercise I sketched a class which allows to time a pulse sequence on a GPIO pin and to send a pulse sequence. The resolution is given by the PIO frequency, the pulse range span is related to the 32 bit PIO registers. At the moment there is a bug in the Pico software so the set-up for getting pulses seems a little bit strange. You have to request one pulse more than you will get.

Code: Select all

# Triel class to get and put pulse trains on a GPIO pin
# using PIO. The pulse duration is set or returned as
# the multiple of a basic tick, defined by the PIO clock,
# which gives a lot of flexibilty. Since the duration values
# used can be 32 bit integers, that gices a wirde range of
# duration and resolution. The maximum frequency for timing
# pulses is machine.freq()/2, for sending pulses it's
# machine.freq(). So at 125MHz MCU clock
# for timing input pulses, the resolution can be 16ns
# for a pulse range of ~100ns - ~68 seconds, for sending
# the resolution is 8 ns and the range ~60ns to ~34 seconds.
# At lower frequencies for the PIO, resolution and range
# scale accordingly.

import machine
import rp2
import time
import array


class Pulses:
    def __init__(self, get_pin=None, put_pin=None, sm_freq=1_000_000):
        self.get_done = False
        self.sm_get_index = '0'
        if get_pin is not None:
            if (sm_freq * 2) > machine.freq():
                raise (ValueError, "frequency too high")
            self.sm_get_freq = sm_freq * 2
            self.sm_get = rp2.StateMachine(int(self.sm_get_index), self.sm_get_pulses,
                freq=self.sm_get_freq, jmp_pin=get_pin, in_base=get_pin)
        else:
            self.sm_get = None

        self.put_pin = put_pin
        self.put_done = False
        self.sm_put_index = '4'
        if put_pin is not None:
            if (sm_freq) > machine.freq():
                raise (ValueError, "frequency too high")
            self.sm_put_freq = sm_freq
            self.sm_put = rp2.StateMachine(int(self.sm_put_index), self.sm_put_pulses,
                freq=self.sm_put_freq, set_base=put_pin)
        else:
            self.sm_put = None

    @staticmethod
    @rp2.asm_pio(
        in_shiftdir=rp2.PIO.SHIFT_LEFT,
        autopull=False,
        autopush=False,
    )
    def sm_get_pulses():
        pull()                      # get a value and start
        mov(x, pins)                # wait for a transition

        label("trigger")
        mov(y, pins)
        jmp(x_not_y, "start")
        jmp("trigger")

        label("start")              # got a trigger, go
        in_(pins, 1)                # signal the start level
        push(block)
        mov(y, osr)                 # get number of items from osr
        pull()                      # pull max_time to the osr

        label("again")              # and loop for the values
        jmp(y_dec, "get_pulse")     # and go for another loop
        jmp("end")

        label("get_pulse")
        jmp(pin, "high")            # have a high level

        mov(x, osr)                 # preload with the max value

        label("count_low")          # timing a low pulse
        jmp(pin, "issue")           #
        jmp(x_dec, "count_low")     # count cycles
        # get's here if the pulse is longer than max_time

        jmp("issue")                # could as well jmp("end")

        label("high")               # timing a high pulse
        mov(x, osr)                 # preload with the max value

        label("count_high")
        jmp(pin, "still_high")
        jmp("issue")

        label("still_high")
        jmp(x_dec, "count_high")     # count cycles
        # get's here if the pulse is longer than max_time
        # could as well go to label end

        label("issue")               # report the result
        mov(isr, x)
        push(block)
        nop()
        jmp("again")

        label("end")
        nop()                       # must have a statement

    @staticmethod
    @rp2.asm_pio(
        set_init=rp2.PIO.OUT_HIGH,
        autopull=False,
    )
    def sm_put_pulses():
        pull()                  # get the number of pulses
        mov(y, osr)
        set(pindirs, 1)
        pull()                  # get start level
        mov(x, osr)
        jmp(x_dec, "hi_pulse")  # start with 1

# create the low pulse
        label("low_pulse")
        jmp(y_dec, "next_low")  # finished?
        jmp("end")              # yes, tell mother

        label("next_low")       # no, the go
        pull()                  # get the duration
        mov(x, osr)
        jmp(x_dec,"set_low")    # Zero length?
        jmp("hi_pulse")         # yes, next pulse

        label("set_low")        # finally go
        set(pins, 0)
        label("low")
        jmp(x_dec, "low")

# create the high pulse
        label("hi_pulse")
        jmp(y_dec, "next_hi")   # finished ?
        jmp("end")              # yes, tell mother

        label("next_hi")        # no, then next check
        pull()                  # get the duration
        mov(x, osr)
        jmp(x_dec, "set_high")  # Is it zero?
        jmp("low_pulse")        # yes, next pulse

        label("set_high")       # no, now Pulse
        set(pins, 1)
        label("high")
        jmp(x_dec, "high")
        jmp("low_pulse")        # now another low pulse

        label("end")
        irq(noblock, rel(0))    # wave finished!

    def irq_finished(self, sm):
        if repr(sm)[13] == self.sm_put_index:  # quite hacky
            self.put_done = True
        else:
            self.get_done = True

    def get_pulses(self, buffer, timeout):
        if self.sm_get is None:
            raise(ValueError, "get_pulses is not enabled")
        self.get_done = False
        # self.sm_get.put(len(buffer))  # set number of pulses
        self.sm_get.put(len(buffer) + 1)  # set number of pulses
        self.sm_get.put(timeout)  # set the timeout

        self.sm_get.active(1)
        start_state = self.sm_get.get()  # get the start state
        self.sm_get.get(buffer)  # get data
        self.sm_get.active(0)
        for i in range(len(buffer)):  # scale the results
            buffer[i] = timeout - buffer[i] + 4
        return start_state

    def put_pulses(self, buffer, start_level=1):
        if self.sm_put is None:
            raise(ValueError, "put_pulses is not enabled")
        self.put_done = False
        print(buffer)
        # compensate handling time
        for i in range(len(buffer)):
            buffer[i] = max(0, buffer[i] - 5)
        self.sm_put.irq(self.irq_finished)
        self.sm_put.active(1)

        self.sm_put.put(len(buffer))   # tell the size
        self.sm_put.put(start_level != 0) # tell the start level
        self.sm_put.put(buffer)        # send the pulse train
        while self.put_done is False:  # and wait for getting is done
            time.sleep_ms(1)

        self.sm_put.active(0)


#
# Instantiate the class
#

pulses = Pulses(machine.Pin(10, machine.Pin.IN), machine.Pin(11, machine.Pin.OUT), sm_freq=1_000_000)

#
# two test functions
#
def get(samples=10, timeout=100_000):
    global pulses
    ar = array.array("I", bytearray(samples * 4))
    start = pulses.get_pulses(ar, timeout)
    print("Start state: ", start)
    print(pulses.get_done, ar)

def put(pattern="10 20 30 40", start=1):
    global pulses
    v = [int(i) for i in pattern.strip().split()]
    ar = array.array("I", v)
    pulses.put_pulses(ar, start)


User avatar
Roberthh
Posts: 3667
Joined: Sat May 09, 2015 4:13 pm
Location: Rhineland, Europe

Re: get_pulses and put_pulses

Post by Roberthh » Thu Mar 04, 2021 2:47 pm

I have cut a few rough edges, added a description and put it on Github: https://github.com/robert-hh/RP2040-Exa ... ter/pulses

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: get_pulses and put_pulses

Post by pythoncoder » Fri Mar 05, 2021 10:31 am

Very impressive, a great lesson in PIO coding.

I'm interested in this line and likewise in the opposite direction. Is this technique of linking the FIFO to a buffer documented?
Peter Hinch
Index to my micropython libraries.

User avatar
Roberthh
Posts: 3667
Joined: Sat May 09, 2015 4:13 pm
Location: Rhineland, Europe

Re: get_pulses and put_pulses

Post by Roberthh » Fri Mar 05, 2021 12:03 pm

The answer about the documentation is obvious, since it does not exists. I got that information from reading the code in rp2_pio.c. The respective functions have a comment line above which tell a little bot about the usage. Like this one and this one. Further code reading reveals more details, like the count bug in sm.get(). See my PR #6982.
When using with a buffer or array, it currently expects the state machine to push one item more than the length of the buffer, even if that is not stored. So if you have code that tells the state machine how many items to supply (like get_pulses()), you have to increase that number by one.
So when it's about the python methods for PIO and state machines, I look into rp2_pio.c. When it's about the pythonic variant of the pio_asm language, I look at modules/rp.py. Although that code is pretty concise and sometimes hard to understand.
Still waiting for documentation.

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Another approach to put_pulses

Post by pythoncoder » Mon Mar 08, 2021 1:30 pm

This doc describes my take on outputting an arbitrary pulse train. It was developed for the micropython_ir and micropython_remote libraries, which handle IR and radio remote control reception and transmission. It is loosely based on the ESP32 RMT class.

It has quite different characteristics from @Robert_hh's driver. The key design difference is that it uses interrupts to feed the PIO with data. This has benefits and drawbacks:
  • It is slower. It is designed for pulses of at least ~100μs with repetition rates of <=5KHz.
  • It is nonblocking (achieved by PIO code and an interrupt service routine).
  • It suppors a carrier frequency as per the ESP32 RMT class. On one or two pins it can output pulses and/or a carrier.
  • By default it ensures that the carrier is off between bursts (a requirement for 433MHz RF).
  • It supports bursts of data: a pulse train can be repeated N times or run continuously.
  • Repeating pulse trains can be cancelled.
It is output-only. My existing code for nonblocking input uses pin IRQ's and works on RP2 to the degree of precision needed by remote controls. Again, the get_pulses driver of @Robert_hh is very much faster.

Code is here.
Peter Hinch
Index to my micropython libraries.

User avatar
Roberthh
Posts: 3667
Joined: Sat May 09, 2015 4:13 pm
Location: Rhineland, Europe

Re: get_pulses and put_pulses

Post by Roberthh » Mon Mar 08, 2021 4:10 pm

That looks really impressive and is made for a real application case. My driver was just intetended as a (not too trivial) learning exercise. I was about to tell (and do it), that the feeding of the state machine can also be done with DMA, even in cycles, allowing higher precision. But then the fine control is more difficult. Maybe the a DMA IRQ would be required, which is not supported in Python.
Maybe a machine.dma module is something to consider.

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

machine.dma

Post by pythoncoder » Tue Mar 09, 2021 7:10 am

That is an interesting idea. Taking a top-down view what is needed are nonblocking Python-level interfaces to various devices such as SPI, I2C, ADC's, DAC's, PIO, etc. @Damien and @jimmo have briefly mentioned the idea of providing interfaces which integrate these devices with uasyncio. I'm hoping that making uasyncio a core part of MicroPython was done to set up a necessary precondition for this work.

To make this work on any reasonably fast device would require DMA: even if uasyncio I/O performance is improved so that it can be polled on every scheduler iteration (another hope) we're still talking about many ms of latency in a typical real application. I'm unclear whether a generic machine.dma would help. Figuring this out could be beyond my ageing brain...
Peter Hinch
Index to my micropython libraries.

nickovs
Posts: 11
Joined: Sun Sep 11, 2016 8:11 pm

Re: get_pulses and put_pulses

Post by nickovs » Thu Apr 01, 2021 5:48 pm

I have been thinking about writing a machine.DMA module myself, partly because (as Roberthh points out) its helpful to be able to add an IRQ handler and partly so as to play nicely with other modules that makes use of the C SDK's dma_claim_unused_channel function. Most of the DMA examples that have been posted will die as soon as someone tries to use machine.SPI because they just use the first couple of DMA channels without checking if they are being used. If nobody else is working on it then I might try to put one together and submit a PR.

Post Reply