Detection if device is moving (in motion) or not
Re: Detection if device is moving (in motion) or
I‘m also experimenting at the moment. Unfortunately I can not drive and code at the same time. So I do some coding and use a color LED to display various levels.
Here is what I‘m currently testing: I take measurements from the accelerosensor and calculate the difference from the last measurement. Sleep time between each measurement in 20ms. Depending on the difference value I add +1, +2 or +4 to a variable. After a total of 10 seconds I return this. Now I evaluate the total number of hits and try to come up with a pattern.
At least I can avoid triggering it when slamming the door or the trunk as these are only shot movements and do not count enough in total.
Starting at a traffic light works quite well…what does not work is a slow and soft start without a lot of movement like starting in a parking lot. My „am I driving trigger“ doesn’t get hit before I‘m on the road-
But I guess that’s a way that could work for me.
So my plan now is: wake-up from deep sleep, measure 10 seconds for movement. If no movement is detected, the goto deepsleep again for 60 seconds. If I detect movement. I’ll start the LTE Modem and the GPS. The only problem is, that it still takes 3-5 min for the GPS to fix its position. But I guess there is no way around if I don’t want to let the GPS run all the time. And there is no way to do this without draining the battery to much when the car is parked…
Here is what I‘m currently testing: I take measurements from the accelerosensor and calculate the difference from the last measurement. Sleep time between each measurement in 20ms. Depending on the difference value I add +1, +2 or +4 to a variable. After a total of 10 seconds I return this. Now I evaluate the total number of hits and try to come up with a pattern.
At least I can avoid triggering it when slamming the door or the trunk as these are only shot movements and do not count enough in total.
Starting at a traffic light works quite well…what does not work is a slow and soft start without a lot of movement like starting in a parking lot. My „am I driving trigger“ doesn’t get hit before I‘m on the road-
But I guess that’s a way that could work for me.
So my plan now is: wake-up from deep sleep, measure 10 seconds for movement. If no movement is detected, the goto deepsleep again for 60 seconds. If I detect movement. I’ll start the LTE Modem and the GPS. The only problem is, that it still takes 3-5 min for the GPS to fix its position. But I guess there is no way around if I don’t want to let the GPS run all the time. And there is no way to do this without draining the battery to much when the car is parked…
- OlivierLenoir
- Posts: 126
- Joined: Fri Dec 13, 2019 7:10 pm
- Location: Picardie, FR
Re: Detection if device is moving (in motion) or not
When reading MPU6050's raw data, while the sensor is static on a table, we can observe a Gaussian distribution (blue histogram). We can standardize those data using standard score (orange histogram).
This simple standardization will be sufficient to code a simple motion detector with a MPU6050.
I've simulated it, with the here bellow Python 3.7.3 code.
This simple standardization will be sufficient to code a simple motion detector with a MPU6050.
I've simulated it, with the here bellow Python 3.7.3 code.
Code: Select all
"""
Created: 2021-12-21 21:25:34
License: MIT, Copyright (c) 2021 Olivier Lenoir
Language: Python 3.7.3
Project: Test standard score
"""
from random import gauss
from matplotlib import pyplot as plt
# mean
MU = 121
# standard deviation
SIGMA = 15
# Gaussian distribution data set
data = [gauss(MU, SIGMA) for _ in range(500)]
# Standardize data set with standard score
data_standard_score = [(x - MU) / SIGMA for x in data]
plt.hist(data, 50)
plt.hist(data_standard_score, 50)
plt.show()
Last edited by OlivierLenoir on Tue Dec 28, 2021 9:50 am, edited 2 times in total.
Olivier Lenoir
https://gitlab.com/olivierlenoir
https://gitlab.com/olivierlenoir
- OlivierLenoir
- Posts: 126
- Joined: Fri Dec 13, 2019 7:10 pm
- Location: Picardie, FR
Re: Detection if device is moving (in motion) or not
It's now time to apply standard score to MPU6050 gyro sensor to detect if we move. I reset cumulative gyro every 200 measures, it's because this method is not perfect, but to detect if we move it's acceptable.
The same approach can be apply to acceleration sensor.
The same approach can be apply to acceleration sensor.
Code: Select all
# Created: 2021-12-19 11:06:43
# License: MIT, Copyright (c) 2021 Olivier Lenoir
# Language: MicroPython v1.17
# Project: Roll Pitch Yaw detector
# Description: GY-521 MPU-6050 3-axis gyroscope and acceleration sensor
# Detect if Roll Pitch Yaw is greater than a trigger point
from machine import Pin, I2C
from ustruct import unpack
from utime import sleep_ms
SDA = const(21)
SCL = const(22)
MPU6050_ADDR = const(0x68)
MPU6050_TEMP_OUT = const(0x41)
MPU6050_ACCEL_OUT = const(0x3b)
MPU6050_GYRO_OUT = const(0x43)
class RPY(object):
def __init__(self, ic2, addr=MPU6050_ADDR):
self.i2c = i2c
self.addr = addr
self.mean = [0, 0, 0]
self.stddev = [1, 1, 1]
self._init()
def _init(self):
# Wake sensor
self.i2c.writeto_mem(MPU6050_ADDR, 0x6b, bytes([0]))
sleep_ms(50)
def gyro_raw(self):
#Read and unpack gyro raw data
return unpack('>hhh', self.i2c.readfrom_mem(self.addr, MPU6050_GYRO_OUT, 6))
def gyro(self):
# Read and convert gyro to °/s
return [a / 131 for a in self.gyro_raw()]
def gyro_mean_sigma(self, rate=20, p=100):
# Calculate sensor mean and standard deviation, sentor need to be static
s = [0, 0, 0]
sq = [0, 0, 0]
for _ in range(p):
d = self.gyro_raw()
s = [a + b for a, b in zip(s, d)]
sq = [a + b * b for a, b in zip(sq, d)]
sleep_ms(rate)
self.mean = [_s / p for _s in s]
self.stddev = [(_sq / p - (_s / p) ** 2) ** 0.5 for _s, _sq in zip(s, sq)]
def gyro_raw_std_score(self):
# Standardize gyro raw with standard score
return [(x - m) / p for x, m, p in zip(detector.gyro_raw(), self.mean, self.stddev)]
if __name__ == '__main__':
i2c = I2C(0, scl=Pin(SCL), sda=Pin(SDA), freq=400000)
detector = RPY(i2c)
detector.gyro_mean_sigma()
while True:
m = [0, 0, 0]
print('Reset')
for _ in range(200):
# cumulative gyro
m = [sum(a) for a in zip(m, detector.gyro_raw_std_score())]
print(m)
# trigger point 1000
if any(a > 1000 for a in map(abs, m)):
print('You move!')
sleep_ms(100)
Olivier Lenoir
https://gitlab.com/olivierlenoir
https://gitlab.com/olivierlenoir
Re: Detection if device is moving (in motion) or
I'm making a similar project and I decided to invest (a lot of) money into a better GPS and more sensitive antennae - I purchased a new Ublox ZED-F9P which gets a fix in less than 30 seconds in nearly any weather conditions. The another plus of that hardware setup is much more precise location as it allows more satellites and their bands to be observed.
Re: Detection if device is moving (in motion) or
Well, for simplicity I like to stay with the SIM7000 module I currently use. It's an LTE-Modem together with a GPS receiver from Waveshare. So only one serial device that can be configured via AT commands for both, modem and GPSlofer wrote: ↑Thu Dec 30, 2021 10:44 amI'm making a similar project and I decided to invest (a lot of) money into a better GPS and more sensitive antennae - I purchased a new Ublox ZED-F9P which gets a fix in less than 30 seconds in nearly any weather conditions. The another plus of that hardware setup is much more precise location as it allows more satellites and their bands to be observed.
My biggest problem is not the time for a GPS fix, it's more the power consumption to get the fix. The GPS uses around 80mA on top of the ESP32. So I try to avoid turning on the GPS and the Modem. Especially because I like to check the status often to not loose any tracking information.
The current solution with the accelerator sensor seems quite promising.
Re: Detection if device is moving (in motion) or not
Hi Olivier, your approach looks very promising indeed, but I have colossal problems rewriting it for asyncio Are you able to give any hints how to make it async-able?
My current code never shows "you move"
Code: Select all
async def gyro_raw():
#Read and unpack gyro raw data
r = await unpack('>hhh', i2c.readfrom_mem(MPU6050_ADDR, MPU6050_GYRO_OUT, 6))
return r
async def gyro():
# Read and convert gyro to °/s
g = await [a / 131 for a in gyro_raw()]
return g
async def gyro_mean_sigma(rate=20, p=100):
# Calculate sensor mean and standard deviation, sentor need to be static
s = [0, 0, 0]
sq = [0, 0, 0]
for _ in range(p):
d = gyro_raw()
s = [a + b for a, b in zip(s, d)]
sq = [a + b * b for a, b in zip(sq, d)]
await asyncio.sleep_ms(rate)
mean = [_s / p for _s in s]
stddev = [(_sq / p - (_s / p) ** 2) ** 0.5 for _s, _sq in zip(s, sq)]
return mean, stddev
async def gyro_raw_std_score(mean, stddev):
# Standardize gyro raw with standard score
g = await [(x - m) / p for x, m, p in zip(await gyro_raw(), mean, stddev)]
return g
async def accelo():
#i2c = I2C(0, scl=Pin(SCL), sda=Pin(SDA), freq=400000)
#detector = RPY(i2c)
mean = [0, 0, 0]
stddev = [1, 1, 1]
# Wake sensor
i2c.writeto_mem(MPU6050_ADDR, 0x6b, bytes([0]))
await asyncio.sleep_ms(50)
mean, stddev = await gyro_mean_sigma()
while True:
m = [0, 0, 0]
print('Reset')
for _ in range(200):
# cumulative gyro
m = [sum(a) for a in zip(m, await gyro_raw_std_score(mean, stddev))]
#print(m)
# trigger point 1000
if any(a > 1000 for a in map(abs, m)):
print('You move!')
await asyncio.sleep_ms(100)
- pythoncoder
- Posts: 5956
- Joined: Fri Jul 18, 2014 8:01 am
- Location: UK
- Contact:
Re: Detection if device is moving (in motion) or not
This demo illustrates the use of MPU9150 with uasyncio. (MPU9150 is just a 6050 with a magnetometer).
Lines like these are wrong:
You can only await an asynchronous function, not general synchronous code.
Since these routines will return "immediately" they should be written as conventional synchronous code. There is no point in declaring an asynchronous function unless at some point in the code it yields to the scheduler.
gyro_mean_sigma looks OK to me but I think the rest should look like this:
I've probably missed something here, but I think you're on the right track. There is no need to rewrite the driver.
Lines like these are wrong:
Code: Select all
async def gyro_raw():
#Read and unpack gyro raw data
r = await unpack('>hhh', i2c.readfrom_mem(MPU6050_ADDR, MPU6050_GYRO_OUT, 6))
return r
async def gyro():
# Read and convert gyro to °/s
g = await [a / 131 for a in gyro_raw()]
return g
Since these routines will return "immediately" they should be written as conventional synchronous code. There is no point in declaring an asynchronous function unless at some point in the code it yields to the scheduler.
Code: Select all
def gyro_raw():
#Read and unpack gyro raw data
r = unpack('>hhh', i2c.readfrom_mem(MPU6050_ADDR, MPU6050_GYRO_OUT, 6))
return r
def gyro():
# Read and convert gyro to °/s
g = [a / 131 for a in gyro_raw()]
return g
Code: Select all
def gyro_raw_std_score(mean, stddev):
# Standardize gyro raw with standard score
g = [(x - m) / p for x, m, p in zip(await gyro_raw(), mean, stddev)]
return g
async def accelo():
#i2c = I2C(0, scl=Pin(SCL), sda=Pin(SDA), freq=400000)
#detector = RPY(i2c)
mean = [0, 0, 0]
stddev = [1, 1, 1]
# Wake sensor
i2c.writeto_mem(MPU6050_ADDR, 0x6b, bytes([0]))
await asyncio.sleep_ms(50)
mean, stddev = await gyro_mean_sigma()
while True:
m = [0, 0, 0]
print('Reset')
for _ in range(200):
# cumulative gyro
m = [sum(a) for a in zip(m, gyro_raw_std_score(mean, stddev))] # No await
#print(m)
# trigger point 1000
if any(a > 1000 for a in map(abs, m)):
print('You move!')
await asyncio.sleep_ms(100)
Peter Hinch
Index to my micropython libraries.
Index to my micropython libraries.
- OlivierLenoir
- Posts: 126
- Joined: Fri Dec 13, 2019 7:10 pm
- Location: Picardie, FR
Re: Detection if device is moving (in motion) or not
For asyncio, Peter alias pythoncoder is the reference.
I'm coding a driver for MPU6050 using interrupt pin. With the interrupt and setting at 15.625Hz I get all updated data without losing any. I've implemented a complementary filter.
Read the code and let know if you have questions.
I'm coding a driver for MPU6050 using interrupt pin. With the interrupt and setting at 15.625Hz I get all updated data without losing any. I've implemented a complementary filter.
Read the code and let know if you have questions.
Code: Select all
# Author: Olivier Lenoir - <olivier.len02@gmail.com>
# Created: 2021-12-29 14:06:10
# License: MIT, Copyright (c) 2021 Olivier Lenoir
# Language: MicroPython v1.17
# Project: MPU-6050
# Description: GY-521 MPU-6050 3-axis gyroscope and acceleration sensor
# picocom /dev/ttyUSB0 -b 115200 | tee MPU6050raw.dat
from machine import Pin, I2C
from ustruct import unpack
from utime import sleep_ms, ticks_us, ticks_diff
from math import sin, asin, radians, degrees
SDA = const(21)
SCL = const(22)
INT_PIN = const(15)
MPU6050_ADDR = const(0x68)
MPU6050_SMPRT_DIV = const(0x19)
MPU6050_CONFIG = const(0x1a)
MPU6050_INT_ENABLE = const(0x38)
MPU6050_ACCEL_OUT = const(0x3b)
MPU6050_TEMP_OUT = const(0x41)
MPU6050_GYRO_OUT = const(0x43)
MPU6050_PWR_MGMT_1 = const(0x6b)
class MPU6050(object):
def __init__(self, ic2, int_pin, addr=MPU6050_ADDR):
self.i2c = i2c
self.int_pin = Pin(int_pin, Pin.IN, Pin.PULL_UP)
self.addr = addr
self.angle_pitch = 0
self.angle_roll = 0
self.angle_pitch_output = 0
self.angle_roll_output = 0
self.set_gyro_angles = False
self._init()
self.calibrate()
self.irq_start(self.gyro)
def _init(self):
# Init MPU6050
# Power sensor
self.i2c.writeto_mem(MPU6050_ADDR, MPU6050_PWR_MGMT_1, bytes([0]))
# Set Digital Low Pass Filter (DLPF) 260Hz, Output rate 1kHz
self.i2c.writeto_mem(MPU6050_ADDR, MPU6050_CONFIG, bytes([6]))
# Set Sample Rate Divider at 64
# Output rate: 1kHz / 64 = 15.625Hz
# Output period: 1 / 15.625Hz = 64ms
self.i2c.writeto_mem(MPU6050_ADDR, MPU6050_SMPRT_DIV, bytes([63]))
self.period = 64 / 1000 # one measure every period in second
self.gyro_lsb = 131 # Gyro LSB Sensitivity
self.gyro_conv = self.period / self.gyro_lsb
# Interrupt enable on data ready
self.i2c.writeto_mem(MPU6050_ADDR, MPU6050_INT_ENABLE, bytes([1]))
sleep_ms(200)
def raw(self, irq=None):
# Get accel, temp and gyro raw data
return unpack('>hhhhhhh', self.i2c.readfrom_mem(self.addr, MPU6050_ACCEL_OUT, 14))
def raw_stdscr(self, irq=None):
# Get accel and gyro raw data normalize with standard score
raw = list(self.raw())
raw.pop(3) # Pop temperature
# Raw accel and standard score gyro
return raw[0:3] + [(r - m) / s for r, m, s in zip(raw[3:6], self.mean[3:6], self.stddev[3:6])]
def calibrate(self, sample=200):
# Calculate mean and standard deviation for accel and gyro
self._sample = 0
self.sample = sample
self.sum = self.sum2 = [0, 0, 0, 0, 0, 0]
self.irq_start(self.mean_stddev)
print('Calibrating', end='')
while self._sample < self.sample:
print('.', end='')
sleep_ms(800)
print()
print('Calibration completed')
def mean_stddev(self, irq=None):
# Calculate mean and standard deviation for accel and gyro
raw_data = list(self.raw())
raw_data.pop(3) # Pop temperature
self.sum = [rd + s for rd, s in zip(raw_data, self.sum)]
self.sum2 = [rd ** 2 + s2 for rd, s2 in zip(raw_data, self.sum2)]
self._sample += 1
if self._sample >= self.sample:
self.irq_stop()
p = self._sample
self.mean = [s / p for s in self.sum]
self.stddev = [(s2 / p - m ** 2) ** 0.5 for m, s2 in zip(self.mean, self.sum2)]
def gyro(self, irq=None):
# update gyro pitch and roll
raw_stdscr = self.raw_stdscr()
accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z = raw_stdscr
accel_vector = sum(a ** 2 for a in raw_stdscr[0:3]) ** 0.5
# update gyro
self.angle_pitch += gyro_x * self.gyro_conv
self.angle_roll += gyro_y * self.gyro_conv
yawed = sin(radians(gyro_z * self.gyro_conv))
self.angle_pitch += self.angle_roll * yawed
self.angle_roll -= self.angle_pitch * yawed
# Accelerometer angle
angle_pitch_accel = degrees(asin(accel_y / accel_vector))
angle_roll_accel = - degrees(asin(accel_x / accel_vector))
# calibration
# angle_pitch_accel -= 0
# angle_roll_accel -= 0
if self.set_gyro_angles:
# Drift correction
self.angle_pitch = self.angle_pitch * 0.98 + angle_pitch_accel * 0.02
self.angle_roll = self.angle_roll * 0.98 + angle_roll_accel * 0.02
else:
# Gyro origin based on gravity
self.angle_pitch = angle_pitch_accel
self.angle_roll = angle_roll_accel
self.set_gyro_angles = True
# Complementary filter
self.angle_pitch_output = self.angle_pitch_output * 0.9 + self.angle_pitch * 0.1
self.angle_roll_output = self.angle_roll_output * 0.9 + self.angle_roll * 0.1
def irq_start(self, func):
# Start IRQ
self.int_pin.irq(trigger=Pin.IRQ_FALLING, handler=func)
def irq_stop(self):
# Stop IRQ
self.int_pin.irq(trigger=Pin.IRQ_FALLING)
def print_raw(self, irq=None):
print(self.raw())
if __name__ == '__main__':
i2c = I2C(0, scl=Pin(SCL), sda=Pin(SDA), freq=400000)
detector = MPU6050(i2c, INT_PIN)
print('Mean:', detector.mean)
print('StdDev:', detector.stddev)
while True:
print(detector.angle_pitch_output, detector.angle_roll_output)
sleep_ms(500)
Olivier Lenoir
https://gitlab.com/olivierlenoir
https://gitlab.com/olivierlenoir
Re: Detection if device is moving (in motion) or not
Stupid question: That's a hardware interrupt of the MPU6050 sensor connected to the Microcontroller?OlivierLenoir wrote: ↑Wed Jan 12, 2022 7:54 pmFor asyncio, Peter alias pythoncoder is the reference.
I'm coding a driver for MPU6050 using interrupt pin. With the interrupt and setting at 15.625Hz I get all updated data without losing any. I've implemented a complementary filter.
Never found any references how to use this.
Re: Detection if device is moving (in motion) or not
Now everything is awesome! Thanks for the new driver, Olivier, and for asyncio comments, Peter.
This attached code seems to work in asyncio, I have not modified the class at all, and I am OK with initial calibration which does not have to run concurrently with other code. As I restart my micro-controller very rarely, I can wait these few seconds.
Replying last question: you need to connect INT pin on the MPU6050 device to any of digital pins of your micro-controller. In the code it is "Y3" pin.
If you have any suggestions to improve this approach, please share
This attached code seems to work in asyncio, I have not modified the class at all, and I am OK with initial calibration which does not have to run concurrently with other code. As I restart my micro-controller very rarely, I can wait these few seconds.
Replying last question: you need to connect INT pin on the MPU6050 device to any of digital pins of your micro-controller. In the code it is "Y3" pin.
Code: Select all
async def ismoving():
print('Mean:', detector.mean)
print('StdDev:', detector.stddev)
p, r = 0, 0
while True:
olp, olr = p, r
p, r = detector.angle_pitch_output, detector.angle_roll_output
dp, dr = abs(olp - p), abs(olr - r)
#print([p, r])
#print([dp, dr])
if any(a > 0.03 for a in [dp, dr]):
print('You move!')
await asyncio.sleep_ms(300)
if __name__ == '__main__':
#i2c = I2C(0, scl=Pin(SCL), sda=Pin(SDA), freq=400000)
detector = MPU6050(i2c, 'Y3')
asyncio.run(ismoving())
If you have any suggestions to improve this approach, please share