blob: 410475042bcdae169add7eba12b6078ad4685de6 [file] [log] [blame]
#
# Copyright (c) 2020 Project CHIP Authors
# Copyright (c) 2019-2020 Google LLC.
# Copyright (c) 2015-2018 Nest Labs, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# @file
# This file contains utilities for CHIP BLE.
#
from __future__ import absolute_import, print_function
from ctypes import Structure, c_bool, c_int32, c_uint16, c_void_p
from .ChipUtility import ChipUtility
# Duplicates of BLE definitions in ChipDeviceController-ScriptBinding.cpp
# Event type codes; each Ble*Event class in this module stores one of these
# in its EventType attribute.
BLE_EVENT_TYPE_RX = 1
BLE_EVENT_TYPE_TX = 2
BLE_EVENT_TYPE_SUBSCRIBE = 3
BLE_EVENT_TYPE_DISCONNECT = 4
# Operation codes carried by BleSubscribeEvent.Operation.
BLE_SUBSCRIBE_OPERATION_SUBSCRIBE = 1
BLE_SUBSCRIBE_OPERATION_UNSUBSCRIBE = 2
# From BleError.h:
BLE_ERROR_REMOTE_DEVICE_DISCONNECTED = 12
# Placeholder value written into the ConnObj field by the from*Event
# constructors of the ctypes structs in this module.
FAKE_CONN_OBJ_VALUE = 12121212
# Number of bytes in service data payload
SERVICE_DATA_LEN = 8
def VoidPtrToUUIDString(ptr, len):
    """Render the bytes behind a void pointer as a dash-separated UUID string.

    The pointer is read through ChipUtility.VoidPtrToByteArray, hexlified with
    ChipUtility.Hexlify, and the hex text is split into groups of 8, 4, 4, 4
    and "the rest" characters joined by "-".

    Args:
        ptr: pointer value handed to ChipUtility.VoidPtrToByteArray.
        len: number of bytes to read from ``ptr`` (the name shadows the
            builtin, but it is part of the existing signature).

    Returns:
        The formatted string, or None if any step of the conversion raised
        (an error line is printed in that case).
    """
    try:
        raw = ChipUtility.VoidPtrToByteArray(ptr, len)
        hex_text = ChipUtility.Hexlify(raw)
        groups = (
            hex_text[:8],
            hex_text[8:12],
            hex_text[12:16],
            hex_text[16:20],
            hex_text[20:],
        )
        return str("-".join(groups))
    except Exception:
        # Best-effort conversion: report and hand back None instead of raising.
        print("ERROR: failed to convert void * to UUID")
        return None
def ParseBleEventType(val):
    """Translate a user-supplied event type into its numeric code.

    Args:
        val: either an int, which is returned unchanged, or a string naming
            the event type; "rx" and "tx" are recognised case-insensitively.

    Returns:
        ``val`` itself when it is an int, otherwise BLE_EVENT_TYPE_RX or
        BLE_EVENT_TYPE_TX.

    Raises:
        Exception: if ``val`` is neither an int nor a recognised name. This
            includes values that have no ``lower()`` method (e.g. None),
            which previously leaked an AttributeError.
    """
    if isinstance(val, int):
        return val
    try:
        name = val.lower()
    except AttributeError:
        # Not string-like: fall through to the uniform "invalid" error below.
        name = None
    if name == "rx":
        return BLE_EVENT_TYPE_RX
    if name == "tx":
        return BLE_EVENT_TYPE_TX
    raise Exception("Invalid Ble Event Type: " + str(val))
class BleTxEvent:
    """Python-side record of a BLE transmit event.

    Attributes set by the constructor: EventType (BLE_EVENT_TYPE_TX),
    ConnObj (None), SvcId, CharId and Status.
    """

    def __init__(self, svcId=None, charId=None, status=False):
        self.EventType = BLE_EVENT_TYPE_TX
        self.ConnObj = None
        self.SvcId = svcId
        self.CharId = charId
        self.Status = status

    def Print(self, prefix=""):
        """Print the event, starting each labelled line with ``prefix``.

        SvcId and CharId are printed (hexlified) only when they are truthy.
        """
        type_label = "TX" if self.EventType == BLE_EVENT_TYPE_TX else "ERROR"
        print("%sBleEvent Type: %s" % (prefix, type_label))
        print("%sStatus: %s" % (prefix, str(self.Status)))
        for header, blob in (("%sSvcId:", self.SvcId), ("%sCharId:", self.CharId)):
            if blob:
                print(header % (prefix))
                print(ChipUtility.Hexlify(blob))

    def SetField(self, name, val):
        """Assign ``val`` to the attribute selected by the case-insensitive ``name``.

        Accepted names: "eventtype" / "event-type" / "type" (value is passed
        through ParseBleEventType), "status", "svcid" and "charid".

        Raises:
            Exception: if ``name`` is not one of the accepted names.
        """
        key = name.lower()
        if key in ("eventtype", "event-type", "type"):
            self.EventType = ParseBleEventType(val)
            return
        target = {"status": "Status", "svcid": "SvcId", "charid": "CharId"}.get(key)
        if target is None:
            raise Exception("Invalid BleTxEvent field: " + str(key))
        setattr(self, target, val)
