I am attempting to use this nRF24L01 library to:
1) Have an unlimited number of Picos send and receive a small message. The test code provides this, but it is geared toward a master/slave setup on two devices. I would like to avoid the pipes, as I do not want to be limited to a fixed set of hard-wired pipes.
Thanks in advance.
https://github.com/micropython/micropyt ... l01test.py
nrf24l01 Library
- mytechnotalent
- Posts: 3
- Joined: Mon Jul 22, 2019 6:15 pm
- Location: Washington, D.C.
- Contact:
- mytechnotalent
- Posts: 3
- Joined: Mon Jul 22, 2019 6:15 pm
- Location: Washington, D.C.
- Contact:
Re: nrf24l01 Library
I worked out my own solution and wanted to share it for the benefit of the community.
Code: Select all
# pyright: reportMissingImports=false
# pyright: reportUndefinedVariable=false
import utime
import ustruct as struct
from nrf24l01 import NRF24L01
from config import *
# NRF config: SPI bus id plus the GPIO numbers wired to the radio.
# Only the 'rp2' platform is supported; anything else is rejected up front.
if usys.platform != 'rp2':
    raise ValueError('Unsupported platform {}'.format(usys.platform))

# 'spi' is 0 here, so NRF.__init__ hands the id straight to SPI(); only an
# id of -1 makes it build the bus from the sck/copi/cipo pin entries.
cfg = {
    'spi': 0,
    'copi': 4,
    'cipo': 7,
    'sck': 6,
    'csn': 14,
    'ce': 17,
}

# (TX address, RX address) -- both entries are the same 5-byte address.
pipes = (b"\xe1\xf0\xf0\xf0\xf0",) * 2
class NRF:
    """
    Class to handle the NRF functionality.

    A message is transmitted one character per packet. Each packet is the
    struct 'ii' pair (character code, led_state), and the receiver replies
    with a struct 'i' packet carrying the last character code it read.
    """

    # Pause (ms) after each packet read while draining the receive queue.
    _RX_POLL_DELAY = 15
    # Pause (ms) before replying, giving the sender time to start listening.
    _RECV_SEND_DELAY = 10
    # How long (ms) send() waits for the receiver's reply.
    _RESPONSE_TIMEOUT_MS = 250
    # Number of polling iterations recv() performs before it returns.
    _RECV_POLL_ITERATIONS = 7500

    def __init__(self):
        """
        Create the chip-select / chip-enable pins and the NRF24L01 driver.

        Pin numbers and the SPI bus id come from the module-level ``cfg``
        dict. An SPI id of -1 builds the bus from the configured
        sck/copi/cipo pins; any other id is passed to ``SPI()`` unchanged.
        """
        csn = Pin(cfg['csn'], mode=Pin.OUT, value=1)
        ce = Pin(cfg['ce'], mode=Pin.OUT, value=0)
        if cfg['spi'] == -1:
            # machine.SPI only knows the keyword names mosi/miso; the cfg
            # keys use the copi/cipo spelling, so translate them here.
            spi = SPI(
                -1,
                sck=Pin(cfg['sck']),
                mosi=Pin(cfg['copi']),
                miso=Pin(cfg['cipo']),
            )
        else:
            spi = SPI(cfg['spi'])
        self.nrf = NRF24L01(spi, csn, ce, payload_size=8)

    def _open_pipes(self):
        """Open the TX/RX pipes from ``pipes`` and start listening."""
        self.nrf.open_tx_pipe(pipes[0])
        self.nrf.open_rx_pipe(1, pipes[1])
        self.nrf.start_listening()

    def send(self, message):
        """
        Method to send nrf data, then wait up to 250 ms for a reply.

        Params:
            message: str
        """
        self._open_pipes()
        num_needed = 1
        num_successes = 0
        num_failures = 0
        led_state = 0
        print('NRF24L01 send mode, sending %d packets...' % num_needed)
        while num_successes < num_needed and num_failures < num_needed:
            # Stop listening and send the message, one packet per character
            self.nrf.stop_listening()
            led_state = max(1, (led_state << 1) & 0x0F)
            print('sending:', message, led_state)
            try:
                for letter in message:
                    self.nrf.send(struct.pack('ii', ord(letter), led_state))
            except OSError:
                # Best effort: keep going to the response wait, but say so
                # instead of silently dropping the rest of the message.
                print('send failed, remaining characters dropped')
            # Start listening again
            self.nrf.start_listening()
            # Wait for response, with timeout
            start_time = utime.ticks_ms()
            timeout = False
            while not self.nrf.any() and not timeout:
                elapsed = utime.ticks_diff(utime.ticks_ms(), start_time)
                if elapsed > self._RESPONSE_TIMEOUT_MS:
                    timeout = True
            if timeout:
                print('failed, response timed out')
                num_failures += 1
            else:
                # The reply carries the echoed character code, not a
                # timestamp, so the delay is measured locally from the
                # moment listening resumed.
                (echoed_code,) = struct.unpack('i', self.nrf.recv())
                print(
                    'got response:',
                    echoed_code,
                    '(delay',
                    utime.ticks_diff(utime.ticks_ms(), start_time),
                    'ms)',
                )
                num_successes += 1
            # Delay then loop
            utime.sleep_ms(250)
        print(
            'master finished sending; successes=%d, failures=%d'
            % (num_successes, num_failures)
        )

    def recv(self):
        """
        Method to recv nrf data.

        Polls the radio a fixed number of times. Whenever packets are
        waiting, each one is printed as a character and the last character
        code read is sent back as the reply.
        """
        self._open_pipes()
        print('NRF24L01 recv mode, waiting for packets... (ctrl-C to stop)')
        for _ in range(self._RECV_POLL_ITERATIONS):
            if not self.nrf.any():
                continue
            code = None
            while self.nrf.any():
                buf = self.nrf.recv()
                code, led_state = struct.unpack('ii', buf)
                print('received:', chr(code), led_state)
                utime.sleep_ms(self._RX_POLL_DELAY)
            if code is None:
                # Nothing was actually read, so there is nothing to echo.
                continue
            # Give sender time to get into receive mode
            utime.sleep_ms(self._RECV_SEND_DELAY)
            self.nrf.stop_listening()
            try:
                self.nrf.send(struct.pack('i', code))
            except OSError:
                # Best effort: a lost reply only makes the sender time out.
                print('response failed')
            else:
                print('sent response')
            self.nrf.start_listening()