I thought that was what I had to do.
I modified it that way, but I get no event in GATTC_READ_RESULT, however I get the IRQ_GATTC_READ_DONE every 3 seconds. And, I think, I am issuing a read on that service every 3 seconds. right?
Code: Select all
import binascii
import bluetooth
import struct
import time
from ble_advertising import decode_services, decode_name
from ble_utils import BluetoothUuid
from micropython import const
import ubinascii
SERVICE = bluetooth.UUID('53da53b9-0447-425a-b9ea-9837505eb59a')
WRITE_C = bluetooth.UUID('7dddca00-3e05-4651-9254-44074792c590')
READ_C = bluetooth.UUID('f9279ee9-2cd0-410c-81cc-adf11e4e5aea')
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)
class BLECentral:
def __init__(self, ble):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._search_name = None
self._reset()
def _reset(self):
# Cached name and address from a successful scan.
self._name = None
self._addr_type = None
self._addr = None
# Cached value (if we have one)
self._value = None
# Callbacks for completion of various operations.
# These reset back to None after being invoked.
self._scan_callback = None
self._conn_callback = None
self._read_callback = None
# Persistent callback for when new data is notified from the device.
self._notify_callback = None
# Connected device.
self._conn_handle = None
self._start_handle = None
self._end_handle = None
self._read_handle = None
def _irq(self, event, data):
if event == _IRQ_SCAN_RESULT:
addr_type, addr, adv_type, rssi, adv_data = data
try:
name = decode_name(adv_data)
if name:
self._addr_type = addr_type
self._addr = bytes(
addr
) # Note: address buffer is owned by caller so need to copy it.
self._name = decode_name(adv_data) or "?"
self._ble.gap_scan(None)
except UnicodeError as e:
return
elif event == _IRQ_SCAN_DONE:
if self._scan_callback:
if self._addr:
# Found a device during the scan (and the scan was explicitly stopped).
self._scan_callback(self._addr_type, self._addr, self._name)
self._scan_callback = None
else:
# Scan timed out.
self._scan_callback(None, None, None)
elif event == _IRQ_PERIPHERAL_CONNECT:
# Connect successful.
conn_handle, addr_type, addr = data
if addr_type == self._addr_type and addr == self._addr:
self._conn_handle = conn_handle
self._ble.gattc_discover_services(self._conn_handle)
elif event == _IRQ_PERIPHERAL_DISCONNECT:
# Disconnect (either initiated by us or the remote end).
conn_handle, _, _ = data
if conn_handle == self._conn_handle:
# If it was initiated by us, it'll already be reset.
self._reset()
elif event == _IRQ_GATTC_SERVICE_RESULT:
# Connected device returned a service.
conn_handle, start_handle, end_handle, uuid = data
print('Service uuid-> ', uuid)
if conn_handle == self._conn_handle and uuid == SERVICE:
self._start_handle, self._end_handle = start_handle, end_handle
elif event == _IRQ_GATTC_SERVICE_DONE:
# Service query complete.
if self._start_handle and self._end_handle:
self._ble.gattc_discover_characteristics(
self._conn_handle, self._start_handle, self._end_handle
)
else:
print("Failed to find write service.")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
print('Class: BLECentral, Function: _irq, Line 132')
# Connected device returned a characteristic.
conn_handle, def_handle, value_handle, properties, uuid = data
print('uuid-> ', uuid)
if conn_handle == self._conn_handle and uuid == READ_C:
self._read_handle = value_handle
print('_read_handle-> SET')
if conn_handle == self._conn_handle and uuid == WRITE_C:
self._write_handle = value_handle
print('_write_handle-> SET')
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
# Characteristic query complete.
if self._read_handle:
# We've finished connecting and discovering device, fire the connect callback.
if self._conn_callback:
self._conn_callback()
else:
print("Failed to find read uuid.")
elif event == _IRQ_GATTC_READ_RESULT:
# A read completed successfully.
print('_IRQ_GATTC_READ_RESULT-> ', _IRQ_GATTC_READ_RESULT)
conn_handle, value_handle, char_data = data
print('Class: BLECentral, Function: _irq, Line 167', self._ble.gattc_read(value_handle))
if conn_handle == self._conn_handle and value_handle == self._read_handle:
self._update_value(char_data)
if self._read_callback:
self._read_callback(self._value)
self._read_callback = None
elif event == _IRQ_GATTC_READ_DONE:
# Read completed (no-op).
conn_handle, value_handle, status = data
elif event == _IRQ_GATTC_NOTIFY:
print('_IRQ_GATTC_NOTIFY-> ')
# The ble_temperature.py demo periodically notifies its value.
conn_handle, value_handle, notify_data = data
if conn_handle == self._conn_handle and value_handle == self._read_handle:
print('notify_data-> ', binascii.unhexlify(notify_data))
self._update_value(notify_data)
if self._notify_callback:
self._notify_callback(self._value)
# Returns true if we've successfully connected and discovered characteristics.
def is_connected(self):
return self._conn_handle is not None and self._read_handle is not None
# Find a device advertising the environmental sensor service.
def scan(self, callback=None):
self._addr_type = None
self._addr = None
self._scan_callback = callback
self._ble.gap_scan(2000, 30000, 30000)
# Connect to the specified device (otherwise use cached address from a scan).
def connect(self, addr_type=None, addr=None, callback=None):
self._addr_type = addr_type or self._addr_type
self._addr = addr or self._addr
self._conn_callback = callback
if self._addr_type is None or self._addr is None:
return False
self._ble.gap_connect(self._addr_type, self._addr)
return True
# Disconnect from current device.
def disconnect(self):
if not self._conn_handle:
return
self._ble.gap_disconnect(self._conn_handle)
self._reset()
# Issues an (asynchronous) read, will invoke callback with data.
def read(self, callback):
if not self.is_connected():
return
self._read_callback = callback
self._ble.gattc_read(self._conn_handle, self._read_handle)
# Sets a callback to be invoked when the device notifies us.
def on_notify(self, callback):
self._notify_callback = callback
def _update_value(self, data):
# Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
self._value = struct.unpack("<h", data)[0] / 100
return self._value
def value(self):
return self._value
def demo(search_name):
ble = bluetooth.BLE()
central = BLECentral(ble)
central._search_name = search_name
not_found = False
def on_scan(addr_type, addr, name):
if name and search_name.upper() in name.upper():
print("Found sensor:", addr_type, addr, name)
central.connect()
else:
nonlocal not_found
not_found = True
print("No device found.")
central.scan(callback=on_scan)
# Wait for connection...
while not central.is_connected():
time.sleep_ms(100)
if not_found:
return
print("Connected")
# Explicitly issue reads, using "print" as the callback.
while central.is_connected():
central.read(callback=print)
time.sleep_ms(3000)
print("Disconnected")
if __name__ == "__main__":
demo('xbee')
# data = b'\x02\x01\x06\r\tXBee3 Zigbee'
# hex_string = ''.join(['{:02x}'.format(b) for b in data])
# print('lol-> ', data.decode('utf-8', 'ignore'))
# print('hex_string-> ', hex_string)
# # bytes_object = bytes.fromhex(hex_string)
# # ascii_string = bytes_object.decode("ASCII")
# # print(ascii_string)