blob: 152b76dcbae3d8e5fe73553f339b970fcb7b0510 [file] [log] [blame]
#
# Copyright (c) 2020-2022 Project CHIP Authors
# Copyright (c) 2019-2020 Google, LLC.
# Copyright (c) 2013-2018 Nest Labs, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# @file
# Python interface for Chip Device Manager
#
"""Chip Device Controller interface
"""
from __future__ import absolute_import
from __future__ import print_function
import asyncio
from ctypes import *
from .ChipStack import *
from .interaction_model import InteractionModelError, delegate as im
from .exceptions import *
from .clusters import Command as ClusterCommand
from .clusters import Attribute as ClusterAttribute
from .clusters import ClusterObjects as ClusterObjects
from .clusters import Objects as GeneratedObjects
from .clusters.CHIPClusters import *
import enum
import threading
import typing
import builtins
import ipdb
import ctypes
import copy
__all__ = ["ChipDeviceController"]
# ctypes prototypes for the Python callbacks handed to the native library.
# In each CFUNCTYPE call the leading None is the (void) return type and the
# remaining entries are the argument types.
#
# Key-exchange complete: a single c_uint32, received by the handler as `err`.
_DevicePairingDelegate_OnPairingCompleteFunct = CFUNCTYPE(None, c_uint32)
# Commissioning complete: (c_uint64, c_uint32), received as (nodeid, err).
_DevicePairingDelegate_OnCommissioningCompleteFunct = CFUNCTYPE(
None, c_uint64, c_uint32)
# Address update complete: (c_uint64, c_uint32), received as (nodeid, err).
_DeviceAddressUpdateDelegate_OnUpdateComplete = CFUNCTYPE(
None, c_uint64, c_uint32)
# void (*)(Device *, CHIP_ERROR).
#
# CHIP_ERROR is actually signed, so using c_uint32 is weird, but everything
# else seems to do it.
_DeviceAvailableFunct = CFUNCTYPE(None, c_void_p, c_uint32)
# This is a fix for WEAV-429. Jay Logue recommends revisiting this at a later
# date to allow for truly multiple instances so this is temporary.
def _singleton(cls):
    """Wrap *cls* in a factory that builds it once and afterwards returns that same object.

    The first call forwards its arguments to ``cls``. While the cached value
    is not None, later calls ignore their arguments and return the cached
    value.
    """
    cached = None

    def wrapper(*args, **kwargs):
        nonlocal cached
        if cached is None:
            cached = cls(*args, **kwargs)
        return cached

    return wrapper
# Values assigned to a controller's `state` attribute; being an IntEnum, each
# member also compares equal to its integer value.
DCState = enum.IntEnum(
    "DCState",
    [
        ("NOT_INITIALIZED", 0),
        ("IDLE", 1),
        ("BLE_READY", 2),
        ("RENDEZVOUS_ONGOING", 3),
        ("RENDEZVOUS_CONNECTED", 4),
    ],
)
class ChipDeviceController():
activeList = set()
def __init__(self, opCredsContext: ctypes.c_void_p, fabricId: int, fabricIndex: int, nodeId: int, useTestCommissioner: bool = False):
    ''' Allocate a native device controller and register its completion callbacks.

        opCredsContext: value wrapped in a c_void_p and handed to
            pychip_OpCreds_AllocateController.
        fabricId, fabricIndex, nodeId, useTestCommissioner: forwarded
            unchanged to that call.

        Raises the exception built by ChipStack.ErrorToException when the
        allocation returns a non-zero result.
    '''
    # Assigned before anything can fail: Shutdown() (and therefore __del__)
    # reads this flag even when construction is aborted further down.
    self.isActive = False
    self.state = DCState.NOT_INITIALIZED
    self.devCtrl = None
    self._ChipStack = builtins.chipStack
    self._dmLib = None

    self._InitLib()

    devCtrl = c_void_p(None)
    # NOTE(review): _InitLib shows no argtypes for this entry point, so the
    # plain Python ints below go through ctypes' default integer conversion
    # -- confirm that is wide enough for fabricId / nodeId.
    res = self._ChipStack.Call(
        lambda: self._dmLib.pychip_OpCreds_AllocateController(ctypes.c_void_p(
            opCredsContext), pointer(devCtrl), fabricIndex, fabricId, nodeId, useTestCommissioner)
    )

    if res != 0:
        raise self._ChipStack.ErrorToException(res)

    self.devCtrl = devCtrl

    self._Cluster = ChipClusters(builtins.chipStack)
    self._Cluster.InitLib(self._dmLib)

    def HandleKeyExchangeComplete(err):
        if err != 0:
            print("Failed to establish secure session to device: {}".format(err))
            self._ChipStack.callbackRes = self._ChipStack.ErrorToException(
                err)
        else:
            print("Established CASE with Device")
        self.state = DCState.IDLE
        self._ChipStack.completeEvent.set()

    def HandleAddressUpdateComplete(nodeid, err):
        if err != 0:
            print("Failed to update node address: {}".format(err))
            # Failed update address, don't wait for HandleCommissioningComplete
            self.state = DCState.IDLE
            self._ChipStack.callbackRes = err
            self._ChipStack.completeEvent.set()
        else:
            print("Node address has been updated")
            # Wait for HandleCommissioningComplete before setting
            # self._ChipStack.callbackRes; we're not done until that happens.

    def HandleCommissioningComplete(nodeid, err):
        if err != 0:
            print("Failed to commission: {}".format(err))
        else:
            print("Commissioning complete")
        self.state = DCState.IDLE
        self._ChipStack.callbackRes = err
        # Publish the result before signalling: a thread woken by the event
        # reads commissioningEventRes immediately afterwards.
        self._ChipStack.commissioningEventRes = err
        self._ChipStack.completeEvent.set()
        self._ChipStack.commissioningCompleteEvent.set()

    # The ctypes callback wrappers are stored on self so that they are not
    # garbage collected while the native side still holds them.
    self.cbHandleKeyExchangeCompleteFunct = _DevicePairingDelegate_OnPairingCompleteFunct(
        HandleKeyExchangeComplete)
    self._dmLib.pychip_ScriptDevicePairingDelegate_SetKeyExchangeCallback(
        self.devCtrl, self.cbHandleKeyExchangeCompleteFunct)

    self.cbHandleCommissioningCompleteFunct = _DevicePairingDelegate_OnCommissioningCompleteFunct(
        HandleCommissioningComplete)
    self._dmLib.pychip_ScriptDevicePairingDelegate_SetCommissioningCompleteCallback(
        self.devCtrl, self.cbHandleCommissioningCompleteFunct)

    self.cbOnAddressUpdateComplete = _DeviceAddressUpdateDelegate_OnUpdateComplete(
        HandleAddressUpdateComplete)
    self._dmLib.pychip_ScriptDeviceAddressUpdateDelegate_SetOnAddressUpdateComplete(
        self.cbOnAddressUpdateComplete)

    self.state = DCState.IDLE
    self.isActive = True

    ChipDeviceController.activeList.add(self)
