Have started to explore the new PyBridge and how to use as a B4J client of an ESP32 running as BLE server using the B4R library rBLEServer.
Created a simple Python script using Bleak.
This script could run as an external process with jShell and handle bidirectional communication, but that is probably the old way since the new PyBridge has been developed.
Questions
How to use this kind of communication with PyBridge? or is the PyBridge not designed for this?
How to handle sending data or receiving data from the BLEServer?
Any guidance appreciated.
Created a simple Python script using Bleak.
This script could run as an external process with jShell and handle bidirectional communication, but that is probably the old way since the new PyBridge has been developed.
Questions
How to use this kind of communication with PyBridge? or is the PyBridge not designed for this?
How to handle sending data or receiving data from the BLEServer?
Any guidance appreciated.
B4X:
import asyncio
import argparse
from bleak import BleakScanner, BleakClient
import struct
ESP32_NAME = "BLEServer"
ESP32_MAC = "30:C9:22:D1:80:2E"
SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" # UART Service UUID
RX_CHARACTERISTIC_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" # RX Notify & Write Characteristic
async def scan_and_parse():
def callback(device, advertisement_data):
if device.name == ESP32_NAME or device.address == ESP32_MAC:
service_data = advertisement_data.service_data.get(SERVICE_UUID)
if service_data and len(service_data) >= 4:
int1, int2 = struct.unpack("hh", service_data[:4])
print(f"Received: {int1}, {int2}")
scanner = BleakScanner(callback)
await scanner.start()
await asyncio.sleep(10) # Scan for 10 seconds
await scanner.stop()
async def connect():
client = BleakClient(ESP32_MAC)
await client.connect()
if client.is_connected:
print("Connected to ESP32")
return client
async def write(client, command: bytes, commanddata: bytes):
if client.is_connected:
await client.write_gatt_char(RX_CHARACTERISTIC_UUID, command + commanddata)
print(f"Wrote command: {command.hex()} data: {commanddata.hex()}")
async def main(command: bytes, commanddata: bytes):
await scan_and_parse()
client = await connect()
await write(client, command, commanddata)
await client.disconnect()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Send command to ESP32 BLE server")
parser.add_argument("-c", "--command", required=True, type=lambda x: bytes.fromhex(x), help="Command in hex format (e.g., 01)")
parser.add_argument("-d", "--commanddata", required=True, type=lambda x: bytes.fromhex(x), help="Command data in hex format (e.g., 02)")
args = parser.parse_args()
asyncio.run(main(args.command, args.commanddata))