Application for Robert-hh's ads1x15 library

Discussion about programs, libraries and tools that work with MicroPython. Mostly these are provided by a third party.
Target audience: All users and developers of MicroPython.
Post Reply
doceave
Posts: 11
Joined: Fri Feb 14, 2020 4:02 pm

Application for Robert-hh's ads1x15 library

Post by doceave » Fri Feb 14, 2020 4:19 pm

Hi there MicroPython community

I hope to achieve the following:
> I am a hoping to use a 4-channel ADC (an ADS1115), interfacing an ESP32 via I2C, to sample each of the 4 channels at just 10Hz. Higher frequency sampling may be useful as this may help filter out any noise on the sensors.
> My hope is to use a timer driven interrupt to, at 1/10th of a second intervals, take these 4 readings and add them to an array along with corresponding timestamp.
> The array should at all times hold ADC values taken over the past 10 seconds; then discard older values.

My issue is that the library is somewhat confusing for a beginner like me --- I am, despite a few hours of battling I have had little success.

Would an expert kindly assist by demonstrating a function that I can call at a frequency of 10Hz which:
> Takes for the 4 readings
> Adds them to array
> Deletes older data from the array

I am certain that detailed commentary will greatly aid others who similarly will be grappling with this, relatively, more complex library.

I will write a different interrupt that will, at 2sec intervals consider the data stored in the array and take appropriate action.

Many thanks in advance.

User avatar
Roberthh
Posts: 1838
Joined: Sat May 09, 2015 4:13 pm
Location: Rhineland, Europe

Re: Application for Robert-hh's ads1x15 library

Post by Roberthh » Fri Feb 14, 2020 8:00 pm

Since you want to read all 4 channels, the example about timer triggered reading may be a little bit mosleading. You need to initiliaze the ADC and set up a timer callback which is called every 100ms, and in that callback you use the read() method of the ADS1115 class to read all four channels one after the other. There will be a time difference between the four readings, but that cannot be avoided. For simplicity, you can handle the buffer outside that class. Once that works, you can optimize it.
I may give you some sample code tomorrow.

User avatar
Roberthh
Posts: 1838
Joined: Sat May 09, 2015 4:13 pm
Location: Rhineland, Europe

Re: Application for Robert-hh's ads1x15 library

Post by Roberthh » Sat Feb 15, 2020 8:34 am

OK. So here is a sample code, that reads all four channels and stores the data in global circular buffers. It is not a complex code. It maybe difficult to understand the first time you see it, how the acquisition in the timer callback happens and how data is exchanged. The key are the global buffers and the global variables index_put and index_get, which synchronize storing data in an taking out of these buffers.
It is more that three lines of code, but most of that is needed.

Code: Select all

from machine import I2C, Pin, Timer
from time import sleep_ms, ticks_ms
from array import array
import ads1x15

i2c_addr = 72
gain = 1
# set the conversion rate to 860 SPS = ~2 ms per value; that leaves
# >90 ms time for processing the data with a 100 ms timer
ADC_RATE = const(7)
ADC_PERIOD = const(100)  # repeat every 100 ms
BUFFERSIZE = const(128)  # Size of the circular buffer

#  Allocate the buffers for data
data_ch0 = array("h", [0] * BUFFERSIZE)
data_ch1 = array("h", [0] * BUFFERSIZE)
data_ch2 = array("h", [0] * BUFFERSIZE)
data_ch3 = array("h", [0] * BUFFERSIZE)
timestamp = array("L", [0] * BUFFERSIZE)

i2c = I2C(scl=Pin(5), sda=Pin(4), freq=400000)
adc = ads1x15.ADS1115(i2c, i2c_addr, gain)

tim = Timer(-1)

#
# Handler for the actual data sampling
# It gets the values from all four channels and
# puts them into the circular buffer.
# Activated by the timer interrupt.
#
def sample(x):
    global index_put, index_get, adc
    global data_ch1, data_ch2, data_ch3, data_ch2, timestamp

    if ((index_put + 1) % BUFFERSIZE) != index_get: # test for overrun
        timestamp[index_put] = ticks_ms()
        data_ch0[index_put] = adc.read(ADC_RATE, 0)
        data_ch1[index_put] = adc.read(ADC_RATE, 1)
        data_ch2[index_put] = adc.read(ADC_RATE, 2)
        data_ch3[index_put] = adc.read(ADC_RATE, 3)
        index_put = (index_put + 1) % BUFFERSIZE


# Setting these to values to zero invalidates the buffer content
index_put = 0
index_get = 0
# Start the timer. From now on, sampling occurs.
tim.init(period=ADC_PERIOD, mode=Timer.PERIODIC, callback=sample)

#
# Here starts the main processing loop. In this example,
# it just waits for data and print it. Due to the circular buffer,
# up to 127 values can be stored, leaving time for longish
# processing without loosing values.
# In this example, the loop can be terminated with Ctrl-C
#

