CorryWestSide
Member
Hi, I was given this code by a company and I have to implement it in my B4A app, but I really don't know how to convert it to Java or B4A.
Can I get some help, please? I have been trying for two weeks already without any result.
PythonCode:
import asyncio
import crcmod
from crcmod.predefined import *
import datetime
from bleak import BleakClient
from bleak.backends.characteristic import BleakGATTCharacteristic
import subprocess
import sys
# GATT service to connect to
SERVICE_UUID = "00005301-0000-0041-4c50-574953450000"
# Characteristic that outgoing frames are written to
CHARACTERISTIC_UUID_WRITE = "00005302-0000-0041-4c50-574953450000"
# Characteristic that incoming notifications are read from
CHARACTERISTIC_UUID_NOTIFY = "00005303-0000-0041-4c50-574953450000"

# Canned acknowledgement frames, stored as lists of byte values
STD_ACK = list(bytes.fromhex(
    "05 01 40 01 00 01 b8 55 "
    "81 09 40 01 02 01 46 b0 "
    "77 e2 00 01 01 00 00 00 00 f0 72"
))
STD2_ACK = list(bytes.fromhex("05 01 40 02 00 01 48 55"))

# End-of-session frame: bytes placed before the block index ...
END_SESSION_PRE = list(bytes.fromhex("81 02 40"))
# ... bytes placed after the block index ...
END_SESSION_POST1 = list(bytes.fromhex("03 01"))
# ... and the fixed frame sent right after it
END_SESSION_POST2 = list(bytes.fromhex("77 ff 67 b0"))

# Running block index (one byte, shared with main())
index = 0
# Main procedure
def _to_bcd(value: int) -> int:
    """Pack a number in the range 0-99 into one BCD byte (e.g. 25 -> 0x25)."""
    tens, units = divmod(value, 10)
    return (tens << 4) | units


def _split_chunks(data, size: int) -> list:
    """Split *data* into consecutive slices of at most *size* items."""
    return [data[pos:pos + size] for pos in range(0, len(data), size)]


async def main(device_address):
    """Connect to *device_address* over BLE and run one EVA-DTS read session.

    The sequence is: subscribe to notifications, send the START frame twice,
    send the WhoAreYou message (with the current local time in BCD), send the
    canned acknowledgements, then acknowledge every incoming data block until
    the device signals the last one.

    Args:
        device_address: Address of the BLE device, passed to ``BleakClient``.

    Raises:
        RuntimeError: If the write characteristic is not found on the device.
    """
    global index

    # CRC-16 configured according to the DDCMP specification.
    crc_fun = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0x0000, xorOut=0x0000)
    # Notifications are ignored until the session reaches the read phase.
    start_read_block = False

    def with_crc(frame):
        """Return *frame* as a list followed by its CRC-16, low byte first.

        Exactly two CRC bytes are appended, matching the hard-coded frames
        (e.g. STD2_ACK ends in 0x48, 0x55 for a CRC of 0x5548).
        """
        crc = crc_fun(bytes(frame))
        return list(frame) + list(crc.to_bytes(2, "little"))

    async def send(frame):
        """Write *frame* to the write characteristic without response."""
        await client.write_gatt_char(CHARACTERISTIC_UUID_WRITE, bytes(frame), response=False)

    async def read_data_block(data):
        """Acknowledge one received data block and close the session on the last."""
        global index
        # Too-short notifications and non-data frames (first byte != 0x81)
        # are not acknowledged.
        if len(data) < 3 or data[0] != 0x81:
            await asyncio.sleep(1)
            return

        ack = with_crc([0x05, 0x01, 0x40, index, 0x00, 0x01])
        # Check whether this is the last data block.
        if data[2] != 0x40:
            print("Ultimo blocco...")
            # Reply with an acknowledgement.
            await send(ack)
            print("Inviato: Rispondi con un'acknowledgement ", ack)
            # Close the session.
            end_session = with_crc(END_SESSION_PRE + [index] + END_SESSION_POST1)
            await send(end_session)
            print("Inviato: Termina il blocco endSession ", end_session)
            await send(END_SESSION_POST2)
            print("Inviato: Termina il blocco endSession2 ", END_SESSION_POST2)
        else:
            print("Leggi EVA-DTS audit...")
            # Acknowledge the block using the current sequence index.
            await send(ack)
            print("Inviato: Rispondi con un'acknowledgement per il blocco con l'ID di sequenza ", ack)
            # The index is a single byte and wraps from 255 back to 0.
            index = (index + 1) % 256
            print("Indice", index)
        await asyncio.sleep(1)

    async def handle_rx(_: BleakGATTCharacteristic, data: bytearray):
        """Notification callback: log the payload and process it once reading started."""
        print("Ricevuto HandleRX: ", data.hex())
        if start_read_block:
            await read_data_block(data)

    async with BleakClient(device_address) as client:
        print(f"Connesso a {device_address}")
        start = [0x05, 0x06, 0x40, 0x00, 0x08, 0x01, 0x5B, 0x95]
        uw = client.services.get_characteristic(CHARACTERISTIC_UUID_WRITE)
        if uw is None:
            raise RuntimeError(
                f"Write characteristic {CHARACTERISTIC_UUID_WRITE} not found on {device_address}"
            )
        # Chunk size derived from the characteristic; kept at the original
        # "limit minus 3" but never allowed to drop below one byte.
        chunk_size = max(1, uw.max_write_without_response_size - 3)
        index = 0
        await client.start_notify(CHARACTERISTIC_UUID_NOTIFY, handle_rx)
        await asyncio.sleep(1)

        # The first START message is always ignored by the target board.
        await send(start)
        await asyncio.sleep(1)
        print("Inviato Primo Messaggio START ", start)
        # Repeat START.
        await send(start)
        await asyncio.sleep(1)
        print("Inviato Secondo Messaggio START ", start)

        # WhoAreYou: the timestamp uses BCD format (day, month, year, h, m, s).
        now = datetime.datetime.now()
        timestamp = [
            _to_bcd(now.day), _to_bcd(now.month), _to_bcd(now.year % 100),
            _to_bcd(now.hour), _to_bcd(now.minute), _to_bcd(now.second),
        ]
        pre_who = [0x81, 0x10, 0x40, 0x00, 0x01, 0x01, 0x0a, 0x42, 0x77, 0xe0, 0x00, 0x00, 0x00, 0x00, 0x00]
        post_who = [0x00, 0x00, 0x0c]
        who_are_you_msg = with_crc(pre_who + timestamp + post_who)
        for ch in _split_chunks(who_are_you_msg, chunk_size):
            await send(ch)
            print("Inviato I Chunk del messaggio WhoAreYou: ", ch)
        await asyncio.sleep(1)

        # Send ACK.
        for ch2 in _split_chunks(STD_ACK, chunk_size):
            await send(ch2)
            print("Inviato Ack", ch2)
        await asyncio.sleep(2)

        # Read EVA-DTS: send the ACK of the first EVA-DTS block.
        await send(STD2_ACK)
        print("Inviato Leggi EVA-DTS ", STD2_ACK)
        index = 3
        start_read_block = True
        # Leave time for the data blocks to arrive and be acknowledged.
        await asyncio.sleep(5)
        await client.stop_notify(CHARACTERISTIC_UUID_NOTIFY)
    print("Disconnesso da ", device_address)
if __name__ == "__main__":
    # Address of the device to connect to. Pass it as the first command-line
    # argument, or edit the default below.
    device_address = sys.argv[1] if len(sys.argv) > 1 else "00:A0:50:43:89:DD"
    # Run the session against that device.
    asyncio.run(main(device_address))