def Shutdown(self):
''' Shuts down this controller and reclaims any used resources, including the bound
C++ constructor instance in the SDK.
'''
if (self.isActive):
if self.devCtrl != None:
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_DeleteDeviceController(
self.devCtrl)
)
self.devCtrl = None
ChipDeviceController.activeList.remove(self)
self.isActive = False
def ShutdownAll():
''' Shut down all active controllers and reclaim any used resources.
'''
#
# We want a shallow copy here since it would other create new instances
# of the controllers in the list.
#
# We need a copy since we're going to walk through the list and shutdown
# each controller, which in turn, will remove themselves from the active list.
#
# We cannot do that while iterating through the original list.
#
activeList = copy.copy(ChipDeviceController.activeList)
for controller in activeList:
controller.Shutdown()
ChipDeviceController.activeList.clear()
def CheckIsActive(self):
if (not self.isActive):
raise RuntimeError(
"DeviceCtrl instance was already shutdown previously!")
def __del__(self):
    ''' Finalizer: hand the object to Shutdown() when it is being destroyed.
    '''
    self.Shutdown()
def IsConnected(self):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_IsConnected(
self.devCtrl)
)
def ConnectBle(self, bleConnection):
self.CheckIsActive()
self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_ValidateBTP(
self.devCtrl,
bleConnection,
self._ChipStack.cbHandleComplete,
self._ChipStack.cbHandleError,
)
)
def ConnectBLE(self, discriminator, setupPinCode, nodeid):
    ''' Start pychip_DeviceController_ConnectBLE and wait for commissioning to complete.

        discriminator, setupPinCode, nodeid: forwarded to the native call.

        Returns True only when the commissioning-complete event fired with a
        result of 0. Returns False for a non-zero result, or when the event
        is still unset after the extra 5 second wait that follows CallAsync.
    '''
    self.CheckIsActive()

    self.state = DCState.RENDEZVOUS_ONGOING
    # Drop any completion left over from an earlier attempt; otherwise the
    # checks below would report that stale result (CommissionIP clears the
    # event in the same way).
    self._ChipStack.commissioningCompleteEvent.clear()

    self._ChipStack.CallAsync(
        lambda: self._dmLib.pychip_DeviceController_ConnectBLE(
            self.devCtrl, discriminator, setupPinCode, nodeid)
    )

    completed = self._ChipStack.commissioningCompleteEvent
    # Wait up to 5 additional seconds for the commissioning complete event
    if not completed.isSet():
        completed.wait(5.0)
    if not completed.isSet():
        # Error 50 is a timeout
        return False
    return self._ChipStack.commissioningEventRes == 0
def CloseBLEConnection(self):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceCommissioner_CloseBleConnection(
self.devCtrl)
)
def CloseSession(self, nodeid):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_CloseSession(
self.devCtrl, nodeid)
)
def EstablishPASESessionIP(self, ipaddr, setupPinCode, nodeid):
    ''' Queue pychip_DeviceController_EstablishPASESessionIP and return what CallAsync returns.

        The controller state is set to RENDEZVOUS_ONGOING before the call is
        queued.
    '''
    self.CheckIsActive()
    self.state = DCState.RENDEZVOUS_ONGOING

    def establish():
        return self._dmLib.pychip_DeviceController_EstablishPASESessionIP(
            self.devCtrl, ipaddr, setupPinCode, nodeid)

    return self._ChipStack.CallAsync(establish)
