[SOLVED] Memory upgrade: Expand flash to 32 MB

All ESP8266 boards running MicroPython.
Official boards are the Adafruit Huzzah and Feather boards.
Target audience: MicroPython users with an ESP8266 board.
crizeo
Posts: 42
Joined: Sun Aug 06, 2017 12:55 pm
Location: Germany

Re: Memory upgrade: Expand flash to 32 MB

Post by crizeo » Fri Oct 13, 2017 5:21 pm

Thank you so much for your answers :!: Your driver from flash.py helped me a lot in understanding the SPI interface! For further assistance I used sdcard.py and the Winbond documentation for W25Q32BV and W25Q256FV. Got it working now :D


The driver [winbond.py]:

The following class acts as a simple driver for Winbond Serial Flash Memory chips (for the ESP8266). Some people searching around this forum might want to use it as a template (no caching, so it is very slow in writing, ~1-5 kBps). It was tested with a W25Q32 and W25Q256 on a ESP-12, but the others (like W25Q128 or W25Q80) should work as well.

Code: Select all

# winbond.py
from micropython import const


class W25QFlash:
    SECTOR_SIZE = const(4096)
    BLOCK_SIZE = const(512)
    PAGE_SIZE = const(256)
    
    def __init__(self, spi, cs, baud=40000000):
        self.cs = cs
        self.spi = spi
        self.cs.init(self.cs.OUT, value=1)
        self.spi.init(baudrate=baud, phase=1, polarity=1)  # highest possible baudrate is 40 MHz for ESP-12

        self._cache = bytearray(self.SECTOR_SIZE)  # buffer for writing single blocks
        self._CAPACITY = self.identify()  # calc number of bytes (and makes sure the chip is detected and supported)
        self._ADR_LEN = 3 if (len(bin(self._CAPACITY-1))-2) <= 24 else 4  # address length (default: 3 bytes, 32MB+: 4)
        # setup address mode:
        if self._ADR_LEN == 4:
            if not self._read_status_reg(16):  # not in 4-byte mode
                print("entering 4-byte address mode")
                self._await()
                self.cs(0)
                self.spi.write(b'\xB7')  # 'Enter 4-Byte Address Mode'
                self.cs(1)

    def identify(self):
        # Determines manufacturer and device id and raises an error if the device is not detected or
        # not supported. Returns the number of blocks (calculated based on the detected chip).

        self._await()
        self.cs(0)
        self.spi.write(b'\x9F')  # 'Read JEDEC ID'
        mf, mem_type, cap = self.spi.read(3, 0x00)  # manufacturer id, memory type id, capacity id
        self.cs(1)

        if not (mf and mem_type and cap):  # something is 0x00
            raise OSError("device not responding, check wiring. (%s, %s, %s)" % (hex(mf), hex(mem_type), hex(cap)))
        if mf != 0xEF or mem_type not in [0x40, 0x60]:  # Winbond manufacturer, Q25 series memory (tested 0x40 only)
            raise OSError("manufacturer (%s) or memory type (%s) not supported" % (hex(mf), hex(mem_type)))
        print("manufacturer:", hex(mf))
        print("device:", hex(mem_type << 8 | cap))
        print("capacity: %d bytes" % int(2**cap))
        return 2**cap  # calculate number of bytes

    def format(self):
        # Performs a chip erase, which resets all memory to 0xFF (might take a few seconds/minutes).
        # Important: Run os.VfsFat.mkfs(flash) to make the flash an accessible file system.
        #            As always, you will then need to run os.mount(flash, '/MyFlashDir') then.
        self._wren()
        self._await()
        self.cs(0)
        self.spi.write(b'\xC7')  # 'Chip Erase'
        self.cs(1)
        self._await()  # wait for the chip to finish formatting

    def _read_status_reg(self, nr):  # Returns the value (0 or 1) in status register <nr> (S0, S1, S2, ...)
        reg, bit = divmod(nr, 8)
        self.cs(0)
        self.spi.write((b'\x05', b'\x35', b'\x15')[reg])  # 'Read Status Register-...' (1, 2, 3)
        stat = 2**bit & self.spi.read(1, 0xFF)[0]
        self.cs(1)
        return stat

    def _await(self):  # Waits for device not to be busy and returns if so
        self.cs(0)
        self.spi.write(b'\x05')  # 'Read Status Register-1'
        while 0x1 & self.spi.read(1, 0xFF)[0]:  # last bit (1) is BUSY bit in stat. reg. byte (0 = not busy, 1 = busy)
            pass
        self.cs(1)

    def _sector_erase(self, addr):  # Resets all memory within the specified sector (4KB) to 0xFF
        self._wren()
        self._await()
        self.cs(0)
        self.spi.write(b'\x20')  # 'Sector Erase'
        self.spi.write(addr.to_bytes(self._ADR_LEN, 'big'))
        self.cs(1)

    def _read(self, buf, addr):
        # Reads len(<buf>) bytes from the chip - starting at <addr> - into <buf>. To keep things
        # easy, len(<buf>) has to be a multiple of self.SECTOR_SIZE (or even better: less than that).
        print("read %d bytes starting at %s" % (len(buf), hex(addr)))
        assert addr+len(buf) <= self._CAPACITY, \
            "memory not addressable at %s with range %d (max.: %s)" % \
            (hex(addr), len(buf), hex(self._CAPACITY-1))

        self._await()
        self.cs(0)
        self.spi.write(b'\x0C' if self._ADR_LEN == 4 else b'\x0B')  # 'Fast Read' (0x03 = default), 0x0C for 4-byte mode
        self.spi.write(addr.to_bytes(self._ADR_LEN, 'big'))
        self.spi.write(b'\xFF')  # dummy byte
        self.spi.readinto(buf, 0xFF)
        self.cs(1)

    def _wren(self):  # Sets the Write Enable Latch (WEL) bit in the status register
        self._await()
        self.cs(0)
        self.spi.write(b'\x06')  # 'Write Enable'
        self.cs(1)

    def _write(self, buf, addr):
        # Writes the data from <buf> to the device starting at <addr>, which has to be erased (0xFF)
        # before. Last byte of <addr> has to be zero, which means <addr> has to be a multiple of
        # self.PAGE_SIZE (= start of page), because wrapping to the next page (if page size exceeded)
        # is implemented for full pages only. Length of <buf> has to be a multiple of self.PAGE_SIZE,
        # because only full pages are supported at the moment (<addr> will be auto-incremented).
        print("write buf[%d] to %s (%d)" % (len(buf), hex(addr), addr))
        assert len(buf) % self.PAGE_SIZE == 0, "invalid buffer length: %d" % len(buf)
        assert not addr & 0xf, "address (%d) not at page start" % addr
        assert addr+len(buf) <= self._CAPACITY, \
            "memory not addressable at %s with range %d (max.: %s)" % \
            (hex(addr), len(buf), hex(self._CAPACITY-1))

        for i in range(0, len(buf), self.PAGE_SIZE):
            self._wren()
            self._await()
            self.cs(0)
            self.spi.write(b'\x02')  # 'Page Program'
            self.spi.write(addr.to_bytes(self._ADR_LEN, 'big'))
            self.spi.write(buf[i:i+self.PAGE_SIZE])
            addr += self.PAGE_SIZE
            self.cs(1)

    def _writeblock(self, blocknum, buf):
        # To write a block, the sector (eg 4kB = 8 blocks) has to be erased first. Therefore, a sector will be read
        # and saved in cache first, then the given block will be replaced and the whole sector written back when
        assert len(buf) == self.BLOCK_SIZE, "invalid block length: %d" % len(buf)
        print("writeblock(%d, buf[%d])" % (blocknum, len(buf)))

        sector_nr = blocknum // 8
        sector_addr = sector_nr * self.SECTOR_SIZE
        index = (blocknum << 9) & 0xfff  # index of first byte of page in sector (multiple of self.PAGE_SIZE)

        self._read(self._cache, sector_addr)
        self._cache[index:index+self.BLOCK_SIZE] = buf  # apply changes
        self._sector_erase(sector_addr)
        self._write(self._cache, sector_addr)  # addr is multiple of self.SECTOR_SIZE, so last byte is zero

    def readblocks(self, blocknum, buf):
        # Read data from the chip starting at block number <blocknum> to <buf> (len = multiple of self.BLOCK_SIZE)
        assert len(buf) % self.BLOCK_SIZE == 0, 'invalid buffer length: %d' % len(buf)

        buf_len = len(buf)
        if buf_len == self.BLOCK_SIZE:
            self._read(buf, blocknum << 9)
        else:
            offset = 0
            buf_mv = memoryview(buf)
            while offset < buf_len:
                self._read(buf_mv[offset:offset+self.BLOCK_SIZE], blocknum << 9)
                offset += self.BLOCK_SIZE
                blocknum += 1

    def writeblocks(self, blocknum, buf):
        # Writes the content from <buf> (len must be multiple of self.BLOCK_SIZE) to block number <blocknum>
        assert len(buf) % self.BLOCK_SIZE == 0, 'invalid buffer length: %d' % len(buf)

        buf_len = len(buf)
        if buf_len == self.BLOCK_SIZE:
            self._writeblock(blocknum, buf)
        else:
            offset = 0
            buf_mv = memoryview(buf)
            while offset < buf_len:
                self._writeblock(blocknum, buf_mv[offset:offset+self.BLOCK_SIZE])
                offset += self.BLOCK_SIZE
                blocknum += 1

    def count(self):  # Returns the number of blocks (self.BLOCK_SIZE bytes) available on the device
        return int(self._CAPACITY / self.BLOCK_SIZE)