class BleDisconnectEvent:
    """Python-side record of a BLE disconnect event.

    Attributes set by the constructor: EventType (BLE_EVENT_TYPE_DISCONNECT),
    ConnObj (None) and Error.
    """

    def __init__(self, error=0):
        self.EventType = BLE_EVENT_TYPE_DISCONNECT
        self.ConnObj = None
        self.Error = error

    def Print(self, prefix=""):
        """Print the event type and error, each line starting with ``prefix``."""
        if self.EventType == BLE_EVENT_TYPE_DISCONNECT:
            type_label = "DC"
        else:
            type_label = "ERROR"
        print("%sBleEvent Type: %s" % (prefix, type_label))
        print("%sError: %s" % (prefix, str(self.Error)))

    def SetField(self, name, val):
        """Assign ``val`` to the attribute selected by the case-insensitive ``name``.

        Accepted names: "eventtype" / "event-type" / "type" (value is passed
        through ParseBleEventType) and "error".

        Raises:
            Exception: if ``name`` is not one of the accepted names.
        """
        key = name.lower()
        if key in ("eventtype", "event-type", "type"):
            self.EventType = ParseBleEventType(val)
            return
        if key != "error":
            raise Exception("Invalid BleDisconnectEvent field: " + str(key))
        self.Error = val
class BleRxEvent:
    """Python-side record of a BLE receive event.

    Attributes set by the constructor: EventType (BLE_EVENT_TYPE_RX),
    ConnObj (None), SvcId, CharId and Buffer.
    """

    def __init__(self, svcId=None, charId=None, buffer=None):
        self.EventType = BLE_EVENT_TYPE_RX
        self.ConnObj = None
        self.SvcId = svcId
        self.CharId = charId
        self.Buffer = buffer

    def Print(self, prefix=""):
        """Print the event, starting each labelled line with ``prefix``.

        Buffer, SvcId and CharId are printed (hexlified, in that order) only
        when they are truthy.
        """
        type_label = "RX" if self.EventType == BLE_EVENT_TYPE_RX else "ERROR"
        print("%sBleEvent Type: %s" % (prefix, type_label))
        sections = (
            ("%sBuffer:", self.Buffer),
            ("%sSvcId:", self.SvcId),
            ("%sCharId:", self.CharId),
        )
        for header, blob in sections:
            if blob:
                print(header % (prefix))
                print(ChipUtility.Hexlify(blob))

    def SetField(self, name, val):
        """Assign ``val`` to the attribute selected by the case-insensitive ``name``.

        Accepted names: "eventtype" / "event-type" / "type" (value is passed
        through ParseBleEventType), "buffer", "svcid" and "charid".

        Raises:
            Exception: if ``name`` is not one of the accepted names.
        """
        key = name.lower()
        if key in ("eventtype", "event-type", "type"):
            self.EventType = ParseBleEventType(val)
            return
        target = {"buffer": "Buffer", "svcid": "SvcId", "charid": "CharId"}.get(key)
        if target is None:
            raise Exception("Invalid BleRxEvent field: " + str(key))
        setattr(self, target, val)