def Commission(self, nodeid):
self.CheckIsActive()
self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_Commission(
self.devCtrl, nodeid)
)
# Wait up to 5 additional seconds for the commissioning complete event
if not self._ChipStack.commissioningCompleteEvent.isSet():
self._ChipStack.commissioningCompleteEvent.wait(5.0)
if not self._ChipStack.commissioningCompleteEvent.isSet():
# Error 50 is a timeout
return False
return self._ChipStack.commissioningEventRes == 0
def GetTestCommissionerUsed(self):
return self._ChipStack.Call(
lambda: self._dmLib.pychip_TestCommissionerUsed()
)
def CommissionIP(self, ipaddr, setupPinCode, nodeid):
    ''' Start pychip_DeviceController_ConnectIP and wait for commissioning to complete.

        Returns True only when the commissioning-complete event fired with a
        result of 0; False otherwise, including when the event is still
        unset after the extra 5 second wait.
    '''
    self.CheckIsActive()

    # An IP connection runs through full commissioning, so the thing to
    # wait for is the commissioning complete event, not just any callback.
    self.state = DCState.RENDEZVOUS_ONGOING
    self._ChipStack.commissioningCompleteEvent.clear()

    def connect():
        return self._dmLib.pychip_DeviceController_ConnectIP(
            self.devCtrl, ipaddr, setupPinCode, nodeid)

    self._ChipStack.CallAsync(connect)

    # Give the commissioning complete event up to 5 more seconds.
    if not self._ChipStack.commissioningCompleteEvent.isSet():
        self._ChipStack.commissioningCompleteEvent.wait(5.0)
    if self._ChipStack.commissioningCompleteEvent.isSet():
        return self._ChipStack.commissioningEventRes == 0
    # Error 50 is a timeout
    return False
def CommissionThread(self, discriminator, setupPinCode, nodeId, threadOperationalDataset: bytes):
''' Commissions a Thread device over BLE
'''
self.SetThreadOperationalDataset(threadOperationalDataset)
return self.ConnectBLE(discriminator, setupPinCode, nodeId)
def CommissionWiFi(self, discriminator, setupPinCode, nodeId, ssid, credentials):
''' Commissions a WiFi device over BLE
'''
self.SetWiFiCredentials(ssid, credentials)
return self.ConnectBLE(discriminator, setupPinCode, nodeId)
def SetWiFiCredentials(self, ssid, credentials):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetWiFiCredentials(
ssid, credentials)
)
def SetThreadOperationalDataset(self, threadOperationalDataset):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetThreadOperationalDataset(
threadOperationalDataset, len(threadOperationalDataset))
)
def ResolveNode(self, nodeid):
self.CheckIsActive()
return self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_UpdateDevice(
self.devCtrl, nodeid)
)
def GetAddressAndPort(self, nodeid):
self.CheckIsActive()
address = create_string_buffer(64)
port = c_uint16(0)
error = self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_GetAddressAndPort(
self.devCtrl, nodeid, address, 64, pointer(port))
)
return (address.value.decode(), port.value) if error == 0 else None
def DiscoverCommissionableNodesLongDiscriminator(self, long_discriminator):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesLongDiscriminator(
self.devCtrl, long_discriminator)
)
def DiscoverCommissionableNodesShortDiscriminator(self, short_discriminator):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesShortDiscriminator(
self.devCtrl, short_discriminator)
)
def DiscoverCommissionableNodesVendor(self, vendor):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesVendor(
self.devCtrl, vendor)
)
def DiscoverCommissionableNodesDeviceType(self, device_type):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesDeviceType(
self.devCtrl, device_type)
)
def DiscoverCommissionableNodesCommissioningEnabled(self):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesCommissioningEnabled(
self.devCtrl)
)
def PrintDiscoveredDevices(self):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_PrintDiscoveredDevices(
self.devCtrl)
)
def ParseQRCode(self, qrCode, output):
self.CheckIsActive()
print(output)
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_ParseQRCode(
qrCode, output)
)
def GetIPForDiscoveredDevice(self, idx, addrStr, length):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_GetIPForDiscoveredDevice(
self.devCtrl, idx, addrStr, length)
)
def DiscoverAllCommissioning(self):
self.CheckIsActive()
return self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_DiscoverAllCommissionableNodes(
self.devCtrl)
)
def OpenCommissioningWindow(self, nodeid, timeout, iteration, discriminator, option):
self.CheckIsActive()
res = self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow(
self.devCtrl, nodeid, timeout, iteration, discriminator, option)
)
if res != 0:
raise self._ChipStack.ErrorToException(res)
def GetCompressedFabricId(self):
self.CheckIsActive()
fabricid = c_uint64(0)
res = self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_GetCompressedFabricId(
self.devCtrl, pointer(fabricid))
)
if res == 0:
return fabricid.value
else:
raise self._ChipStack.ErrorToException(res)
def GetFabricId(self):
self.CheckIsActive()
fabricid = c_uint64(0)
res = self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_GetFabricId(
self.devCtrl, pointer(fabricid))
)
if res == 0:
return fabricid.value
else:
raise self._ChipStack.ErrorToException(res)
def GetClusterHandler(self):
self.CheckIsActive()
return self._Cluster
def GetConnectedDeviceSync(self, nodeid):
    ''' Return a c_void_p device handle for nodeid, blocking until one is available.

        When pychip_GetDeviceBeingCommissioned returns 0, the handle it
        filled in is returned straight away. Otherwise
        pychip_GetConnectedDeviceByNodeId is called and the calling thread
        blocks until the availability callback has run.

        Raises the exception built by ChipStack.ErrorToException when the
        native call fails, when the callback reports a non-zero error, or
        (with CHIP_ERROR_INTERNAL) when the callback delivers no device.
    '''
    self.CheckIsActive()

    returnDevice = c_void_p(None)
    returnErr = 0
    callbackFired = False
    deviceAvailableCV = threading.Condition()

    @_DeviceAvailableFunct
    def DeviceAvailableCallback(device, err):
        nonlocal returnDevice
        nonlocal returnErr
        nonlocal callbackFired
        # Only record the outcome here: an exception raised inside a ctypes
        # callback is not propagated to the thread waiting below.
        if err != 0:
            print("Failed in getting the connected device: {}".format(err))
        with deviceAvailableCV:
            returnDevice = c_void_p(device)
            returnErr = err
            callbackFired = True
            deviceAvailableCV.notify_all()

    res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
        self.devCtrl, nodeid, byref(returnDevice)))
    if res == 0:
        # TODO: give users more control over whether they want to send this command over a PASE established connection
        print('Using PASE connection')
        return returnDevice

    res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
        self.devCtrl, nodeid, DeviceAvailableCallback))
    if res != 0:
        raise self._ChipStack.ErrorToException(res)

    # The callback might have been received synchronously (during
    # self._ChipStack.Call()) or may arrive at any later moment. Testing the
    # flag while holding the condition's lock means a notification that
    # came early is never missed.
    with deviceAvailableCV:
        deviceAvailableCV.wait_for(lambda: callbackFired)

    if returnErr != 0:
        raise self._ChipStack.ErrorToException(returnErr)
    if returnDevice.value is None:
        raise self._ChipStack.ErrorToException(CHIP_ERROR_INTERNAL)
    return returnDevice
