Bluetooth simple connection & parsing of results

mflmartin
Posts: 43
Joined: Sat Jul 23, 2016 7:30 pm

Bluetooth simple connection & parsing of results

Post by mflmartin » Thu Feb 04, 2021 10:19 pm

Hi!

Following the examples, I am able to scan for devices, for instance with this example:

https://github.com/micropython/micropyt ... central.py

But when I want to parse all the addresses or any other data and I get the bytes objects, I cannot find a way to decode them. I have tried several things, but none of them work, and I get:

Code: Select all

addr->  b'd\xb9\xe4]\x8e\xc9'
Traceback (most recent call last):
  File "<stdin>", line 75, in _irq
UnicodeError:

I get this when trying the following (and many other decoding options):

Code: Select all

	
	def _irq(self, event, data):
		if event == _IRQ_SCAN_RESULT:
			addr_type, addr, adv_type, rssi, adv_data = data
			print('addr-> ', addr)
			print('addr decoded-> ', addr.decode())
			
			
Is there any way to do it?

Thanks,

mflmartin

Re: Bluetooth simple connection & parsing of results

Post by mflmartin » Thu Feb 04, 2021 11:28 pm

I found it myself: the simplest solution is to use the 'ignore' flag.

Code: Select all

data.decode('utf-8', 'ignore')

jimmo
Posts: 2262
Joined: Tue Aug 08, 2017 1:57 am
Location: Sydney, Australia

Re: Bluetooth simple connection & parsing of results

Post by jimmo » Fri Feb 05, 2021 12:29 am

mflmartin wrote:
Thu Feb 04, 2021 10:19 pm
But when I want to parse all the addresses or any other data and I get the bytes objects,
The addresses aren't valid ASCII or UTF-8 data, so they cannot be decoded or parsed as such. Just keep them as bytes, i.e. an address is a six-byte string.

The IRQ will give them to you as memoryviews (to avoid copying) but if you want to keep a reference to them you should copy them using bytes().

The advertising payloads are also passed as memoryviews, but you still parse them as byte strings (see ble_advertising.py for some examples).
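
A minimal sketch of why the copy matters, written so that it also runs on plain Python. The reused bytearray only stands in for the stack's internal buffer (an assumption made for illustration), and the names stack_buffer, kept_view and kept_copy are hypothetical.

```python
# Simulated stack-owned buffer that is reused for every scan result.
stack_buffer = bytearray(b'd\xb9\xe4]\x8e\xc9')

addr = memoryview(stack_buffer)  # what the IRQ handler receives
kept_view = addr                 # still points at the shared buffer
kept_copy = bytes(addr)          # independent six-byte copy

# The next scan result overwrites the same buffer in place.
for i, b in enumerate(b'HJ0\x01\xbcR'):
    stack_buffer[i] = b

print(bytes(kept_view))  # contents of the second result
print(kept_copy)         # contents of the first result
```

The memoryview follows whatever is written into the buffer later, while the bytes() copy keeps the address that was seen inside the handler.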

mflmartin

Re: Bluetooth simple connection & parsing of results

Post by mflmartin » Fri Feb 05, 2021 12:33 am

Thanks for the insight,

Nevertheless, I need to find devices by their name (adv_data) or address, which is why I need to turn those bytes into a readable string that I can then compare.

The reason is that I have many devices, each one with a specific name (e.g. ZM00001, ZM00002, etc.). Thus, I need to force the ESP32 to connect to a particular one based on that ID. I am developing a test app for the module to automate connection testing of BLE devices...

If there is another way to get the name data and compare it, I am all ears.

:)

jimmo

Re: Bluetooth simple connection & parsing of results

Post by jimmo » Fri Feb 05, 2021 2:03 am

If you're trying to find a device by address, then you can compare the address to a byte string.

Code: Select all

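my_device = b'HJ0\x01\xbcR'

def _irq(self, event, data):
    if event == _IRQ_SCAN_RESULT:
        addr_type, addr, adv_type, rssi, adv_data = data
        # addr arrives as a memoryview; copy it to bytes before comparing.
        if bytes(addr) == my_device:
            pass  # etc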
If you'd prefer to work with the addresses in hexadecimal representation (more like how MAC addresses are normally displayed), then you can use:

Code: Select all

>>> import binascii
>>> binascii.hexlify(b'HJ0\x01\xbcR')
b'484a3001bc52'
>>> binascii.unhexlify('484a3001bc52')
b'HJ0\x01\xbcR'
Or even if you prefer the colon-separated form:

Code: Select all

>>> binascii.unhexlify('48:4a:30:01:bc:52'.replace(':', ''))
b'HJ0\x01\xbcR'
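
Going the other way, from bytes to the colon-separated form, could look like the sketch below. format_addr and parse_addr are hypothetical helper names, not part of MicroPython or of the examples.

```python
import binascii

def format_addr(addr):
    """Return a colon-separated hex string for an address given as bytes or memoryview."""
    hex_str = binascii.hexlify(bytes(addr)).decode()
    return ':'.join(hex_str[i:i + 2] for i in range(0, len(hex_str), 2))

def parse_addr(text):
    """Turn a colon-separated hex string back into bytes."""
    return binascii.unhexlify(text.replace(':', ''))

print(format_addr(b'HJ0\x01\xbcR'))
print(parse_addr('48:4a:30:01:bc:52'))
```

With these two helpers the address can be shown or configured as text, while the comparison itself still happens on bytes.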
For the device name, there are two ways that a device can report a "name" (and they will not necessarily be the same):
- In the advertising payload, it can include a name field.
- In the Generic Access GATT service, there's a Device Name characteristic.

If you're scanning for devices, then you probably want the former. There's a helper in ble_advertising.py for this that knows how to decode the advertising payload format (a sequence of length- and tag-prefixed records):

Code: Select all

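def _irq(self, event, data):
    if event == _IRQ_SCAN_RESULT:
        addr_type, addr, adv_type, rssi, adv_data = data
        # decode_name() returns the advertised name as a str ('' if there is none).
        if ble_advertising.decode_name(adv_data) == 'my-device':
            pass  # etc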
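
To make the record format concrete, here is a rough sketch of such a decoder. It is modelled on the helper in ble_advertising.py but is not a copy of it. The sample payload is the one that appears commented out later in this thread, and 0x09 is the "Complete Local Name" record type.

```python
_ADV_TYPE_NAME = 0x09  # Complete Local Name

def decode_field(payload, adv_type):
    """Collect the data of every record whose type byte equals adv_type."""
    i = 0
    result = []
    # Each record is: length byte, type byte, then (length - 1) data bytes.
    while i + 1 < len(payload):
        length = payload[i]
        if length == 0:
            break  # zero-length record: nothing more to parse
        if payload[i + 1] == adv_type:
            result.append(bytes(payload[i + 2:i + length + 1]))
        i += 1 + length
    return result

def decode_name(payload):
    """Return the advertised name as a str, or '' if the payload has none."""
    fields = decode_field(payload, _ADV_TYPE_NAME)
    return str(fields[0], 'utf-8') if fields else ''

payload = b'\x02\x01\x06\r\tXBee3 Zigbee'
name = decode_name(payload)
print(name, 'XBEE' in name.upper())
```

Only the name record is turned into a string. The rest of the payload (flags, service UUIDs and so on) stays binary, which is why decoding the whole payload as UTF-8 fails.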

mflmartin

Re: Bluetooth simple connection & parsing of results

Post by mflmartin » Mon Feb 08, 2021 8:29 pm

Thanks for the info. I see it now.

Another question, I am able to connect but when I request a read on a service, I can't read the value that I get.

Code: Select all

		elif event == _IRQ_GATTC_READ_DONE:
			# Read completed (no-op).
			conn_handle, value_handle, status = data
			print('Class: BLECentral, Function: _irq, Line 167', data)
			print('Class: BLECentral, Function: _irq, Line 167', bluetooth.BLE.gatts_read(value_handle))

The first print shows: (0, 65535, 258)

The second print, which calls bluetooth.BLE.gatts_read(value_handle), fails with:

Code: Select all

Traceback (most recent call last):
  File "<stdin>", line 168, in _irq
TypeError: argument has wrong type
Any idea what I am doing wrong? Also, how is the data returned: as bytes that I need to convert to hex, or as hex directly?
Thanks,

jimmo

Re: Bluetooth simple connection & parsing of results

Post by jimmo » Mon Feb 08, 2021 11:25 pm

mflmartin wrote:
Mon Feb 08, 2021 8:29 pm
Another question, I am able to connect but when I request a read on a service, I can't read the value that I get.
You're handling the gattc (client) event, but using the gatts_read (server) function.

In order to do a client read (to a server), you issue gattc_read(conn, handle), then get the data in the READ_RESULT IRQ, then the status in the READ_DONE IRQ.

The reason this is split over two IRQs is because of how the underlying stacks report the events to MicroPython, but also to (in the future) support chunked reads ("long reads").

mflmartin

Re: Bluetooth simple connection & parsing of results

Post by mflmartin » Mon Feb 08, 2021 11:48 pm

Thanks,

I thought that was what I had to do.

I modified it that way, but I get no event in GATTC_READ_RESULT, however I get the IRQ_GATTC_READ_DONE every 3 seconds. And, I think, I am issuing a read on that service every 3 seconds. right?

Code: Select all

import binascii

import bluetooth
import struct
import time
from ble_advertising import decode_services, decode_name
from ble_utils import BluetoothUuid
from micropython import const
import ubinascii

SERVICE = bluetooth.UUID('53da53b9-0447-425a-b9ea-9837505eb59a')
WRITE_C = bluetooth.UUID('7dddca00-3e05-4651-9254-44074792c590')
READ_C = bluetooth.UUID('f9279ee9-2cd0-410c-81cc-adf11e4e5aea')

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)

_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)


class BLECentral:
	def __init__(self, ble):
		self._ble = ble
		self._ble.active(True)
		self._ble.irq(self._irq)
		self._search_name = None
		self._reset()
	
	def _reset(self):
		# Cached name and address from a successful scan.
		self._name = None
		self._addr_type = None
		self._addr = None
		
		# Cached value (if we have one)
		self._value = None
		
		# Callbacks for completion of various operations.
		# These reset back to None after being invoked.
		self._scan_callback = None
		self._conn_callback = None
		self._read_callback = None
		
		# Persistent callback for when new data is notified from the device.
		self._notify_callback = None
		
		# Connected device.
		self._conn_handle = None
		self._start_handle = None
		self._end_handle = None
		self._read_handle = None
		self._write_handle = None
	
	def _irq(self, event, data):
		if event == _IRQ_SCAN_RESULT:
			
			addr_type, addr, adv_type, rssi, adv_data = data
			
			try:
				name = decode_name(adv_data)
			except UnicodeError:
				# Skip payloads whose name field is not valid UTF-8.
				return
			if name:
				# The address buffer is owned by the caller, so copy it.
				self._addr_type = addr_type
				self._addr = bytes(addr)
				self._name = name
				self._ble.gap_scan(None)
			
		elif event == _IRQ_SCAN_DONE:
			if self._scan_callback:
				if self._addr:
					# Found a device during the scan (and the scan was explicitly stopped).
					self._scan_callback(self._addr_type, self._addr, self._name)
					self._scan_callback = None
				else:
					# Scan timed out.
					self._scan_callback(None, None, None)
		
		elif event == _IRQ_PERIPHERAL_CONNECT:
			# Connect successful.
			conn_handle, addr_type, addr = data
			if addr_type == self._addr_type and addr == self._addr:
				self._conn_handle = conn_handle
				self._ble.gattc_discover_services(self._conn_handle)
		
		elif event == _IRQ_PERIPHERAL_DISCONNECT:
			# Disconnect (either initiated by us or the remote end).
			conn_handle, _, _ = data
			if conn_handle == self._conn_handle:
				# If it was initiated by us, it'll already be reset.
				self._reset()
		
		elif event == _IRQ_GATTC_SERVICE_RESULT:
			# Connected device returned a service.
			conn_handle, start_handle, end_handle, uuid = data
			print('Service uuid-> ', uuid)
			if conn_handle == self._conn_handle and uuid == SERVICE:
				self._start_handle, self._end_handle = start_handle, end_handle
		
		elif event == _IRQ_GATTC_SERVICE_DONE:
			# Service query complete.
			if self._start_handle and self._end_handle:
				self._ble.gattc_discover_characteristics(
					self._conn_handle, self._start_handle, self._end_handle
				)
			else:
				print("Failed to find write service.")
		
		elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
			print('Class: BLECentral, Function: _irq, Line 132')
			# Connected device returned a characteristic.
			conn_handle, def_handle, value_handle, properties, uuid = data
			print('uuid-> ', uuid)
			if conn_handle == self._conn_handle and uuid == READ_C:
				self._read_handle = value_handle
				print('_read_handle-> SET')
			
			if conn_handle == self._conn_handle and uuid == WRITE_C:
				self._write_handle = value_handle
				print('_write_handle-> SET')
		
		elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
			# Characteristic query complete.
			if self._read_handle:
				# We've finished connecting and discovering device, fire the connect callback.
				if self._conn_callback:
					self._conn_callback()
			else:
				print("Failed to find read uuid.")
		
		elif event == _IRQ_GATTC_READ_RESULT:
			# A read returned data; the status follows in _IRQ_GATTC_READ_DONE.
			conn_handle, value_handle, char_data = data
			print('_IRQ_GATTC_READ_RESULT-> ', bytes(char_data))
			if conn_handle == self._conn_handle and value_handle == self._read_handle:
				self._update_value(char_data)
				if self._read_callback:
					self._read_callback(self._value)
					self._read_callback = None
		
		elif event == _IRQ_GATTC_READ_DONE:
			# Read completed; status is zero on success.
			conn_handle, value_handle, status = data
			print('_IRQ_GATTC_READ_DONE status-> ', status)
		
		elif event == _IRQ_GATTC_NOTIFY:
			print('_IRQ_GATTC_NOTIFY-> ')
			# The ble_temperature.py demo periodically notifies its value.
			conn_handle, value_handle, notify_data = data
			if conn_handle == self._conn_handle and value_handle == self._read_handle:
				print('notify_data-> ', binascii.hexlify(notify_data))
				self._update_value(notify_data)
				if self._notify_callback:
					self._notify_callback(self._value)
	
	# Returns true if we've successfully connected and discovered characteristics.
	def is_connected(self):
		return self._conn_handle is not None and self._read_handle is not None
	
	# Scan for the first device that advertises a name.
	def scan(self, callback=None):
		self._addr_type = None
		self._addr = None
		self._scan_callback = callback
		self._ble.gap_scan(2000, 30000, 30000)
	
	# Connect to the specified device (otherwise use cached address from a scan).
	def connect(self, addr_type=None, addr=None, callback=None):
		self._addr_type = addr_type or self._addr_type
		self._addr = addr or self._addr
		self._conn_callback = callback
		if self._addr_type is None or self._addr is None:
			return False
		self._ble.gap_connect(self._addr_type, self._addr)
		return True
	
	# Disconnect from current device.
	def disconnect(self):
		if not self._conn_handle:
			return
		self._ble.gap_disconnect(self._conn_handle)
		self._reset()
	
	# Issues an (asynchronous) read, will invoke callback with data.
	def read(self, callback):
		if not self.is_connected():
			return
		self._read_callback = callback
		self._ble.gattc_read(self._conn_handle, self._read_handle)
	
	# Sets a callback to be invoked when the device notifies us.
	def on_notify(self, callback):
		self._notify_callback = callback
	
	def _update_value(self, data):
		# Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
		self._value = struct.unpack("<h", data)[0] / 100
		return self._value
	
	def value(self):
		return self._value


def demo(search_name):
	ble = bluetooth.BLE()
	central = BLECentral(ble)
	central._search_name = search_name
	not_found = False
	
	def on_scan(addr_type, addr, name):
		
		if name and search_name.upper() in name.upper():
			print("Found sensor:", addr_type, addr, name)
			central.connect()
		else:
			nonlocal not_found
			not_found = True
			print("No device found.")
	
	central.scan(callback=on_scan)
	
	# Wait for connection...
	while not central.is_connected():
		time.sleep_ms(100)
		if not_found:
			return
	
	print("Connected")
	
	# Explicitly issue reads, using "print" as the callback.
	while central.is_connected():
		central.read(callback=print)
		time.sleep_ms(3000)
	
	print("Disconnected")


if __name__ == "__main__":
	demo('xbee')
	# data = b'\x02\x01\x06\r\tXBee3 Zigbee'
	# hex_string = ''.join(['{:02x}'.format(b) for b in data])
	# print('lol-> ', data.decode('utf-8', 'ignore'))
	# print('hex_string-> ', hex_string)
	# # bytes_object = bytes.fromhex(hex_string)
	# # ascii_string = bytes_object.decode("ASCII")
	# # print(ascii_string)


jimmo

Re: Bluetooth simple connection & parsing of results

Post by jimmo » Tue Feb 09, 2021 12:09 am

mflmartin wrote:
Mon Feb 08, 2021 11:48 pm
I modified it that way, but I get no event in GATTC_READ_RESULT, however I get the IRQ_GATTC_READ_DONE every 3 seconds. And, I think, I am issuing a read on that service every 3 seconds. right?
That looks right... what status do you get for the READ_DONE?

mflmartin

Re: Bluetooth simple connection & parsing of results

Post by mflmartin » Tue Feb 09, 2021 10:50 am

Hummm: I get Status: 258, but the READ_RESULT never fires.
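
One possible way to read that number, offered as a sketch rather than a diagnosis: the status is implementation-specific, and the code below assumes a NimBLE-style stack (as used on the ESP32 port) that reports ATT protocol errors as 0x100 plus the ATT error code. That offset is an assumption, not something confirmed in this thread. The error names are a small subset of the ATT error codes in the Bluetooth Core Specification, and describe_status is a hypothetical helper.

```python
# Assumption: NimBLE-style status, where ATT errors are reported as 0x100 + code.
_ATT_ERROR_BASE = 0x100

# A few ATT error codes from the Bluetooth Core Specification.
_ATT_ERRORS = {
    0x01: 'Invalid Handle',
    0x02: 'Read Not Permitted',
    0x03: 'Write Not Permitted',
    0x05: 'Insufficient Authentication',
    0x06: 'Request Not Supported',
    0x0A: 'Attribute Not Found',
    0x0F: 'Insufficient Encryption',
}

def describe_status(status):
    """Give a human-readable guess at what a GATTC status value means."""
    if status == 0:
        return 'success'
    if _ATT_ERROR_BASE < status < _ATT_ERROR_BASE + 0x100:
        code = status - _ATT_ERROR_BASE
        return 'ATT error 0x{:02x}: {}'.format(code, _ATT_ERRORS.get(code, 'unknown'))
    return 'stack-specific status {}'.format(status)

print(describe_status(258))
```

If that interpretation holds, it would be worth printing the properties value received in _IRQ_GATTC_CHARACTERISTIC_RESULT for the READ_C characteristic and checking whether the read bit (0x02) is set.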