class BleSubscribeEvent:
    """Python-side record of a BLE subscribe/unsubscribe event.

    Attributes set by the constructor: EventType (BLE_EVENT_TYPE_SUBSCRIBE),
    ConnObj (None), SvcId, CharId, Status and Operation.
    """

    def __init__(
        self,
        svcId=None,
        charId=None,
        status=True,
        operation=BLE_SUBSCRIBE_OPERATION_SUBSCRIBE,
    ):
        self.EventType = BLE_EVENT_TYPE_SUBSCRIBE
        self.ConnObj = None
        self.SvcId = svcId
        self.CharId = charId
        self.Status = status
        self.Operation = operation

    def Print(self, prefix=""):
        """Print the event, starting each labelled line with ``prefix``.

        Any Operation other than BLE_SUBSCRIBE_OPERATION_UNSUBSCRIBE is shown
        as "SUBSCRIBE". SvcId and CharId are printed (hexlified) only when
        they are truthy.
        """
        if self.EventType == BLE_EVENT_TYPE_SUBSCRIBE:
            type_label = "SUBSCRIBE"
        else:
            type_label = "ERROR"
        print("%sBleEvent Type: %s" % (prefix, type_label))
        print("%sStatus: %s" % (prefix, str(self.Status)))
        if self.Operation == BLE_SUBSCRIBE_OPERATION_UNSUBSCRIBE:
            op_label = "UNSUBSCRIBE"
        else:
            op_label = "SUBSCRIBE"
        print("%sOperation: %s" % (prefix, op_label))
        for header, blob in (("%sSvcId:", self.SvcId), ("%sCharId:", self.CharId)):
            if blob:
                print(header % (prefix))
                print(ChipUtility.Hexlify(blob))

    def SetField(self, name, val):
        """Assign ``val`` to the attribute selected by the case-insensitive ``name``.

        Accepted names: "eventtype" / "event-type" / "type" (value is passed
        through ParseBleEventType), "status", "svcid", "charid" and
        "operation".

        Raises:
            Exception: if ``name`` is not one of the accepted names.
        """
        key = name.lower()
        if key in ("eventtype", "event-type", "type"):
            self.EventType = ParseBleEventType(val)
            return
        plain_fields = {
            "status": "Status",
            "svcid": "SvcId",
            "charid": "CharId",
            "operation": "Operation",
        }
        target = plain_fields.get(key)
        if target is None:
            raise Exception("Invalid BleSubscribeEvent field: " + str(key))
        setattr(self, target, val)
class BleTxEventStruct(Structure):
    """ctypes layout of a BLE transmit event, convertible to/from BleTxEvent."""

    _fields_ = [
        # The type of event.
        ("EventType", c_int32),
        # A handle back to the connection object, or None.
        ("ConnObj", c_void_p),
        # The byte array of the service UUID.
        ("SvcId", c_void_p),
        # The byte array of the characteristic UUID.
        ("CharId", c_void_p),
        # The status of the previous Tx request.
        ("Status", c_bool),
    ]

    def toBleTxEvent(self):
        """Build a BleTxEvent, reading 16 bytes behind each of SvcId and CharId."""
        service = ChipUtility.VoidPtrToByteArray(self.SvcId, 16)
        characteristic = ChipUtility.VoidPtrToByteArray(self.CharId, 16)
        return BleTxEvent(svcId=service, charId=characteristic, status=self.Status)

    @classmethod
    def fromBleTxEvent(cls, bleTxEvent):
        """Build a struct from ``bleTxEvent``.

        ConnObj is always set to FAKE_CONN_OBJ_VALUE; the event's own ConnObj
        is not consulted.
        """
        packed = cls()
        packed.EventType = bleTxEvent.EventType
        packed.ConnObj = c_void_p(FAKE_CONN_OBJ_VALUE)
        packed.SvcId = ChipUtility.ByteArrayToVoidPtr(bleTxEvent.SvcId)
        packed.CharId = ChipUtility.ByteArrayToVoidPtr(bleTxEvent.CharId)
        packed.Status = bleTxEvent.Status
        return packed
class BleDisconnectEventStruct(Structure):
    """ctypes layout of a BLE disconnect event, convertible to/from BleDisconnectEvent."""

    _fields_ = [
        # The type of event.
        ("EventType", c_int32),
        # A handle back to the connection object, or None.
        ("ConnObj", c_void_p),
        # The disconnect error code.
        ("Error", c_int32),
    ]

    def toBleDisconnectEvent(self):
        """Build a BleDisconnectEvent carrying this struct's Error value."""
        error_code = self.Error
        return BleDisconnectEvent(error=error_code)

    @classmethod
    def fromBleDisconnectEvent(cls, bleDisconnectEvent):
        """Build a struct from ``bleDisconnectEvent``.

        ConnObj is always set to FAKE_CONN_OBJ_VALUE; the event's own ConnObj
        is not consulted.
        """
        packed = cls()
        packed.EventType = bleDisconnectEvent.EventType
        packed.ConnObj = c_void_p(FAKE_CONN_OBJ_VALUE)
        packed.Error = bleDisconnectEvent.Error
        return packed