async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.ClusterCommand, responseType=None, timedRequestTimeoutMs: int = None):
    '''
    Send a cluster-object encapsulated command to a node and await its response.

    nodeid: target node; its device handle is obtained through GetConnectedDeviceSync.
    endpoint: endpoint placed in the command path, next to the payload's cluster_id and command_id.
    responseType: if a valid type is passed in, it is used to deserialize the response. If not,
        the type is deduced automatically from the metadata received over the wire.
    timedRequestTimeoutMs: Timeout for a timed invoke request. Omit or set to 'None' to indicate a non-timed request.

    A non-zero result from ClusterCommand.SendCommand is turned into an exception through
    ChipStack.ErrorToException and raised when the future is awaited.
    '''
    self.CheckIsActive()

    loop = asyncio.get_running_loop()
    pending = loop.create_future()

    device = self.GetConnectedDeviceSync(nodeid)

    def dispatch():
        path = ClusterCommand.CommandPath(
            EndpointId=endpoint,
            ClusterId=payload.cluster_id,
            CommandId=payload.command_id,
        )
        return ClusterCommand.SendCommand(
            pending, loop, responseType, device, path, payload,
            timedRequestTimeoutMs=timedRequestTimeoutMs)

    status = self._ChipStack.Call(dispatch)
    if status != 0:
        pending.set_exception(self._ChipStack.ErrorToException(status))
    return await pending
async def WriteAttribute(self, nodeid: int, attributes: typing.List[typing.Tuple[int, ClusterObjects.ClusterAttributeDescriptor]], timedRequestTimeoutMs: int = None):
    '''
    Write a list of attributes on a target node and await the result.

    nodeid: Target's Node ID
    timedRequestTimeoutMs: Timeout for a timed write request. Omit or set to 'None' to indicate a non-timed request.
    attributes: A list of tuples of type (endpoint, cluster-object):

    E.g
        (1, Clusters.TestCluster.Attributes.XYZAttribute('hello')) -- Write 'hello' to the XYZ attribute on the test cluster to endpoint 1

    Raises the exception built by ChipStack.ErrorToException when
    ClusterAttribute.WriteAttributes returns a non-zero result.
    '''
    self.CheckIsActive()

    loop = asyncio.get_running_loop()
    pending = loop.create_future()

    device = self.GetConnectedDeviceSync(nodeid)

    # One write request per (endpoint, attribute object) pair, carrying the
    # object's current value.
    requests = [
        ClusterAttribute.AttributeWriteRequest(entry[0], entry[1], entry[1].value)
        for entry in attributes
    ]

    def dispatch():
        return ClusterAttribute.WriteAttributes(
            pending, loop, device, requests, timedRequestTimeoutMs=timedRequestTimeoutMs)

    status = self._ChipStack.Call(dispatch)
    if status != 0:
        raise self._ChipStack.ErrorToException(status)
    return await pending
