Detection if device is moving (in motion) or not

Discussion about programs, libraries and tools that work with MicroPython. Mostly these are provided by a third party.
Target audience: All users and developers of MicroPython.
netsrac
Posts: 17
Joined: Sun Jun 07, 2020 7:19 pm
Location: Berlin, Germany

Re: Detection if device is moving (in motion) or

Post by netsrac » Mon Dec 20, 2021 7:06 pm

I‘m also experimenting at the moment. Unfortunately I can not drive and code at the same time. So I do some coding and use a color LED to display various levels.

Here is what I‘m currently testing: I take measurements from the accelerosensor and calculate the difference from the last measurement. Sleep time between each measurement in 20ms. Depending on the difference value I add +1, +2 or +4 to a variable. After a total of 10 seconds I return this. Now I evaluate the total number of hits and try to come up with a pattern.

At least I can avoid triggering it when slamming the door or the trunk as these are only shot movements and do not count enough in total.

Starting at a traffic light works quite well…what does not work is a slow and soft start without a lot of movement like starting in a parking lot. My „am I driving trigger“ doesn’t get hit before I‘m on the road-

But I guess that’s a way that could work for me.

So my plan now is: wake-up from deep sleep, measure 10 seconds for movement. If no movement is detected, the goto deepsleep again for 60 seconds. If I detect movement. I’ll start the LTE Modem and the GPS. The only problem is, that it still takes 3-5 min for the GPS to fix its position. But I guess there is no way around if I don’t want to let the GPS run all the time. And there is no way to do this without draining the battery to much when the car is parked…

User avatar
OlivierLenoir
Posts: 126
Joined: Fri Dec 13, 2019 7:10 pm
Location: Picardie, FR

Re: Detection if device is moving (in motion) or not

Post by OlivierLenoir » Tue Dec 21, 2021 9:12 pm

When reading MPU6050's raw data, while the sensor is static on a table, we can observe a Gaussian distribution (blue histogram). We can standardize those data using standard score (orange histogram).
This simple standardization will be sufficient to code a simple motion detector with a MPU6050.
I've simulated it, with the here bellow Python 3.7.3 code.
StandardScore.png
StandardScore.png (8.85 KiB) Viewed 19236 times

Code: Select all

"""
Created: 2021-12-21 21:25:34
License: MIT, Copyright (c) 2021 Olivier Lenoir
Language: Python 3.7.3
Project: Test standard score
"""

from random import gauss
from matplotlib import pyplot as plt

# mean
MU = 121
# standard deviation
SIGMA = 15

# Gaussian distribution data set
data = [gauss(MU, SIGMA) for _ in range(500)]
# Standardize data set with standard score
data_standard_score = [(x - MU) / SIGMA for x in data]

plt.hist(data, 50)
plt.hist(data_standard_score, 50)
plt.show()
Last edited by OlivierLenoir on Tue Dec 28, 2021 9:50 am, edited 2 times in total.

User avatar
OlivierLenoir
Posts: 126
Joined: Fri Dec 13, 2019 7:10 pm
Location: Picardie, FR

Re: Detection if device is moving (in motion) or not

Post by OlivierLenoir » Tue Dec 21, 2021 9:57 pm

It's now time to apply standard score to MPU6050 gyro sensor to detect if we move. I reset cumulative gyro every 200 measures, it's because this method is not perfect, but to detect if we move it's acceptable.
The same approach can be apply to acceleration sensor.

Code: Select all

# Created: 2021-12-19 11:06:43
# License: MIT, Copyright (c) 2021 Olivier Lenoir
# Language: MicroPython v1.17
# Project: Roll Pitch Yaw detector
# Description: GY-521 MPU-6050 3-axis gyroscope and acceleration sensor
# Detect if Roll Pitch Yaw is greater than a trigger point


from machine import Pin, I2C
from ustruct import unpack
from utime import sleep_ms

SDA = const(21)
SCL = const(22)
MPU6050_ADDR = const(0x68)
MPU6050_TEMP_OUT = const(0x41)
MPU6050_ACCEL_OUT = const(0x3b)
MPU6050_GYRO_OUT = const(0x43)


