Arduino rp2040 connect wifi through SPI

RP2040 based microcontroller boards running MicroPython.
Target audience: MicroPython users with an RP2040 boards.
This does not include conventional Linux-based Raspberry Pi boards.
Post Reply
manpowre
Posts: 7
Joined: Mon Oct 18, 2021 5:44 pm

Arduino rp2040 connect wifi through SPI

Post by manpowre » Mon Oct 18, 2021 10:22 pm

Adafruit wrote some python libraries to communicate with the wifi chip on the Arduino Connect rp2040 board (the board also has a accelerometer and a microphone). Circuitpython is a few years old fork, so I would love to stay on micropython, which is why Im looking into writing this communication object.

Unfortunately this wifi chip isnt supported directly within micropython unless we write a interface to talk to this wifi chip.

I did some digging tonight last few hours, and got a bit into it, but just knocking my head on woods atm, I just can't seem to figure out why the wifi board doesnt respond after sending a command to it.

I figured the CS pin needs to be high to select the SPI interface. This is done in the reset function. Also the reset pin is set to low then high in the reset function, which is identical to the original code from adafruit.

According to rp2040 interface from arduino these are the pins:
self._cs_pin = 9
self._ready_pin = 10
self._reset_pin = 3
sck=Pin(14), mosi=Pin(11), miso=Pin(8)

Mabye someone wants to take a look at mabye spot the issue ? Im off for some kids time tomorrow, so Ill be back looking at this wednesday. mabye someone could take a quick look whom done SPI before ? (I did spi on raspberry pi before, just new to micropython).

Arduino RP2040 connect pins:
https://github.com/adafruit/circuitpyth ... ect/pins.c

Code: Select all

    // Secondary SPI connected to ESP32
    { MP_ROM_QSTR(MP_QSTR_MISO1), MP_ROM_PTR(&pin_GPIO8) },
    { MP_ROM_QSTR(MP_QSTR_MOSI1), MP_ROM_PTR(&pin_GPIO11) },
    { MP_ROM_QSTR(MP_QSTR_SCK1), MP_ROM_PTR(&pin_GPIO14) },
    { MP_ROM_QSTR(MP_QSTR_CS1), MP_ROM_PTR(&pin_GPIO9) },
original code:
https://github.com/adafruit/Adafruit_Ci ... sp32spi.py

the code I changed tonight for micropython:

Code: Select all


"""
Implementation Notes
--------------------
Arduino RP2040 connect pins:
https://github.com/adafruit/circuitpython/blob/main/ports/raspberrypi/boards/arduino_nano_rp2040_connect/pins.c

original code:
https://github.com/adafruit/Adafruit_CircuitPython_ESP32SPI/blob/main/adafruit_esp32spi/adafruit_esp32spi.py

================================================================================

"""

import struct
import time
from micropython import const
import machine
from machine import Pin, SPI

#from digitalio import Direction
#from adafruit_bus_device.spi_device import SPIDevice

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_ESP32SPI.git"

# pylint: disable=bad-whitespace
_SET_NET_CMD           = const(0x10)
_SET_PASSPHRASE_CMD    = const(0x11)

_GET_CONN_STATUS_CMD   = const(0x20)
_GET_IPADDR_CMD        = const(0x21)
_GET_MACADDR_CMD       = const(0x22)
_GET_CURR_SSID_CMD     = const(0x23)
_GET_CURR_RSSI_CMD     = const(0x25)
_GET_CURR_ENCT_CMD     = const(0x26)

_SCAN_NETWORKS         = const(0x27)
_GET_SOCKET_CMD        = const(0x3F)
_GET_STATE_TCP_CMD     = const(0x29)
_DATA_SENT_TCP_CMD     = const(0x2A)
_AVAIL_DATA_TCP_CMD    = const(0x2B)
_GET_DATA_TCP_CMD      = const(0x2C)
_START_CLIENT_TCP_CMD  = const(0x2D)
_STOP_CLIENT_TCP_CMD   = const(0x2E)
_GET_CLIENT_STATE_TCP_CMD = const(0x2F)
_DISCONNECT_CMD        = const(0x30)
_GET_IDX_RSSI_CMD      = const(0x32)
_GET_IDX_ENCT_CMD      = const(0x33)
_REQ_HOST_BY_NAME_CMD  = const(0x34)
_GET_HOST_BY_NAME_CMD  = const(0x35)
_START_SCAN_NETWORKS   = const(0x36)
_GET_FW_VERSION_CMD    = const(0x37)
_PING_CMD              = const(0x3E)