async def ReadAttribute(self, nodeid: int, attributes: typing.List[typing.Union[
    None,  # Empty tuple, all wildcard
    typing.Tuple[int],  # Endpoint
    # Wildcard endpoint, Cluster id present
    typing.Tuple[typing.Type[ClusterObjects.Cluster]],
    # Wildcard endpoint, Cluster + Attribute present
    typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
    # Wildcard attribute id
    typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
    # Concrete path
    typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]]
]], dataVersionFilters: typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]] = None, returnClusterObject: bool = False, reportInterval: typing.Tuple[int, int] = None, fabricFiltered: bool = True):
    '''
    Read a list of attributes from a target node

    nodeid: Target's Node ID
    attributes: A list of paths of varying types depending on the type of read being requested:
        (endpoint, Clusters.ClusterA.AttributeA):   Endpoint = specific,    Cluster = specific,   Attribute = specific
        (endpoint, Clusters.ClusterA):              Endpoint = specific,    Cluster = specific,   Attribute = *
        Clusters.ClusterA.AttributeA:               Endpoint = *,           Cluster = specific,   Attribute = specific
        endpoint:                                   Endpoint = specific,    Cluster = *,          Attribute = *
        Clusters.ClusterA:                          Endpoint = *,           Cluster = specific,   Attribute = *
        '*', () or None:                            Endpoint = *,           Cluster = *,          Attribute = *

        A one-element tuple is treated exactly like the element it contains.

    The cluster and attributes specified above are to be selected from the generated cluster objects.

    e.g.
        ReadAttribute(1, [ 1 ] ) -- case 4 above.
        ReadAttribute(1, [ Clusters.Basic ] ) -- case 5 above.
        ReadAttribute(1, [ (1, Clusters.Basic.Attributes.Location) ] ) -- case 1 above.

    dataVersionFilters: Optional list of (endpoint, cluster, dataVersion) tuples; each becomes a
        ClusterAttribute.DataVersionFilter. When omitted, None is passed on instead of a list.
    returnClusterObject: This returns the data as consolidated cluster objects, with all attributes for a cluster inside
                         a single cluster-wide cluster object.
    reportInterval: A tuple of two int-s for (MinIntervalFloor, MaxIntervalCeiling). Used by establishing subscriptions.
        When not provided, a read request will be sent.
    fabricFiltered: Forwarded unchanged to ClusterAttribute.ReadAttributes.

    Raises ValueError for a path or filter whose type is not supported, and the exception built by
    ChipStack.ErrorToException when ClusterAttribute.ReadAttributes returns a non-zero result.
    '''
    self.CheckIsActive()

    eventLoop = asyncio.get_running_loop()
    future = eventLoop.create_future()

    device = self.GetConnectedDeviceSync(nodeid)
    attrs = []
    for v in attributes:
        endpoint = None
        cluster = None
        attribute = None
        if type(v) is tuple and len(v) == 1:
            # The type hints advertise one-element tuples; handle them like
            # the bare element instead of failing on v[1] below.
            v = v[0]
        if v is None or v == ('*') or v == ():
            # Wildcard
            pass
        elif type(v) is not tuple:
            if type(v) is int:
                endpoint = v
            elif issubclass(v, ClusterObjects.Cluster):
                cluster = v
            elif issubclass(v, ClusterObjects.ClusterAttributeDescriptor):
                attribute = v
            else:
                raise ValueError("Unsupported Attribute Path")
        else:
            # endpoint + (cluster) attribute / endpoint + cluster
            endpoint = v[0]
            if issubclass(v[1], ClusterObjects.Cluster):
                cluster = v[1]
            elif issubclass(v[1], ClusterObjects.ClusterAttributeDescriptor):
                attribute = v[1]
            else:
                raise ValueError("Unsupported Attribute Path")
        attrs.append(ClusterAttribute.AttributePath(
            EndpointId=endpoint, Cluster=cluster, Attribute=attribute))

    filters = None
    if dataVersionFilters is not None:
        filters = []
        for versionFilter in dataVersionFilters:
            # endpoint + cluster + data version
            if not issubclass(versionFilter[1], ClusterObjects.Cluster):
                raise ValueError("Unsupported Cluster Path")
            filters.append(ClusterAttribute.DataVersionFilter(
                EndpointId=versionFilter[0], Cluster=versionFilter[1], DataVersion=versionFilter[2]))

    res = self._ChipStack.Call(
        lambda: ClusterAttribute.ReadAttributes(
            future, eventLoop, device, self, attrs, filters, returnClusterObject,
            ClusterAttribute.SubscriptionParameters(reportInterval[0], reportInterval[1]) if reportInterval else None,
            fabricFiltered=fabricFiltered))
    if res != 0:
        raise self._ChipStack.ErrorToException(res)
    return await future