try:
    while True:

        while index_put == index_get:  ## whait until data is available
            sleep_ms(1)

        print ("Timestamp and acquired data: ", timestamp[index_get], data_ch0[index_get], data_ch1[index_get],
                                data_ch2[index_get],data_ch3[index_get])

        index_get = (index_get + 1) % BUFFERSIZE  # advance to next sample

except KeyboardInterrupt:
    pass

# Stop the timer and thus the acquisition.
tim.deinit()

P.S.: I tested the code on a ESP32, firmware:
MicroPython v1.12-164 on 2020-02-11; ESP32 module (spiram) with ESP32

doceave
Posts: 11
Joined: Fri Feb 14, 2020 4:02 pm

Re: Application for Robert-hh's ads1x15 library

Post by doceave » Sat Feb 15, 2020 9:02 am

Robert-hh himself! I am honored and deeply appreciative of the reply. I will implement this today and feedback with the results :)

Totally awesome! I hope to pay if forward asap :)

doceave
Posts: 11
Joined: Fri Feb 14, 2020 4:02 pm

Re: Application for Robert-hh's ads1x15 library

Post by doceave » Sat Feb 15, 2020 2:23 pm

Many many thanks Robert!

I adjusted the I2C addresses accordingly and it works! Beautiful.

Having exported a sample of data into Excel for graphing I can see the patterns that I need... Just a bit of thought and the code will take care of automating an important task.

You are an asset to the community.

Keep well.
Roberthh wrote:
Sat Feb 15, 2020 8:34 am
OK. So here is a sample code, that reads all four channels and stores the data in global circular buffers. It is not a complex code. It maybe difficult to understand the first time you see it, how the acquisition in the timer callback happens and how data is exchanged. The key are the global buffers and the global variables index_put and index_get, which synchronize storing data in an taking out of these buffers.
It is more that three lines of code, but most of that is needed.

Code: Select all

from machine import I2C, Pin, Timer
from time import sleep_ms, ticks_ms
from array import array
import ads1x15

i2c_addr = 72
gain = 1
# set the conversion rate to 860 SPS = ~2 ms per value; that leaves
# >90 ms time for processing the data with a 100 ms timer
ADC_RATE = const(7)
ADC_PERIOD = const(100)  # repeat every 100 ms
BUFFERSIZE = const(128)  # Size of the circular buffer

#  Allocate the buffers for data
data_ch0 = array("h", [0] * BUFFERSIZE)
data_ch1 = array("h", [0] * BUFFERSIZE)
data_ch2 = array("h", [0] * BUFFERSIZE)
data_ch3 = array("h", [0] * BUFFERSIZE)
timestamp = array("L", [0] * BUFFERSIZE)

i2c = I2C(scl=Pin(5), sda=Pin(4), freq=400000)
adc = ads1x15.ADS1115(i2c, i2c_addr, gain)

tim = Timer(-1)

#
# Handler for the actual data sampling
# It gets the values from all four channels and
# puts them into the circular buffer.
# Activated by the timer interrupt.
#
def sample(x):
    global index_put, index_get, adc
    global data_ch1, data_ch2, data_ch3, data_ch2, timestamp

    if ((index_put + 1) % BUFFERSIZE) != index_get: # test for overrun
        timestamp[index_put] = ticks_ms()
        data_ch0[index_put] = adc.read(ADC_RATE, 0)
        data_ch1[index_put] = adc.read(ADC_RATE, 1)
        data_ch2[index_put] = adc.read(ADC_RATE, 2)
        data_ch3[index_put] = adc.read(ADC_RATE, 3)
        index_put = (index_put + 1) % BUFFERSIZE


# Setting these to values to zero invalidates the buffer content
index_put = 0
index_get = 0
# Start the timer. From now on, sampling occurs.
tim.init(period=ADC_PERIOD, mode=Timer.PERIODIC, callback=sample)

#
# Here starts the main processing loop. In this example,
# it just waits for data and print it. Due to the circular buffer,
# up to 127 values can be stored, leaving time for longish
# processing without loosing values.
# In this example, the loop can be terminated with Ctrl-C
#

try:
    while True:

        while index_put == index_get:  ## whait until data is available
            sleep_ms(1)

        print ("Timestamp and acquired data: ", timestamp[index_get], data_ch0[index_get], data_ch1[index_get],
                                data_ch2[index_get],data_ch3[index_get])

        index_get = (index_get + 1) % BUFFERSIZE  # advance to next sample

except KeyboardInterrupt:
    pass

# Stop the timer and thus the acquisition.
tim.deinit()

P.S.: I tested the code on a ESP32, firmware:
MicroPython v1.12-164 on 2020-02-11; ESP32 module (spiram) with ESP32

Post Reply