_SEND_DATA_TCP_CMD     = const(0x44)
_GET_DATABUF_TCP_CMD   = const(0x45)
_SET_ENT_IDENT_CMD     = const(0x4A)
_SET_ENT_UNAME_CMD     = const(0x4B)
_SET_ENT_PASSWD_CMD    = const(0x4C)
_SET_ENT_ENABLE_CMD    = const(0x4F)

_START_CMD             = const(0xE0)
_END_CMD               = const(0xEE)
_ERR_CMD               = const(0xEF)
_REPLY_FLAG            = const(1<<7)
_CMD_FLAG              = const(0)

SOCKET_CLOSED      = const(0)
SOCKET_LISTEN      = const(1)
SOCKET_SYN_SENT    = const(2)
SOCKET_SYN_RCVD    = const(3)
SOCKET_ESTABLISHED = const(4)
SOCKET_FIN_WAIT_1  = const(5)
SOCKET_FIN_WAIT_2  = const(6)
SOCKET_CLOSE_WAIT  = const(7)
SOCKET_CLOSING     = const(8)
SOCKET_LAST_ACK    = const(9)
SOCKET_TIME_WAIT   = const(10)

WL_NO_SHIELD          = const(0xFF)
WL_NO_MODULE          = const(0xFF)
WL_IDLE_STATUS        = const(0)
WL_NO_SSID_AVAIL      = const(1)
WL_SCAN_COMPLETED     = const(2)
WL_CONNECTED          = const(3)
WL_CONNECT_FAILED     = const(4)
WL_CONNECTION_LOST    = const(5)
WL_DISCONNECTED       = const(6)
WL_AP_LISTENING       = const(7)
WL_AP_CONNECTED       = const(8)
WL_AP_FAILED          = const(9)
# pylint: enable=bad-whitespace