async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[
    None,  # Empty tuple, all wildcard
    typing.Tuple[int],  # Endpoint
    # Wildcard endpoint, Cluster id present
    typing.Tuple[typing.Type[ClusterObjects.Cluster]],
    # Wildcard endpoint, Cluster + Event present
    typing.Tuple[typing.Type[ClusterObjects.ClusterEvent]],
    # Wildcard event id
    typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
    # Concrete path
    typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent]]
]], reportInterval: typing.Tuple[int, int] = None):
    '''
    Read a list of events from a target node

    nodeid: Target's Node ID
    events: A list of paths of varying types depending on the type of read being requested:
        (endpoint, Clusters.ClusterA.EventA):   Endpoint = specific,    Cluster = specific,   Event = specific
        (endpoint, Clusters.ClusterA):          Endpoint = specific,    Cluster = specific,   Event = *
        Clusters.ClusterA.EventA:               Endpoint = *,           Cluster = specific,   Event = specific
        endpoint:                               Endpoint = specific,    Cluster = *,          Event = *
        Clusters.ClusterA:                      Endpoint = *,           Cluster = specific,   Event = *
        '*', () or None:                        Endpoint = *,           Cluster = *,          Event = *

        A one-element tuple is treated exactly like the element it contains.

    The cluster and events specified above are to be selected from the generated cluster objects.

    e.g.
        ReadEvent(1, [ 1 ] ) -- case 4 above.
        ReadEvent(1, [ Clusters.Basic ] ) -- case 5 above.
        ReadEvent(1, [ (1, Clusters.Basic.Events.Location) ] ) -- case 1 above.

    reportInterval: A tuple of two int-s for (MinIntervalFloor, MaxIntervalCeiling). Used by establishing subscriptions.
        When not provided, a read request will be sent.

    Raises ValueError for a path whose type is not supported, and the exception built by
    ChipStack.ErrorToException when ClusterAttribute.ReadEvents returns a non-zero result.
    '''
    self.CheckIsActive()

    eventLoop = asyncio.get_running_loop()
    future = eventLoop.create_future()

    device = self.GetConnectedDeviceSync(nodeid)
    eves = []
    for v in events:
        endpoint = None
        cluster = None
        event = None
        if type(v) is tuple and len(v) == 1:
            # The type hints advertise one-element tuples; handle them like
            # the bare element instead of failing on v[1] below.
            v = v[0]
        if v is None or v == ('*') or v == ():
            # Wildcard
            pass
        elif type(v) is not tuple:
            if type(v) is int:
                endpoint = v
            elif issubclass(v, ClusterObjects.Cluster):
                cluster = v
            elif issubclass(v, ClusterObjects.ClusterEvent):
                event = v
            else:
                raise ValueError("Unsupported Event Path")
        else:
            # endpoint + (cluster) event / endpoint + cluster
            endpoint = v[0]
            if issubclass(v[1], ClusterObjects.Cluster):
                cluster = v[1]
            elif issubclass(v[1], ClusterObjects.ClusterEvent):
                event = v[1]
            else:
                raise ValueError("Unsupported Event Path")
        eves.append(ClusterAttribute.EventPath(
            EndpointId=endpoint, Cluster=cluster, Event=event))

    res = self._ChipStack.Call(
        lambda: ClusterAttribute.ReadEvents(
            future, eventLoop, device, self, eves,
            ClusterAttribute.SubscriptionParameters(reportInterval[0], reportInterval[1]) if reportInterval else None))
    if res != 0:
        raise self._ChipStack.ErrorToException(res)
    return await future
def ZCLSend(self, cluster, command, nodeid, endpoint, groupid, args, blocking=False):
    ''' Build the generated command object named by cluster/command and send it.

        cluster, command: names looked up as GeneratedObjects.<cluster>.Commands.<command>.
        args: dict of keyword arguments used to construct the command object.
        nodeid, endpoint: forwarded to SendCommand.
        groupid, blocking: accepted but not used by this method.

        Returns (0, response) on success, or (status, None) when SendCommand
        raises InteractionModelError. Raises UnknownCommand when the command
        object cannot be looked up or constructed.
    '''
    self.CheckIsActive()

    try:
        # Plain attribute lookups instead of eval(): the result is the same
        # for ordinary names and no caller-supplied text gets executed.
        commandType = getattr(getattr(GeneratedObjects, cluster).Commands, command)
        req = commandType(**args)
    except Exception as ex:
        raise UnknownCommand(cluster, command) from ex

    try:
        res = asyncio.run(self.SendCommand(nodeid, endpoint, req))
        print(f"CommandResponse {res}")
        return (0, res)
    except InteractionModelError as ex:
        return (int(ex.state), None)