class BleRxEventStruct(Structure):
    """ctypes layout of a BLE receive event, convertible to/from BleRxEvent."""

    _fields_ = [
        # The type of event.
        ("EventType", c_int32),
        # A handle back to the connection object, or None.
        ("ConnObj", c_void_p),
        # The byte array of the service UUID.
        ("SvcId", c_void_p),
        # The byte array of the characteristic UUID.
        ("CharId", c_void_p),
        # The byte array of the Rx packet.
        ("Buffer", c_void_p),
        # The length of the byte array (buffer).
        ("Length", c_uint16),
    ]

    def toBleRxEvent(self):
        """Build a BleRxEvent.

        Reads 16 bytes behind each of SvcId and CharId, and ``Length`` bytes
        behind Buffer.
        """
        service = ChipUtility.VoidPtrToByteArray(self.SvcId, 16)
        characteristic = ChipUtility.VoidPtrToByteArray(self.CharId, 16)
        payload = ChipUtility.VoidPtrToByteArray(self.Buffer, self.Length)
        return BleRxEvent(svcId=service, charId=characteristic, buffer=payload)

    @classmethod
    def fromBleRxEvent(cls, bleRxEvent):
        """Build a struct from ``bleRxEvent``.

        ConnObj is always set to FAKE_CONN_OBJ_VALUE; Length is the length of
        the event's Buffer, or 0 when the Buffer is None.
        """
        packed = cls()
        packed.EventType = bleRxEvent.EventType
        packed.ConnObj = c_void_p(FAKE_CONN_OBJ_VALUE)
        packed.SvcId = ChipUtility.ByteArrayToVoidPtr(bleRxEvent.SvcId)
        packed.CharId = ChipUtility.ByteArrayToVoidPtr(bleRxEvent.CharId)
        packed.Buffer = ChipUtility.ByteArrayToVoidPtr(bleRxEvent.Buffer)
        if bleRxEvent.Buffer is not None:
            packed.Length = len(bleRxEvent.Buffer)
        else:
            packed.Length = 0
        return packed
class BleSubscribeEventStruct(Structure):
    """ctypes layout of a BLE subscribe event, convertible to/from BleSubscribeEvent."""

    _fields_ = [
        # The type of event.
        ("EventType", c_int32),
        # A handle back to the connection object, or None.
        ("ConnObj", c_void_p),
        # The byte array of the service UUID.
        ("SvcId", c_void_p),
        # The byte array of the characteristic UUID.
        ("CharId", c_void_p),
        # The subscribe operation.
        ("Operation", c_int32),
        # The status of the previous Tx request.
        ("Status", c_bool),
    ]

    def toBleSubscribeEvent(self):
        """Build a BleSubscribeEvent, reading 16 bytes behind each of SvcId and CharId."""
        service = ChipUtility.VoidPtrToByteArray(self.SvcId, 16)
        characteristic = ChipUtility.VoidPtrToByteArray(self.CharId, 16)
        return BleSubscribeEvent(
            svcId=service,
            charId=characteristic,
            status=self.Status,
            operation=self.Operation,
        )

    @classmethod
    def fromBleSubscribeEvent(cls, bleSubscribeEvent):
        """Build a struct from ``bleSubscribeEvent``.

        ConnObj is always set to FAKE_CONN_OBJ_VALUE; the event's own ConnObj
        is not consulted.
        """
        packed = cls()
        packed.EventType = bleSubscribeEvent.EventType
        packed.ConnObj = c_void_p(FAKE_CONN_OBJ_VALUE)
        packed.SvcId = ChipUtility.ByteArrayToVoidPtr(bleSubscribeEvent.SvcId)
        packed.CharId = ChipUtility.ByteArrayToVoidPtr(bleSubscribeEvent.CharId)
        packed.Operation = bleSubscribeEvent.Operation
        packed.Status = bleSubscribeEvent.Status
        return packed
class BleDeviceIdentificationInfo:
    """Plain holder for the four values extracted by ParseServiceData.

    Attributes: pairingState, discriminator, vendorId, productId — each is
    stored exactly as passed to the constructor.
    """

    def __init__(self, pairingState, discriminator, vendorId, productId):
        self.pairingState, self.discriminator = pairingState, discriminator
        self.vendorId, self.productId = vendorId, productId
def ParseServiceData(data):
    """Decode a service-data payload into a BleDeviceIdentificationInfo.

    Layout read from ``data`` (multi-byte values are little-endian):
        index 0      -> pairingState
        indices 1-2  -> discriminator
        indices 3-4  -> vendorId
        indices 5-6  -> productId
    The final byte (index 7) is not read.

    Args:
        data: indexable sequence of byte values (e.g. ``bytes``), or None.

    Returns:
        A BleDeviceIdentificationInfo, or None when ``data`` is None or its
        length is not SERVICE_DATA_LEN.
    """
    # A missing payload is handled like a wrong-sized one rather than letting
    # len(None) raise TypeError.
    if data is None or len(data) != SERVICE_DATA_LEN:
        return None
    return BleDeviceIdentificationInfo(
        int(data[0]),
        int.from_bytes(data[1:3], byteorder="little"),
        int.from_bytes(data[3:5], byteorder="little"),
        int.from_bytes(data[5:7], byteorder="little"),
    )