class ESP_SPIcontrol:  # pylint: disable=too-many-public-methods
    """A class that will talk to an ESP32 module programmed with special firmware
    that lets it act as a fast an efficient WiFi co-processor"""
    TCP_MODE = const(0)
    UDP_MODE = const(1)
    TLS_MODE = const(2)

    # pylint: disable=too-many-arguments
    def __init__(self, debug=False):
        self._debug = debug
        self._buffer = bytearray(10)
        self._pbuf = bytearray(1)  # buffer for param read
        self._sendbuf = bytearray(256)  # buffer for command sending
        self._socknum_ll = [[0]]      # pre-made list of list of socket #
        self._spi_device = SPI(id=1, baudrate=8000000, sck=Pin(14), mosi=Pin(11), miso=Pin(8)) #, polarity=0, phase=0)
        self._cs_pin = 9
        self._ready_pin = 10
        self._reset_pin = 3
        
        self._cs = Pin(self._cs_pin, Pin.OUT)
        self._ready = Pin(self._ready_pin, Pin.IN)
        self._reset = Pin(self._reset_pin, Pin.OUT)
        
        
        self.reset()
    
    def reset(self):
        """Hard reset the ESP32 using the reset pin"""
        if self._debug:
            print("Reset ESP32")
        self._cs.on()
        self._reset.off()
        time.sleep(0.01)    # reset
        self._reset.on()
        time.sleep(0.75)    # wait for it to boot up
        
    def _wait_for_ready(self):
        """Wait until the ready pin goes low"""
        if self._debug >= 3:
            print("Wait for ESP32 ready", end='')
        t_end = time.time() + 10
        while time.time() < t_end:  # wait up to 10 seconds
            if not self._ready.value(): # we're ready!
                if self._debug:
                    print("ESP32 ready")
                break
            if self._debug:
                print('.', end='')
                time.sleep(0.05)
        else:
            raise RuntimeError("ESP32 not responding")
        if self._debug:
            print()
    
    def _send_command(self, cmd, params=None, *, param_len_16=False):
        """Send over a command with a list of parameters"""
        if not params:
            params = ()

        packet_len = 4 # header + end byte
        for i, param in enumerate(params):
            packet_len += len(param)   # parameter
            packet_len += 1            # size byte
            if param_len_16:
                packet_len += 1        # 2 of em here!
        while packet_len % 4 != 0:
            packet_len += 1
        # we may need more space
        if packet_len > len(self._sendbuf):
            self._sendbuf = bytearray(packet_len)

        self._sendbuf[0] = _START_CMD
        self._sendbuf[1] = cmd & ~_REPLY_FLAG
        self._sendbuf[2] = len(params)

        # handle parameters here
        ptr = 3
        for i, param in enumerate(params):
            if self._debug >= 2:
                print("\tSending param #%d is %d bytes long" % (i, len(param)))
            if param_len_16:
                self._sendbuf[ptr] = (len(param) >> 8) & 0xFF
                ptr += 1
            self._sendbuf[ptr] = len(param) & 0xFF
            ptr += 1
            for j, par in enumerate(param):
                self._sendbuf[ptr+j] = par
            ptr += len(param)
        self._sendbuf[ptr] = _END_CMD

        self._wait_for_ready()
        #while True:
        """t_end = time.time() + 6 # wait up to 1000ms
        while time.time() < t_end:
            if self._ready.value():  # ok ready to send!
                print("ready to send")
                break
        else:
            raise RuntimeError("ESP32 timed out on SPI select")
        """
        self._spi_device.write(self._sendbuf) #, start=0, end=packet_len)  # pylint: disable=no-member
        if self._debug:
            print("Wrote: ", [hex(b) for b in self._sendbuf[0:packet_len]])
    
    
    
    def _send_command_get_response(self, cmd, params=None, *,
                                   reply_params=1, sent_param_len_16=False,
                                   recv_param_len_16=False):
        """Send a high level SPI command, wait and return the response"""
        self._send_command(cmd, params, param_len_16=sent_param_len_16)
        return self._wait_response_cmd(cmd, reply_params, param_len_16=recv_param_len_16)
    
    
    def _read_byte(self, spi):
        """Read one byte from SPI"""
        spi.readinto(self._pbuf)
        if self._debug:
            print("\t\tRead:", hex(self._pbuf[0]))
        return self._pbuf[0]
    
    def _wait_spi_char(self, spi, desired):
        """Read a byte with a time-out, and if we get it, check that its what we expect"""
        t_end = time.time() + 1 # wait up to 100ms
        while time.time() < t_end: 
            r = self._read_byte(spi)
            #print(r)
            if r == _ERR_CMD:
                raise RuntimeError("Error response to command")
            if r == desired:
                return True
        raise RuntimeError("Timed out waiting for SPI char")
    
    
    def _wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False):
        """Wait for ready, then parse the response"""
        self._wait_for_ready()

        responses = []
        #with self._spi_device as spi:
        t_end = time.time() + 1 # wait up to 1000ms
        while time.time() < t_end: 
            if self._ready.value():  # ok ready to send!
                break
        else:
            raise RuntimeError("ESP32 timed out on SPI select")

        self._wait_spi_char(self._spi_device, _START_CMD)
        self._check_data(spi, cmd | _REPLY_FLAG)
        if num_responses is not None:
            self._check_data(spi, num_responses)
        else:
            num_responses = self._read_byte(spi)
        for num in range(num_responses):
            param_len = self._read_byte(spi)
            if param_len_16:
                param_len <<= 8
                param_len |= self._read_byte(spi)
            if self._debug >= 2:
                print("\tParameter #%d length is %d" % (num, param_len))
            response = bytearray(param_len)
            self._read_bytes(spi, response)
            responses.append(response)
        self._check_data(spi, _END_CMD)

        if self._debug >= 2:
            print("Read %d: " % len(responses[0]), responses)
        return responses
    
    
    
    @property
    def status(self):
        """The status of the ESP32 WiFi core. Can be WL_NO_SHIELD or WL_NO_MODULE
        (not found), WL_IDLE_STATUS, WL_NO_SSID_AVAIL, WL_SCAN_COMPLETED,
        WL_CONNECTED, WL_CONNECT_FAILED, WL_CONNECTION_LOST, WL_DISCONNECTED,
        WL_AP_LISTENING, WL_AP_CONNECTED, WL_AP_FAILED"""
        if self._debug:
            print("Connection status")
        resp = self._send_command_get_response(_GET_CONN_STATUS_CMD)
        if self._debug:
            print("Conn status:", resp[0][0])
        return resp[0][0]   # one byte response

    @property
    def firmware_version(self):
        """A string of the firmware version on the ESP32"""
        if self._debug:
            print("Firmware version")
        resp = self._send_command_get_response(_GET_FW_VERSION_CMD)
        print(resp)
        return resp[0]

    @property
    def MAC_address(self):        # pylint: disable=invalid-name
        """A bytearray containing the MAC address of the ESP32"""
        if self._debug:
            print("MAC address")
        resp = self._send_command_get_response(_GET_MACADDR_CMD, [b'\xFF'])
        return resp[0]
        
        
    @property
    def ssid(self):
        """The name of the access point we're connected to"""
        resp = self._send_command_get_response(_GET_CURR_SSID_CMD, [b'\xFF'])
        return resp[0]

    @property
    def rssi(self):
        """The receiving signal strength indicator for the access point we're
        connected to"""
        resp = self._send_command_get_response(_GET_CURR_RSSI_CMD, [b'\xFF'])
        return struct.unpack('<i', resp[0])[0]

    @property
    def network_data(self):
        """A dictionary containing current connection details such as the 'ip_addr',
        'netmask' and 'gateway'"""
        resp = self._send_command_get_response(_GET_IPADDR_CMD, [b'\xFF'], reply_params=3)
        return {'ip_addr': resp[0], 'netmask': resp[1], 'gateway': resp[2]}

    @property
    def ip_address(self):
        """Our local IP address"""
        return self.network_data['ip_addr']

    @property
    def is_connected(self):
        """Whether the ESP32 is connected to an access point"""
        try:
            return self.status == WL_CONNECTED
        except RuntimeError:
            self.reset()
            return False
        
    def connect_AP(self, ssid, password): # pylint: disable=invalid-name
        """Connect to an access point with given name and password.
        Will retry up to 10 times and return on success or raise
        an exception on failure"""
        print("connect_AP")
        if self._debug:
            print("Connect to AP", ssid, password)
        if isinstance(ssid, str):
            ssid = bytes(ssid, 'utf-8')
        if password:
            if isinstance(password, str):
                password = bytes(password, 'utf-8')
            self.wifi_set_passphrase(ssid, password)
        else:
            self.wifi_set_network(ssid)
        for _ in range(10): # retries
            stat = self.status
            if stat == WL_CONNECTED:
                return stat
            time.sleep(1)
        if stat in (WL_CONNECT_FAILED, WL_CONNECTION_LOST, WL_DISCONNECTED):
            raise RuntimeError("Failed to connect to ssid", ssid)
        if stat == WL_NO_SSID_AVAIL:
            raise RuntimeError("No such ssid", ssid)
        raise RuntimeError("Unknown error 0x%02X" % stat)
    
    def wifi_set_passphrase(self, ssid, passphrase):
        """Sets the desired access point ssid and passphrase"""
        print("wifi_set_passphrase")
        resp = self._send_command_get_response(_SET_PASSPHRASE_CMD, [ssid, passphrase])
        if resp[0][0] != 1:
            raise RuntimeError("Failed to set passphrase")
    
    def pretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name
        """Converts a bytearray IP address to a dotted-quad string for printing"""
        return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3])
        
        