def ZCLReadAttribute(self, cluster, attribute, nodeid, endpoint, groupid, blocking=True):
    ''' Read one attribute, addressed by cluster and attribute name, from an endpoint.

        cluster, attribute: names looked up as GeneratedObjects.<cluster>.Attributes.<attribute>.
        nodeid, endpoint: target of the read.
        groupid, blocking: accepted but not used by this method.

        Returns an im.AttributeReadResult with status 0 whose value is taken
        from the ReadAttribute result. Raises UnknownAttribute when the
        attribute cannot be looked up.
    '''
    self.CheckIsActive()

    # Plain attribute lookups instead of eval(): the result is the same for
    # ordinary names and no caller-supplied text gets executed.
    clusterType = getattr(GeneratedObjects, cluster)

    try:
        attributeType = getattr(clusterType.Attributes, attribute)
    except Exception as ex:
        raise UnknownAttribute(cluster, attribute) from ex

    result = asyncio.run(self.ReadAttribute(
        nodeid, [(endpoint, attributeType)]))
    path = ClusterAttribute.AttributePath(
        EndpointId=endpoint, Attribute=attributeType)
    return im.AttributeReadResult(
        path=im.AttributePath(
            nodeId=nodeid, endpointId=path.EndpointId, clusterId=path.ClusterId, attributeId=path.AttributeId),
        status=0,
        value=result[0][endpoint][clusterType][attributeType])
def ZCLWriteAttribute(self, cluster: str, attribute: str, nodeid, endpoint, groupid, value, blocking=True):
    ''' Write value to one attribute, addressed by cluster and attribute name.

        cluster, attribute: names looked up as GeneratedObjects.<cluster>.Attributes.<attribute>;
            the attribute object is constructed with value.
        nodeid, endpoint: target of the write.
        groupid, blocking: accepted but not used by this method.

        Returns the result of WriteAttribute. Raises UnknownAttribute when
        the attribute object cannot be looked up or constructed.
    '''
    # Same up-front guard as the other ZCL* helpers.
    self.CheckIsActive()

    try:
        # Plain attribute lookups instead of eval(): the result is the same
        # for ordinary names and no caller-supplied text gets executed.
        attributeType = getattr(getattr(GeneratedObjects, cluster).Attributes, attribute)
        req = attributeType(value)
    except Exception as ex:
        raise UnknownAttribute(cluster, attribute) from ex

    return asyncio.run(self.WriteAttribute(nodeid, [(endpoint, req)]))
def ZCLSubscribeAttribute(self, cluster, attribute, nodeid, endpoint, minInterval, maxInterval, blocking=True):
    ''' Subscribe to one attribute, addressed by cluster and attribute name.

        cluster, attribute: names looked up as GeneratedObjects.<cluster>.Attributes.<attribute>.
        minInterval, maxInterval: passed to ReadAttribute as reportInterval.
        blocking: accepted but not used by this method.

        Returns the result of ReadAttribute. Raises UnknownAttribute when
        the attribute cannot be looked up.
    '''
    self.CheckIsActive()

    try:
        # Plain attribute lookups instead of eval(): the result is the same
        # for ordinary names and no caller-supplied text gets executed.
        req = getattr(getattr(GeneratedObjects, cluster).Attributes, attribute)
    except Exception as ex:
        raise UnknownAttribute(cluster, attribute) from ex

    return asyncio.run(self.ReadAttribute(nodeid, [(endpoint, req)], None, False, reportInterval=(minInterval, maxInterval)))