class RPY(object):
    def __init__(self, ic2, addr=MPU6050_ADDR):
        self.i2c = i2c
        self.addr = addr
        self.mean = [0, 0, 0]
        self.stddev = [1, 1, 1]
        self._init()

    def _init(self):
        # Wake sensor
        self.i2c.writeto_mem(MPU6050_ADDR, 0x6b, bytes([0]))
        sleep_ms(50)

    def gyro_raw(self):
        #Read and unpack gyro raw data
        return unpack('>hhh', self.i2c.readfrom_mem(self.addr, MPU6050_GYRO_OUT, 6))

    def gyro(self):
        # Read and convert gyro to °/s
        return [a / 131 for a in self.gyro_raw()]

    def gyro_mean_sigma(self, rate=20, p=100):
        # Calculate sensor mean and standard deviation, sentor need to be static
        s = [0, 0, 0]
        sq = [0, 0, 0]
        for _ in range(p):
            d = self.gyro_raw()
            s = [a + b for a, b in zip(s, d)]
            sq = [a + b * b for a, b in zip(sq, d)]
            sleep_ms(rate)
        self.mean = [_s / p for _s in s]
        self.stddev = [(_sq / p - (_s / p) ** 2) ** 0.5 for _s, _sq in zip(s, sq)]

    def gyro_raw_std_score(self):
        # Standardize gyro raw with standard score
        return [(x - m) / p for x, m, p in zip(detector.gyro_raw(), self.mean, self.stddev)]


if __name__ == '__main__':
    i2c = I2C(0, scl=Pin(SCL), sda=Pin(SDA), freq=400000)
    detector = RPY(i2c)
    detector.gyro_mean_sigma()

    while True:
        m = [0, 0, 0]
        print('Reset')
        for _ in range(200):
            # cumulative gyro
            m = [sum(a) for a in zip(m, detector.gyro_raw_std_score())]
            print(m)
            # trigger point 1000
            if any(a > 1000 for a in map(abs, m)):
                print('You move!')
            sleep_ms(100)

lofer
Posts: 13
Joined: Tue Mar 31, 2020 5:17 am

Re: Detection if device is moving (in motion) or

Post by lofer » Thu Dec 30, 2021 10:44 am

netsrac wrote:
Mon Dec 20, 2021 7:06 pm
The only problem is, that it still takes 3-5 min for the GPS to fix its position. But I guess there is no way around if I don’t want to let the GPS run all the time. And there is no way to do this without draining the battery to much when the car is parked…
I'm making a similar project and I decided to invest (a lot of) money into a better GPS and more sensitive antennae - I purchased a new Ublox ZED-F9P which gets a fix in less than 30 seconds in nearly any weather conditions. The another plus of that hardware setup is much more precise location as it allows more satellites and their bands to be observed.

netsrac
Posts: 17
Joined: Sun Jun 07, 2020 7:19 pm
Location: Berlin, Germany

Re: Detection if device is moving (in motion) or

Post by netsrac » Mon Jan 03, 2022 8:55 am

lofer wrote:
Thu Dec 30, 2021 10:44 am
I'm making a similar project and I decided to invest (a lot of) money into a better GPS and more sensitive antennae - I purchased a new Ublox ZED-F9P which gets a fix in less than 30 seconds in nearly any weather conditions. The another plus of that hardware setup is much more precise location as it allows more satellites and their bands to be observed.
Well, for simplicity I like to stay with the SIM7000 module I currently use. It's an LTE-Modem together with a GPS receiver from Waveshare. So only one serial device that can be configured via AT commands for both, modem and GPS

My biggest problem is not the time for a GPS fix, it's more the power consumption to get the fix. The GPS uses around 80mA on top of the ESP32. So I try to avoid turning on the GPS and the Modem. Especially because I like to check the status often to not loose any tracking information.

The current solution with the accelerator sensor seems quite promising.

lofer
Posts: 13
Joined: Tue Mar 31, 2020 5:17 am

Re: Detection if device is moving (in motion) or not

Post by lofer » Tue Jan 11, 2022 1:42 pm

OlivierLenoir wrote:
Tue Dec 21, 2021 9:57 pm
Hi Olivier, your approach looks very promising indeed, but I have colossal problems rewriting it for asyncio :cry: Are you able to give any hints how to make it async-able?

