(Also the pins in the code listed below aren't hooked up to anything, so please don't mind the empty packets I'm sending using the notify)
I've tested the below code by doing:
Code: Select all
rshell
connect serial /dev/tty.usbserial-0001
cp ./* /pyboard/
repl
MicroPython v1.15 on 2021-04-18; ESP32 module with ESP32
Type "help()" for more information.
>>> import main
ESP32 output:
Code: Select all
awaiting connection...
setting connected flag...
connected...
notifying...
....
notifying...
clearing connected flag...
disconnected...
awaiting connection...
Code: Select all
nRF Connect, 2021-04-20
OwO (F0:08:D1:D2:2F:2A)
I 07:54:02.662 [Server] Server started
I 07:54:03.441 [Server] Device with address F0:08:D1:D2:2F:2A connected
I 07:54:03.451 Connected to F0:08:D1:D2:2F:2A
I 07:54:04.118 Connection parameters updated (interval: 7.5ms, latency: 0, timeout: 5000ms)
I 07:54:04.205 Services discovered
I 07:54:04.313 Connection parameters updated (interval: 45.0ms, latency: 0, timeout: 5000ms)
I 07:54:04.380 Notification received from 00002a5b-0000-1000-8000-00805f9b34fb, value: (0x) 03-00-00-00-00-00-00-00-00-00-00-00-00
......
I 07:54:16.736 Notification received from 00002a5b-0000-1000-8000-00805f9b34fb, value: (0x) 03-00-00-00-00-00-00-00-00-00-00-00-00
E 07:54:18.719 Error 22 (0x16): GATT CONN TERMINATE LOCAL HOST
I 07:54:18.719 Disconnected
I 07:54:18.720 [Server] Device disconnected
Trying to reconnect to my ESP32 using the nRF Connect Android app when prevouisly having disconnecting using said app
ESP32 output:
Doesn't change in regards to the output listed above. Why?
Which is totally not what I would have expected, I would have expected it to start printing out "notiyfing..." again.
Exiting the REPL, and then reconnecting to the REPL using rshell, causes the ESP32 to lock-up, not being able to connect to the REPL.
nRF Connect output:
Doesn't change in regards to the output listed above. Why?
CODE
main.py
Code: Select all
from bluetooth.csc_device import CSCDevice
import uasyncio
from machine import Pin
def set_global_exception():
def handle_exception(loop, context):
import sys
sys.print_exception(context["exception"])
sys.exit()
loop = uasyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
async def notifier(csc):
while True:
print("awaiting connection...")
await csc.connected.wait()
print("connected...")
while csc.connected.is_set():
print("notifying...")
csc.notify()
await uasyncio.sleep_ms(100)
print("disconnected...")
async def main():
set_global_exception()
csc = CSCDevice("OwO", Pin(13), Pin(14))
await uasyncio.create_task(notifier(csc))
uasyncio.run(main()))
Code: Select all
# message.py
# Now uses ThreadSafeFlag for efficiency
# Copyright (c) 2018-2021 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# Usage:
# from primitives.message import Message
try:
import uasyncio as asyncio
except ImportError:
import asyncio
# A coro waiting on a message issues await message
# A coro or hard/soft ISR raising the message issues.set(payload)
# .clear() should be issued by at least one waiting task and before
# next event.
class Message(asyncio.ThreadSafeFlag):
def __init__(self, _=0): # Arg: poll interval. Compatibility with old code.
self._evt = asyncio.Event()
self._data = None # Message
self._state = False # Ensure only one task waits on ThreadSafeFlag
self._is_set = False # For .is_set()
super().__init__()
def clear(self): # At least one task must call clear when scheduled
self._state = False
self._is_set = False
def __iter__(self):
yield from self.wait()
return self._data
async def wait(self):
if self._state: # A task waits on ThreadSafeFlag
await self._evt.wait() # Wait on event
else: # First task to wait
self._state = True
# Ensure other tasks see updated ._state before they wait
await asyncio.sleep_ms(0)
await super().wait() # Wait on ThreadSafeFlag
self._evt.set()
self._evt.clear()
return self._data
def set(self, data=None): # Can be called from a hard ISR
self._data = data
self._is_set = True
super().set()
def is_set(self):
return self._is_set
def value(self):
return self._data
Code: Select all
from micropython import const
from machine import Pin
from bluetooth.advertisement import Advertisement
from threading.message import Message
import uasyncio
import ubluetooth
from utime import ticks_us
from utime import ticks_diff
__IRQ_CENTRAL_CONNECT = 0x01
__IRQ_CENTRAL_DISCONNECT = 0x02
__CSC_SERVICE_UUID = ubluetooth.UUID(0x1816)
__CSC_MEASUREMENT_CHAR_UUID = (
ubluetooth.UUID(0x2A5B),
ubluetooth.FLAG_NOTIFY,
)
__CSC_FEATURE_CHAR_UUID = (
ubluetooth.UUID(0x2A5C),
ubluetooth.FLAG_READ,
)
__CSC_SERVICE = (
__CSC_SERVICE_UUID,
(__CSC_MEASUREMENT_CHAR_UUID, __CSC_FEATURE_CHAR_UUID),
)
class CSCDevice:
def __init__(self, name, wheel_pin, crank_pin):
self.__ble = ubluetooth.BLE()
self.__ble.active(True)
((self.__measurement, self.__feature),) = self.__ble.gatts_register_services((__CSC_SERVICE,))
advertisement = Advertisement(
name=name,
services=[__CSC_SERVICE_UUID]
)
self.__ble.gap_advertise(
150000,
adv_data=advertisement.byte_array()
)
self.__connection = None
self.connected = Message()
self.__notify_task = None
self.__wheel_revolutions = 0
self.__ticks_last_wheel_event = None
self.__crank_revolutions = 0
self.__ticks_last_crank_event = None
self.__ble.irq(self.__on_bluetooth_event)
wheel_pin.irq(self.__on_wheel_rotation, Pin.IRQ_FALLING)
crank_pin.irq(self.__on_crank_rotation, Pin.IRQ_FALLING)
def __on_bluetooth_event(self, event, data):
if event == __IRQ_CENTRAL_CONNECT:
connection, addr_type, addr = data
if self.__connection is None:
self.__connection = connection
self.__reset()
print("setting connected flag...")
self.connected.set()
elif event == __IRQ_CENTRAL_DISCONNECT:
connection, addr_type, addr = data
if self.__connection == connection:
self.__connection = None
self.__reset()
print("clearing connected flag...")
self.connected.clear()
def __on_wheel_rotation(_):
if self.__connection is not None:
self.__wheel_revolutions += 1
self.__ticks_last_wheel_event = ticks_us()
def __on_crank_rotation(_):
if self.__connection is not None:
self.__crank_revolutions += 1
self.__ticks_last_crank_event = ticks_us()
def __reset(self):
self.__wheel_revolutions = 0
self.__ticks_last_wheel_event = None
self.__crank_revolutions = 0
self.__ticks_last_crank_event = None
def notify(self):
now = ticks_us()
csc_flags = int(0x03).to_bytes(1, "big")
csc_cummalative_wheel_revolutions = self.__wheel_revolutions.to_bytes(4, "big")
csc_wheel_ticks_since_last_event = int(0).to_bytes(2, "big")
csc_cummalative_crank_revolutions = self.__crank_revolutions.to_bytes(4, "big")
csc_crank_ticks_since_last_event = int(0).to_bytes(2, "big")
if self.__ticks_last_wheel_event is not None:
csc_wheel_ticks_since_last_event = int(ticks_diff(now, self.__ticks_last_wheel_event) / 976.5625)
if self.__ticks_last_crank_event is not None:
csc_crank_ticks_since_last_event = int(ticks_diff(now, self.__ticks_last_crank_event) / 976.5625)
data = csc_flags + csc_cummalative_wheel_revolutions + csc_wheel_ticks_since_last_event + csc_cummalative_crank_revolutions + csc_crank_ticks_since_last_event
self.__ble.gatts_notify(
self.__connection,
self.__measurement,
data
)