esp = ESP_SPIcontrol(debug=True)

print(esp.status)
print(esp.firmware_version)
print([hex(i) for i in esp.MAC_address])

esp.connect_AP(ssid="SSID", password="passphrase")

IP_ADDR = esp.pretty_ip(esp.ip_address)




danhalbert
Posts: 18
Joined: Mon Jan 16, 2017 3:58 am

Re: Arduino rp2040 connect wifi through SPI

Post by danhalbert » Tue Oct 19, 2021 3:07 am

A side note re CircuitPython and its fork:

CircuitPython has diverged from MicroPython in terms of I/O modules, due to different goals. But we aim to keep up with the core language parts of MicroPython. We recently merged the changes up to MicroPython 1.17, and plan to keep up with the releases.

manpowre
Posts: 7
Joined: Mon Oct 18, 2021 5:44 pm

Re: Arduino rp2040 connect wifi through SPI

Post by manpowre » Tue Oct 19, 2021 8:10 am

Yeah I noticed. Even though adafruit is doing amazing with making all those libraries available, still the core is outdated and missing a few things,.. its like the best stuff is shared between openMV fork , circuitpython fork and Micropython. I got rest of my stuff working on Micropython, was just missing this wifi interface.

User avatar
Roberthh
Posts: 3667
Joined: Sat May 09, 2015 4:13 pm
Location: Rhineland, Europe

Re: Arduino rp2040 connect wifi through SPI

Post by Roberthh » Tue Oct 19, 2021 8:34 am

There is an open PR for the Arduino Nano connect supporting WiFi. See https://github.com/micropython/micropython/pull/7669
So it's coming, maybe even before Christmas.

manpowre
Posts: 7
Joined: Mon Oct 18, 2021 5:44 pm

Re: Arduino rp2040 connect wifi through SPI

Post by manpowre » Mon Oct 25, 2021 10:49 am

I made my own branch of master, and merged https://github.com/micropython/micropython/pull/7668 and https://github.com/micropython/micropython/pull/7669 and voila, the esp32spi interface on the arduino rp2040 connect is working, and its working great.
Unlike adafruit's circuitpython only supporting 1 socket, this implementation supports several sockets, so I tested with UDP incoming to local IP, and UDP multicast sending, and it works just fine and its stable..
and the best is that _thread can be imported to fire up the second core to deal with network.

Post Reply