My current code never shows "you move" :(

Code: Select all

async def gyro_raw():
    #Read and unpack gyro raw data
    r = await unpack('>hhh', i2c.readfrom_mem(MPU6050_ADDR, MPU6050_GYRO_OUT, 6))
    return r

async def gyro():
    # Read and convert gyro to °/s
    g = await [a / 131 for a in gyro_raw()]
    return g

async def gyro_mean_sigma(rate=20, p=100):
    # Calculate sensor mean and standard deviation, sentor need to be static
    s = [0, 0, 0]
    sq = [0, 0, 0]
    for _ in range(p):
        d = gyro_raw()
        s = [a + b for a, b in zip(s, d)]
        sq = [a + b * b for a, b in zip(sq, d)]
        await asyncio.sleep_ms(rate)
    mean = [_s / p for _s in s]
    stddev = [(_sq / p - (_s / p) ** 2) ** 0.5 for _s, _sq in zip(s, sq)]
    return mean, stddev

async def gyro_raw_std_score(mean, stddev):
    # Standardize gyro raw with standard score
    g = await [(x - m) / p for x, m, p in zip(await gyro_raw(), mean, stddev)]
    return g
    async def accelo():
    #i2c = I2C(0, scl=Pin(SCL), sda=Pin(SDA), freq=400000)
    #detector = RPY(i2c)
    mean = [0, 0, 0]
    stddev = [1, 1, 1]

    # Wake sensor
    i2c.writeto_mem(MPU6050_ADDR, 0x6b, bytes([0]))
    await asyncio.sleep_ms(50)

    mean, stddev = await gyro_mean_sigma()

    while True:
        m = [0, 0, 0]
        print('Reset')
        for _ in range(200):
            # cumulative gyro
            m = [sum(a) for a in zip(m, await gyro_raw_std_score(mean, stddev))]
            #print(m)
            # trigger point 1000
            if any(a > 1000 for a in map(abs, m)):
                print('You move!')
            await asyncio.sleep_ms(100)
EDIT: I've just realised that I probably need to rewrite whole MPU6050 library for async, am I correct? :) :)

User avatar
pythoncoder
Posts: 5956
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: Detection if device is moving (in motion) or not

Post by pythoncoder » Tue Jan 11, 2022 6:11 pm

This demo illustrates the use of MPU9150 with uasyncio. (MPU9150 is just a 6050 with a magnetometer).

Lines like these are wrong:

Code: Select all

async def gyro_raw():
    #Read and unpack gyro raw data
    r = await unpack('>hhh', i2c.readfrom_mem(MPU6050_ADDR, MPU6050_GYRO_OUT, 6))
    return r

async def gyro():
    # Read and convert gyro to °/s
    g = await [a / 131 for a in gyro_raw()]
    return g
You can only await an asynchronous function, not general synchronous code.

Since these routines will return "immediately" they should be written as conventional synchronous code. There is no point in declaring an asynchronous function unless at some point in the code it yields to the scheduler.

Code: Select all

def gyro_raw():
    #Read and unpack gyro raw data
    r = unpack('>hhh', i2c.readfrom_mem(MPU6050_ADDR, MPU6050_GYRO_OUT, 6))
    return r

def gyro():
    # Read and convert gyro to °/s
    g = [a / 131 for a in gyro_raw()]
    return g
gyro_mean_sigma looks OK to me but I think the rest should look like this:

Code: Select all

def gyro_raw_std_score(mean, stddev):
    # Standardize gyro raw with standard score
    g = [(x - m) / p for x, m, p in zip(await gyro_raw(), mean, stddev)]
    return g

async def accelo():
    #i2c = I2C(0, scl=Pin(SCL), sda=Pin(SDA), freq=400000)
    #detector = RPY(i2c)
    mean = [0, 0, 0]
    stddev = [1, 1, 1]

    # Wake sensor
    i2c.writeto_mem(MPU6050_ADDR, 0x6b, bytes([0]))
    await asyncio.sleep_ms(50)

    mean, stddev = await gyro_mean_sigma()

    while True:
        m = [0, 0, 0]
        print('Reset')
        for _ in range(200):
            # cumulative gyro
            m = [sum(a) for a in zip(m, gyro_raw_std_score(mean, stddev))]  # No await
            #print(m)
            # trigger point 1000
            if any(a > 1000 for a in map(abs, m)):
                print('You move!')
            await asyncio.sleep_ms(100)
I've probably missed something here, but I think you're on the right track. There is no need to rewrite the driver.
Peter Hinch
Index to my micropython libraries.

User avatar
OlivierLenoir
Posts: 126
Joined: Fri Dec 13, 2019 7:10 pm
Location: Picardie, FR

Re: Detection if device is moving (in motion) or not

Post by OlivierLenoir » Wed Jan 12, 2022 7:54 pm

For asyncio, Peter alias pythoncoder is the reference.
I'm coding a driver for MPU6050 using interrupt pin. With the interrupt and setting at 15.625Hz I get all updated data without losing any. I've implemented a complementary filter.
Read the code and let know if you have questions.

Code: Select all

# Author: Olivier Lenoir - <olivier.len02@gmail.com>
# Created: 2021-12-29 14:06:10
# License: MIT, Copyright (c) 2021 Olivier Lenoir
# Language: MicroPython v1.17
# Project: MPU-6050
# Description: GY-521 MPU-6050 3-axis gyroscope and acceleration sensor
# picocom /dev/ttyUSB0 -b 115200 | tee MPU6050raw.dat


from machine import Pin, I2C
from ustruct import unpack
from utime import sleep_ms, ticks_us, ticks_diff
from math import sin, asin, radians, degrees

SDA = const(21)
SCL = const(22)
INT_PIN = const(15)

MPU6050_ADDR = const(0x68)

MPU6050_SMPRT_DIV = const(0x19)
MPU6050_CONFIG = const(0x1a)
MPU6050_INT_ENABLE = const(0x38)
MPU6050_ACCEL_OUT = const(0x3b)
MPU6050_TEMP_OUT = const(0x41)
MPU6050_GYRO_OUT = const(0x43)
MPU6050_PWR_MGMT_1 = const(0x6b)


class MPU6050(object):
    def __init__(self, ic2, int_pin, addr=MPU6050_ADDR):
        self.i2c = i2c
        self.int_pin = Pin(int_pin, Pin.IN, Pin.PULL_UP)
        self.addr = addr
        self.angle_pitch = 0
        self.angle_roll = 0
        self.angle_pitch_output = 0
        self.angle_roll_output = 0
        self.set_gyro_angles = False
        self._init()
        self.calibrate()
        self.irq_start(self.gyro)

    def _init(self):
        # Init MPU6050
        # Power sensor
        self.i2c.writeto_mem(MPU6050_ADDR, MPU6050_PWR_MGMT_1, bytes([0]))
        # Set Digital Low Pass Filter (DLPF) 260Hz, Output rate 1kHz
        self.i2c.writeto_mem(MPU6050_ADDR, MPU6050_CONFIG, bytes([6]))
        # Set Sample Rate Divider at 64
        # Output rate: 1kHz / 64 = 15.625Hz
        # Output period: 1 / 15.625Hz = 64ms
        self.i2c.writeto_mem(MPU6050_ADDR, MPU6050_SMPRT_DIV, bytes([63]))
        self.period = 64 / 1000  # one measure every period in second
        self.gyro_lsb = 131  # Gyro LSB Sensitivity
        self.gyro_conv = self.period / self.gyro_lsb
        # Interrupt enable on data ready
        self.i2c.writeto_mem(MPU6050_ADDR, MPU6050_INT_ENABLE, bytes([1]))
        sleep_ms(200)

    def raw(self, irq=None):
        # Get accel, temp and gyro raw data
        return unpack('>hhhhhhh', self.i2c.readfrom_mem(self.addr, MPU6050_ACCEL_OUT, 14))

    def raw_stdscr(self, irq=None):
        # Get accel and gyro raw data normalize with standard score
        raw = list(self.raw())
        raw.pop(3)  # Pop temperature
        # Raw accel and standard score gyro
        return raw[0:3] + [(r - m) / s for r, m, s in zip(raw[3:6], self.mean[3:6], self.stddev[3:6])]

    def calibrate(self, sample=200):
        # Calculate mean and standard deviation for accel and gyro
        self._sample = 0
        self.sample = sample
        self.sum = self.sum2 = [0, 0, 0, 0, 0, 0]
        self.irq_start(self.mean_stddev)
        print('Calibrating', end='')
        while self._sample < self.sample:
            print('.', end='')
            sleep_ms(800)
        print()
        print('Calibration completed')

    def mean_stddev(self, irq=None):
        # Calculate mean and standard deviation for accel and gyro
        raw_data = list(self.raw())
        raw_data.pop(3)  # Pop temperature
        self.sum = [rd + s for rd, s in zip(raw_data, self.sum)]
        self.sum2 = [rd ** 2 + s2 for rd, s2 in zip(raw_data, self.sum2)]
        self._sample += 1
        if self._sample >= self.sample:
            self.irq_stop()
            p = self._sample
            self.mean = [s / p for s in self.sum]
            self.stddev = [(s2 / p - m ** 2) ** 0.5 for m, s2 in zip(self.mean, self.sum2)]

    def gyro(self, irq=None):
        # update gyro pitch and roll
        raw_stdscr = self.raw_stdscr()
        accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z = raw_stdscr
        accel_vector = sum(a ** 2 for a in raw_stdscr[0:3]) ** 0.5
        # update gyro
        self.angle_pitch += gyro_x * self.gyro_conv
        self.angle_roll += gyro_y * self.gyro_conv
        yawed = sin(radians(gyro_z * self.gyro_conv))
        self.angle_pitch += self.angle_roll * yawed
        self.angle_roll -= self.angle_pitch * yawed
        # Accelerometer angle
        angle_pitch_accel = degrees(asin(accel_y / accel_vector))
        angle_roll_accel = - degrees(asin(accel_x / accel_vector))
        # calibration
        # angle_pitch_accel -= 0
        # angle_roll_accel -= 0
        if self.set_gyro_angles:
            # Drift correction
            self.angle_pitch = self.angle_pitch * 0.98 + angle_pitch_accel * 0.02
            self.angle_roll = self.angle_roll * 0.98 + angle_roll_accel * 0.02
        else:
            # Gyro origin based on gravity
            self.angle_pitch = angle_pitch_accel
            self.angle_roll = angle_roll_accel
            self.set_gyro_angles = True
        # Complementary filter
        self.angle_pitch_output = self.angle_pitch_output * 0.9 + self.angle_pitch * 0.1
        self.angle_roll_output = self.angle_roll_output * 0.9 + self.angle_roll * 0.1

    def irq_start(self, func):
        # Start IRQ
        self.int_pin.irq(trigger=Pin.IRQ_FALLING, handler=func)

    def irq_stop(self):
        # Stop IRQ
        self.int_pin.irq(trigger=Pin.IRQ_FALLING)

    def print_raw(self, irq=None):
        print(self.raw())


if __name__ == '__main__':
    i2c = I2C(0, scl=Pin(SCL), sda=Pin(SDA), freq=400000)
    detector = MPU6050(i2c, INT_PIN)
    print('Mean:', detector.mean)
    print('StdDev:', detector.stddev)
    while True:
        print(detector.angle_pitch_output, detector.angle_roll_output)
        sleep_ms(500)

netsrac
Posts: 17
Joined: Sun Jun 07, 2020 7:19 pm
Location: Berlin, Germany

Re: Detection if device is moving (in motion) or not

Post by netsrac » Thu Jan 13, 2022 8:10 am

OlivierLenoir wrote:
Wed Jan 12, 2022 7:54 pm
For asyncio, Peter alias pythoncoder is the reference.
I'm coding a driver for MPU6050 using interrupt pin. With the interrupt and setting at 15.625Hz I get all updated data without losing any. I've implemented a complementary filter.
Stupid question: That's a hardware interrupt of the MPU6050 sensor connected to the Microcontroller?

Never found any references how to use this.

lofer
Posts: 13
Joined: Tue Mar 31, 2020 5:17 am

Re: Detection if device is moving (in motion) or not

Post by lofer » Thu Jan 13, 2022 9:29 am

Now everything is awesome! Thanks for the new driver, Olivier, and for asyncio comments, Peter.

This attached code seems to work in asyncio, I have not modified the class at all, and I am OK with initial calibration which does not have to run concurrently with other code. As I restart my micro-controller very rarely, I can wait these few seconds.

Replying last question: you need to connect INT pin on the MPU6050 device to any of digital pins of your micro-controller. In the code it is "Y3" pin.

Code: Select all


async def ismoving():
    print('Mean:', detector.mean)
    print('StdDev:', detector.stddev)
    p, r = 0, 0
    while True:
        olp, olr = p, r
        p, r = detector.angle_pitch_output, detector.angle_roll_output
        dp, dr = abs(olp - p), abs(olr - r)
        #print([p, r])
        #print([dp, dr])
        if any(a > 0.03 for a in [dp, dr]):
            print('You move!')
        await asyncio.sleep_ms(300)

if __name__ == '__main__':
     #i2c = I2C(0, scl=Pin(SCL), sda=Pin(SDA), freq=400000)
     detector = MPU6050(i2c, 'Y3')
     asyncio.run(ismoving())



If you have any suggestions to improve this approach, please share :)

Post Reply