nrf24l01 Library

Discussion about programs, libraries and tools that work with MicroPython. Mostly these are provided by a third party.
Target audience: All users and developers of MicroPython.
Post Reply
User avatar
mytechnotalent
Posts: 3
Joined: Mon Jul 22, 2019 6:15 pm
Location: Washington, D.C.
Contact:

nrf24l01 Library

Post by mytechnotalent » Fri Jul 16, 2021 11:23 am

I am attempting to use this nrf library to:
1)Have an unlimited number of picos send and recv a small message (which the test code provides but more driven toward a master/slave setup on two devices). I would like to avoid the pipes as I do not want a limited set of hardwired pipes.

Thanks in advance.

https://github.com/micropython/micropyt ... l01test.py

User avatar
mytechnotalent
Posts: 3
Joined: Mon Jul 22, 2019 6:15 pm
Location: Washington, D.C.
Contact:

Re: nrf24l01 Library

Post by mytechnotalent » Fri Jul 16, 2021 7:57 pm

I worked out my own solution and wanted to provide for the benefit of the community.

Code: Select all

# pyright: reportMissingImports=false
# pyright: reportUndefinedVariable=false

import utime
import ustruct as struct
from nrf24l01 import NRF24L01

from config import *

# NRF config
if usys.platform == 'rp2':  # Software SPI
    cfg = {'spi': 0, 'copi': 4, 'cipo': 7, 'sck': 6, 'csn': 14, 'ce': 17}
else:
    raise ValueError('Unsupported platform {}'.format(usys.platform))
pipes = (b"\xe1\xf0\xf0\xf0\xf0", b"\xe1\xf0\xf0\xf0\xf0")


class NRF:
    """
    Class to handle the NRF functionality
    """

    def __init__(self):
        csn = Pin(cfg['csn'], mode=Pin.OUT, value=1)
        ce = Pin(cfg['ce'], mode=Pin.OUT, value=0)
        if cfg['spi'] == -1:
            spi = SPI(-1, sck=Pin(cfg['sck']), cipo=Pin(cfg['cipo']), copi=Pin(cfg['copi']))
            self.nrf = NRF24L01(spi, csn, ce, payload_size=8)
        else:
            self.nrf = NRF24L01(SPI(cfg['spi']), csn, ce, payload_size=8)

    def send(self, message):
        """
        Method to send nrf data

        Params:
            message: str
        """
        self.nrf.open_tx_pipe(pipes[0])
        self.nrf.open_rx_pipe(1, pipes[1])
        self.nrf.start_listening()
        num_needed = 1
        num_successes = 0
        num_failures = 0
        led_state = 0
        print('NRF24L01 send mode, sending %d packets...' % num_needed)
        while num_successes < num_needed and num_failures < num_needed:
            # Stop listening and send packet
            self.nrf.stop_listening()
            led_state = max(1, (led_state << 1) & 0x0F)
            print('sending:', message, led_state)
            try:
                for letter in message:
                    letter = ord(letter)
                    self.nrf.send(struct.pack('ii', letter, led_state))
            except OSError:
                pass
            # Start listening again
            self.nrf.start_listening()
            # Wait for response, with 250ms timeout
            start_time = utime.ticks_ms()
            timeout = False
            while not self.nrf.any() and not timeout:
                if utime.ticks_diff(utime.ticks_ms(), start_time) > 250:
                    timeout = True
            if timeout:
                print('failed, response timed out')
                num_failures += 1
            else:
                # Recv packet
                (got_millis,) = struct.unpack('i', self.nrf.recv())
                # Print response and round-trip delay
                print(
                    'got response:',
                    got_millis,
                    '(delay',
                    utime.ticks_diff(utime.ticks_ms(), got_millis),
                    'ms)',
                )
                num_successes += 1
            # Delay then loop
            utime.sleep_ms(250)
            print('master finished sending; successes=%d, failures=%d' % (num_successes, num_failures))

    def recv(self):
        """
        Method to send recv data
        """        
        _RX_POLL_DELAY = const(15)
        _RECV_SEND_DELAY = const(10)
        self.nrf.open_tx_pipe(pipes[0])
        self.nrf.open_rx_pipe(1, pipes[1])
        self.nrf.start_listening()
        print('NRF24L01 recv mode, waiting for packets... (ctrl-C to stop)')
        time = 0
        interval = 7500
        while time < interval:
            time += 1
            if time > interval:
                break
            if self.nrf.any():
                while self.nrf.any():
                    buf = self.nrf.recv()
                    letter, led_state = struct.unpack('ii', buf)
                    letter = chr(letter)
                    print('received:', letter, led_state)
                    utime.sleep_ms(_RX_POLL_DELAY)
                # Give sender time to get into receive mode
                utime.sleep_ms(_RECV_SEND_DELAY)
                self.nrf.stop_listening()
                try:
                    letter = ord(letter)
                    self.nrf.send(struct.pack('i', letter))
                except OSError:
                    pass
                print('sent response')
                self.nrf.start_listening()


Post Reply