Or you could use the following version of winbond.py, which uses the cached sector, so that writing will be a bit faster for big files (no faster reading). But you always have to call the sync() method to apply the changes before shutting the ESP or the memory chip down.

Code: Select all

# winbond.py
from micropython import const


class W25QFlash:
    SECTOR_SIZE = const(4096)
    BLOCK_SIZE = const(512)
    PAGE_SIZE = const(256)

    def __init__(self, spi, cs, baud=40000000):
        self.cs = cs
        self.spi = spi
        self.cs.init(self.cs.OUT, value=1)
        self.spi.init(baudrate=baud, phase=1, polarity=1)  # highest possible baudrate is 40 MHz for ESP-12

        self._cache_saddr = None  # address of the sector that is currently cached
        self._cache_isdirty = False  # set to True if the data has not been written back to the chip
        self._cache = bytearray(self.SECTOR_SIZE)  # buffer for writing single blocks
        self._CAPACITY = self.identify()  # calc number of bytes (and makes sure the chip is detected and supported)
        self._ADR_LEN = 3 if (len(
            bin(self._CAPACITY - 1)) - 2) <= 24 else 4  # address length (default: 3 bytes, 32MB+: 4)
        # setup address mode:
        if self._ADR_LEN == 4:
            if not self._read_status_reg(16):  # not in 4-byte mode
                self._await()
                self.cs(0)
                self.spi.write(b'\xB7')  # 'Enter 4-Byte Address Mode'
                self.cs(1)

    def identify(self):
        # Determines manufacturer and device id and raises an error if the device is not detected or
        # not supported. Returns the number of blocks (calculated based on the detected chip).

        self._await()
        self.cs(0)
        self.spi.write(b'\x9F')  # 'Read JEDEC ID'
        mf, mem_type, cap = self.spi.read(3, 0x00)  # manufacturer id, memory type id, capacity id
        self.cs(1)

        if not (mf and mem_type and cap):  # something is 0x00
            raise OSError("device not responding, check wiring. (%s, %s, %s)" % (hex(mf), hex(mem_type), hex(cap)))
        if mf != 0xEF or mem_type not in [0x40, 0x60]:  # Winbond manufacturer, Q25 series memory (tested 0x40 only)
            raise OSError("manufacturer (%s) or memory type (%s) not supported" % (hex(mf), hex(mem_type)))
        #print("manufacturer:", hex(mf))
        #print("device:", hex(mem_type << 8 | cap))
        #print("capacity: %d bytes" % int(2 ** cap))
        return 2 ** cap  # calculate number of bytes

    def format(self):
        # Performs a chip erase, which resets all memory to 0xFF (might take a few seconds/minutes).
        # Important: Run os.VfsFat.mkfs(flash) to make the flash an accessible file system.
        #            As always, you will then need to run os.mount(flash, '/MyFlashDir') then.
        self._wren()
        self._await()
        self.cs(0)
        self.spi.write(b'\xC7')  # 'Chip Erase'
        self.cs(1)
        self._await()  # wait for the chip to finish formatting

    def _read_status_reg(self, nr):  # Returns the value (0 or 1) in status register <nr> (S0, S1, S2, ...)
        reg, bit = divmod(nr, 8)
        self.cs(0)
        self.spi.write((b'\x05', b'\x35', b'\x15')[reg])  # 'Read Status Register-...' (1, 2, 3)
        stat = 2 ** bit & self.spi.read(1, 0xFF)[0]
        self.cs(1)
        return stat

    def _await(self):  # Waits for device not to be busy and returns if so
        self.cs(0)
        self.spi.write(b'\x05')  # 'Read Status Register-1'
        while 0x1 & self.spi.read(1, 0xFF)[0]:  # last bit (1) is BUSY bit in stat. reg. byte (0 = not busy, 1 = busy)
            pass
        self.cs(1)

    def _sector_erase(self, addr):  # Resets all memory within the specified sector (4KB) to 0xFF
        self._wren()
        self._await()
        self.cs(0)
        self.spi.write(b'\x20')  # 'Sector Erase'
        self.spi.write(addr.to_bytes(self._ADR_LEN, 'big'))
        self.cs(1)

    def _read(self, buf, addr):
        # Reads len(<buf>) bytes from the chip - starting at <addr> - into <buf>. To keep things
        # easy, len(<buf>) has to be a multiple of self.SECTOR_SIZE (or even better: less than that).
        #print("read %d bytes starting at %s" % (len(buf), hex(addr)))
        #assert addr + len(buf) <= self._CAPACITY, \
        #    "memory not addressable at %s with range %d (max.: %s)" % \
        #    (hex(addr), len(buf), hex(self._CAPACITY - 1))

        if self._cache_saddr is not None:
            buf_len = len(buf)
            if self._cache_saddr <= addr and addr+buf_len <= self._cache_saddr+self.SECTOR_SIZE:
                # section is completely cached, so simply read from cache
                buf[:] = self._cache[addr-self._cache_saddr:addr-self._cache_saddr+buf_len]
                return  # do not read from memory
            elif addr < self._cache_saddr < addr+buf_len or addr < self._cache_saddr+self.SECTOR_SIZE < addr+buf_len:
                # part of the section is in cache, so write back if required before reading all from memory
                self.sync()

        self._await()
        self.cs(0)
        self.spi.write(b'\x0C' if self._ADR_LEN == 4 else b'\x0B')  # 'Fast Read' (0x03 = default), 0x0C for 4-byte mode
        self.spi.write(addr.to_bytes(self._ADR_LEN, 'big'))
        self.spi.write(b'\xFF')  # dummy byte
        self.spi.readinto(buf, 0xFF)
        self.cs(1)

    def _wren(self):  # Sets the Write Enable Latch (WEL) bit in the status register
        self._await()
        self.cs(0)
        self.spi.write(b'\x06')  # 'Write Enable'
        self.cs(1)

    def _write(self, buf, addr):
        # Writes the data from <buf> to the device starting at <addr>, which has to be erased (0xFF)
        # before. Last byte of <addr> has to be zero, which means <addr> has to be a multiple of
        # self.PAGE_SIZE (= start of page), because wrapping to the next page (if page size exceeded)
        # is implemented for full pages only. Length of <buf> has to be a multiple of self.PAGE_SIZE,
        # because only full pages are supported at the moment (<addr> will be auto-incremented).
        #print("write buf[%d] to %s (%d)" % (len(buf), hex(addr), addr))
        #assert len(buf) % self.PAGE_SIZE == 0, "invalid buffer length: %d" % len(buf)
        #assert not addr & 0xf, "address (%d) not at page start" % addr
        #assert addr + len(buf) <= self._CAPACITY, \
        #    "memory not addressable at %s with range %d (max.: %s)" % \
        #    (hex(addr), len(buf), hex(self._CAPACITY - 1))

        for i in range(0, len(buf), self.PAGE_SIZE):
            self._wren()
            self._await()
            self.cs(0)
            self.spi.write(b'\x02')  # 'Page Program'
            self.spi.write(addr.to_bytes(self._ADR_LEN, 'big'))
            self.spi.write(buf[i:i + self.PAGE_SIZE])
            addr += self.PAGE_SIZE
            self.cs(1)

    def _writeblock(self, blocknum, buf):
        # To write a block, the sector (eg 4kB = 8 blocks) has to be erased first. Therefore, a sector will be read
        # and saved in cache first, then the given block will be replaced and the whole sector written back when
        #assert len(buf) == self.BLOCK_SIZE, "invalid block length: %d" % len(buf)
        #print("writeblock(%d, buf[%d])" % (blocknum, len(buf)))

        sector_nr = blocknum // 8
        sector_addr = sector_nr * self.SECTOR_SIZE
        index = (blocknum << 9) & 0xfff  # index of first byte of page in sector (multiple of self.PAGE_SIZE)

        if sector_addr != self._cache_saddr:  # sector not cached -> put in cache
            self.sync()  # write back previously cached sector first (apply old changes)
            self._read(self._cache, sector_addr)
            self._cache_saddr = sector_addr
        self._cache[index:index + self.BLOCK_SIZE] = buf  # perform changes in cache
        self._cache_isdirty = True  # sync() has to be called to apply the changes!

    def readblocks(self, blocknum, buf):
        # Read data from the chip starting at block number <blocknum> to <buf> (len = multiple of self.BLOCK_SIZE)
        #print("READ %d bytes starting at block %d" % (len(buf), blocknum))
        #assert len(buf) % self.BLOCK_SIZE == 0, 'invalid buffer length: %d' % len(buf)

        buf_len = len(buf)
        if buf_len == self.BLOCK_SIZE:
            self._read(buf, blocknum << 9)
        else:
            offset = 0
            buf_mv = memoryview(buf)
            while offset < buf_len:
                self._read(buf_mv[offset:offset + self.BLOCK_SIZE], blocknum << 9)
                offset += self.BLOCK_SIZE
                blocknum += 1

    def writeblocks(self, blocknum, buf):
        # Writes the content from <buf> (len must be multiple of self.BLOCK_SIZE) to block number <blocknum>
        #print("WRITE %d bytes starting at block %d" % (len(buf), blocknum))
        #assert len(buf) % self.BLOCK_SIZE == 0, 'invalid buffer length: %d' % len(buf)

        buf_len = len(buf)
        if buf_len == self.BLOCK_SIZE:
            self._writeblock(blocknum, buf)
        else:
            offset = 0
            buf_mv = memoryview(buf)
            while offset < buf_len:
                self._writeblock(blocknum, buf_mv[offset:offset + self.BLOCK_SIZE])
                offset += self.BLOCK_SIZE
                blocknum += 1

    def sync(self):  # Writes back the sector from cache to the chip (required to apply the changes!)
        if self._cache_isdirty:
            self._sector_erase(self._cache_saddr)
            self._write(self._cache, self._cache_saddr)  # addr is multiple of self.SECTOR_SIZE, so last byte is zero
            self._cache_isdirty = False

    def count(self):  # Returns the number of blocks (self.BLOCK_SIZE bytes) available on the device
        return int(self._CAPACITY / self.BLOCK_SIZE)


Usage:

First of all, the wiring has to be ok, otherwise you won't even get the manufacturer information (__init__ method will fail). VCC and GND connected. DI/IO0 to MOSI/13, DO/IO1 to MISO/12, CLK to SCK/14 and CS to pin 15 (or any pin you like). Make sure to add pull-up resistors. Especially HOLD (and RESET) should be pulled up (or connected to VCC directly if not required)!

Code: Select all

import machine, os, winbond
flash = winbond.W25QFlash(machine.SPI(1), machine.Pin(15))
 flash.format()         # !!! only required on the very first start (will remove everything); takes some seconds/minutes!
 os.VfsFat.mkfs(flash)  # !!! only required on first setup and after formatting
os.mount(flash, '/win')  # after every reboot of the ESP
#os.chdir('win')

Post Reply