def ZCLCommandList(self):
self.CheckIsActive()
return self._Cluster.ListClusterCommands()
def ZCLAttributeList(self):
self.CheckIsActive()
return self._Cluster.ListClusterAttributes()
def SetLogFilter(self, category):
self.CheckIsActive()
if category < 0 or category > pow(2, 8):
raise ValueError("category must be an unsigned 8-bit integer")
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_SetLogFilter(category)
)
def GetLogFilter(self):
self.CheckIsActive()
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_GetLogFilter()
)
def SetBlockingCB(self, blockingCB):
self.CheckIsActive()
self._ChipStack.blockingCB = blockingCB
# ----- Private Members -----
def _InitLib(self):
if self._dmLib is None:
self._dmLib = CDLL(self._ChipStack.LocateChipDLL())
self._dmLib.pychip_DeviceController_DeleteDeviceController.argtypes = [
c_void_p]
self._dmLib.pychip_DeviceController_DeleteDeviceController.restype = c_uint32
self._dmLib.pychip_DeviceController_ConnectBLE.argtypes = [
c_void_p, c_uint16, c_uint32, c_uint64]
self._dmLib.pychip_DeviceController_ConnectBLE.restype = c_uint32
self._dmLib.pychip_DeviceController_ConnectIP.argtypes = [
c_void_p, c_char_p, c_uint32, c_uint64]
self._dmLib.pychip_DeviceController_SetThreadOperationalDataset.argtypes = [
c_char_p, c_uint32]
self._dmLib.pychip_DeviceController_SetThreadOperationalDataset.restype = c_uint32
self._dmLib.pychip_DeviceController_SetWiFiCredentials.argtypes = [
c_char_p, c_char_p]
self._dmLib.pychip_DeviceController_SetWiFiCredentials.restype = c_uint32
self._dmLib.pychip_DeviceController_Commission.argtypes = [
c_void_p, c_uint64]
self._dmLib.pychip_DeviceController_Commission.restype = c_uint32
self._dmLib.pychip_DeviceController_DiscoverAllCommissionableNodes.argtypes = [
c_void_p]
self._dmLib.pychip_DeviceController_DiscoverAllCommissionableNodes.restype = c_uint32
self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesLongDiscriminator.argtypes = [
c_void_p, c_uint16]
self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesLongDiscriminator.restype = c_uint32
self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesShortDiscriminator.argtypes = [
c_void_p, c_uint16]
self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesShortDiscriminator.restype = c_uint32
self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesVendor.argtypes = [
c_void_p, c_uint16]
self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesVendor.restype = c_uint32
self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesDeviceType.argtypes = [
c_void_p, c_uint16]
self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesDeviceType.restype = c_uint32
self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesCommissioningEnabled.argtypes = [
c_void_p]
self._dmLib.pychip_DeviceController_DiscoverCommissionableNodesCommissioningEnabled.restype = c_uint32
self._dmLib.pychip_DeviceController_EstablishPASESessionIP.argtypes = [
c_void_p, c_char_p, c_uint32, c_uint64]
self._dmLib.pychip_DeviceController_EstablishPASESessionIP.restype = c_uint32
self._dmLib.pychip_DeviceController_DiscoverAllCommissionableNodes.argtypes = [
c_void_p]
self._dmLib.pychip_DeviceController_PrintDiscoveredDevices.argtypes = [
c_void_p]
self._dmLib.pychip_DeviceController_GetIPForDiscoveredDevice.argtypes = [
c_void_p, c_int, c_char_p, c_uint32]
self._dmLib.pychip_DeviceController_GetIPForDiscoveredDevice.restype = c_bool
self._dmLib.pychip_DeviceController_ConnectIP.argtypes = [
c_void_p, c_char_p, c_uint32, c_uint64]
self._dmLib.pychip_DeviceController_ConnectIP.restype = c_uint32
self._dmLib.pychip_DeviceController_CloseSession.argtypes = [
c_void_p, c_uint64]
self._dmLib.pychip_DeviceController_CloseSession.restype = c_uint32
self._dmLib.pychip_DeviceController_GetAddressAndPort.argtypes = [
c_void_p, c_uint64, c_char_p, c_uint64, POINTER(c_uint16)]
self._dmLib.pychip_DeviceController_GetAddressAndPort.restype = c_uint32
self._dmLib.pychip_ScriptDevicePairingDelegate_SetKeyExchangeCallback.argtypes = [
c_void_p, _DevicePairingDelegate_OnPairingCompleteFunct]
self._dmLib.pychip_ScriptDevicePairingDelegate_SetKeyExchangeCallback.restype = c_uint32
self._dmLib.pychip_ScriptDevicePairingDelegate_SetCommissioningCompleteCallback.argtypes = [
c_void_p, _DevicePairingDelegate_OnCommissioningCompleteFunct]
self._dmLib.pychip_ScriptDevicePairingDelegate_SetCommissioningCompleteCallback.restype = c_uint32
self._dmLib.pychip_ScriptDeviceAddressUpdateDelegate_SetOnAddressUpdateComplete.argtypes = [
_DeviceAddressUpdateDelegate_OnUpdateComplete]
self._dmLib.pychip_ScriptDeviceAddressUpdateDelegate_SetOnAddressUpdateComplete.restype = None
self._dmLib.pychip_DeviceController_UpdateDevice.argtypes = [
c_void_p, c_uint64]
self._dmLib.pychip_DeviceController_UpdateDevice.restype = c_uint32
self._dmLib.pychip_GetConnectedDeviceByNodeId.argtypes = [
c_void_p, c_uint64, _DeviceAvailableFunct]
self._dmLib.pychip_GetConnectedDeviceByNodeId.restype = c_uint32
self._dmLib.pychip_GetDeviceBeingCommissioned.argtypes = [
c_void_p, c_uint64, c_void_p]
self._dmLib.pychip_GetDeviceBeingCommissioned.restype = c_uint32
self._dmLib.pychip_DeviceCommissioner_CloseBleConnection.argtypes = [
c_void_p]
self._dmLib.pychip_DeviceCommissioner_CloseBleConnection.restype = c_uint32
self._dmLib.pychip_GetCommandSenderHandle.argtypes = [c_void_p]
self._dmLib.pychip_GetCommandSenderHandle.restype = c_uint64
self._dmLib.pychip_DeviceController_GetCompressedFabricId.argtypes = [
c_void_p, POINTER(c_uint64)]
self._dmLib.pychip_DeviceController_GetCompressedFabricId.restype = c_uint32
self._dmLib.pychip_DeviceController_OpenCommissioningWindow.argtypes = [
c_void_p, c_uint64, c_uint16, c_uint32, c_uint16, c_uint8]
self._dmLib.pychip_DeviceController_OpenCommissioningWindow.restype = c_uint32
self._dmLib.pychip_TestCommissionerUsed.argtypes = []
self._dmLib.pychip_TestCommissionerUsed.restype = c_bool