|  | # | 
|  | #    Copyright (c) 2020-2025 Project CHIP Authors | 
|  | #    Copyright (c) 2019-2020 Google, LLC. | 
|  | #    Copyright (c) 2013-2018 Nest Labs, Inc. | 
|  | #    All rights reserved. | 
|  | # | 
|  | #    Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | #    you may not use this file except in compliance with the License. | 
|  | #    You may obtain a copy of the License at | 
|  | # | 
|  | #        http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | #    Unless required by applicable law or agreed to in writing, software | 
|  | #    distributed under the License is distributed on an "AS IS" BASIS, | 
|  | #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | #    See the License for the specific language governing permissions and | 
|  | #    limitations under the License. | 
|  | # | 
|  |  | 
|  | # | 
|  | #    @file | 
|  | #      Python interface for Chip Device Manager | 
|  | # | 
|  |  | 
|  | """Chip Device Controller interface | 
|  | """ | 
|  |  | 
|  | # Needed to use types in type hints before they are fully defined. | 
|  | from __future__ import absolute_import, annotations, print_function | 
|  |  | 
|  | import asyncio | 
|  | import builtins | 
|  | import concurrent.futures | 
|  | import copy | 
|  | import ctypes | 
|  | import enum | 
|  | import json | 
|  | import logging | 
|  | import secrets | 
|  | import threading | 
|  | import typing | 
|  | from ctypes import (CDLL, CFUNCTYPE, POINTER, Structure, byref, c_bool, c_char, c_char_p, c_int, c_int32, c_size_t, c_uint8, | 
|  | c_uint16, c_uint32, c_uint64, c_void_p, cast, create_string_buffer, pointer, py_object, string_at) | 
|  | from dataclasses import dataclass | 
|  |  | 
|  | import dacite  # type: ignore | 
|  |  | 
|  | from . import FabricAdmin | 
|  | from . import clusters as Clusters | 
|  | from . import discovery | 
|  | from .bdx import Bdx | 
|  | from .clusters import Attribute as ClusterAttribute | 
|  | from .clusters import ClusterObjects as ClusterObjects | 
|  | from .clusters import Command as ClusterCommand | 
|  | from .clusters.CHIPClusters import ChipClusters | 
|  | from .crypto import p256keypair | 
|  | from .interaction_model import SessionParameters, SessionParametersStruct | 
|  | from .native import PyChipError | 
|  |  | 
|  | __all__ = ["ChipDeviceController", "CommissioningParameters", | 
|  | "AttributeReadRequest", "AttributeReadRequestList", "SubscriptionTargetList"] | 
|  |  | 
|  | # Type aliases for ReadAttribute method to improve type safety | 
|  | AttributeReadRequest = typing.Union[ | 
|  | None,  # Empty tuple, all wildcard | 
|  | typing.Tuple[int],  # Endpoint | 
|  | # Wildcard endpoint, Cluster id present | 
|  | typing.Tuple[typing.Type[ClusterObjects.Cluster]], | 
|  | # Wildcard endpoint, Cluster + Attribute present | 
|  | typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.Cluster] | 
|  | ],  # Wildcard attribute id | 
|  | # Concrete path | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], | 
|  | ClusterAttribute.TypedAttributePath  # Directly specified attribute path | 
|  | ] | 
|  |  | 
|  | AttributeReadRequestList = typing.Optional[typing.List[AttributeReadRequest]] | 
|  |  | 
|  | # Type alias for subscription target specifications | 
|  | SubscriptionTargetList = typing.List[typing.Tuple[int, | 
|  | typing.Union[ClusterObjects.Cluster, ClusterObjects.ClusterAttributeDescriptor]]] | 
|  |  | 
|  |  | 
|  | # Defined in $CHIP_ROOT/src/lib/core/CHIPError.h | 
|  | CHIP_ERROR_TIMEOUT: int = 50 | 
|  |  | 
|  | _RCACCallbackType = CFUNCTYPE(None, POINTER(c_uint8), c_size_t) | 
|  |  | 
|  | LOGGER = logging.getLogger(__name__) | 
|  |  | 
|  | _DevicePairingDelegate_OnPairingCompleteFunct = CFUNCTYPE(None, PyChipError) | 
|  | _DeviceUnpairingCompleteFunct = CFUNCTYPE(None, c_uint64, PyChipError) | 
|  | _DevicePairingDelegate_OnCommissioningCompleteFunct = CFUNCTYPE( | 
|  | None, c_uint64, PyChipError) | 
|  | _DevicePairingDelegate_OnOpenWindowCompleteFunct = CFUNCTYPE( | 
|  | None, c_uint64, c_uint32, c_char_p, c_char_p, PyChipError) | 
|  | _DevicePairingDelegate_OnCommissioningStatusUpdateFunct = CFUNCTYPE( | 
|  | None, c_uint64, c_uint8, PyChipError) | 
|  | _DevicePairingDelegate_OnFabricCheckFunct = CFUNCTYPE( | 
|  | None, c_uint64) | 
|  | # void (*)(Device *, CHIP_ERROR). | 
|  | # | 
|  | # CHIP_ERROR is actually signed, so using c_uint32 is weird, but everything | 
|  | # else seems to do it. | 
|  | _DeviceAvailableCallbackFunct = CFUNCTYPE(None, py_object, c_void_p, PyChipError) | 
|  |  | 
|  | _IssueNOCChainCallbackPythonCallbackFunct = CFUNCTYPE( | 
|  | None, py_object, PyChipError, c_void_p, c_size_t, c_void_p, c_size_t, c_void_p, c_size_t, c_void_p, c_size_t, c_uint64) | 
|  |  | 
|  | _ChipDeviceController_IterateDiscoveredCommissionableNodesFunct = CFUNCTYPE(None, c_char_p, c_size_t) | 
|  |  | 
|  | # Defines for the transport payload types to use to select the suitable | 
|  | # underlying transport of the session. | 
|  | # class TransportPayloadCapability(ctypes.c_int): | 
|  |  | 
|  |  | 
|  | class TransportPayloadCapability(ctypes.c_int): | 
|  | MRP_PAYLOAD = 0 | 
|  | LARGE_PAYLOAD = 1 | 
|  | MRP_OR_TCP_PAYLOAD = 2 | 
|  |  | 
|  |  | 
|  | @dataclass | 
|  | class CommissioningParameters: | 
|  | setupPinCode: int | 
|  | setupManualCode: str | 
|  | setupQRCode: str | 
|  |  | 
|  |  | 
|  | @dataclass | 
|  | class NOCChain: | 
|  | nocBytes: typing.Optional[bytes] | 
|  | icacBytes: typing.Optional[bytes] | 
|  | rcacBytes: typing.Optional[bytes] | 
|  | ipkBytes: typing.Optional[bytes] | 
|  | adminSubject: int | 
|  |  | 
|  |  | 
|  | @dataclass | 
|  | class ICDRegistrationParameters: | 
|  | symmetricKey: typing.Optional[bytes] | 
|  | checkInNodeId: typing.Optional[int] | 
|  | monitoredSubject: typing.Optional[int] | 
|  | stayActiveMs: typing.Optional[int] | 
|  | clientType: typing.Optional[Clusters.IcdManagement.Enums.ClientTypeEnum] | 
|  |  | 
|  | class CStruct(Structure): | 
|  | _fields_ = [('symmetricKey', c_char_p), ('symmetricKeyLength', c_size_t), ('checkInNodeId', | 
|  | c_uint64), ('monitoredSubject', c_uint64), ('stayActiveMsec', c_uint32), ('clientType', c_uint8)] | 
|  |  | 
|  | def to_c(self): | 
|  | return ICDRegistrationParameters.CStruct(self.symmetricKey, len(self.symmetricKey), self.checkInNodeId, self.monitoredSubject, self.stayActiveMs, self.clientType.value) | 
|  |  | 
|  |  | 
|  | @_DeviceAvailableCallbackFunct | 
|  | def _DeviceAvailableCallback(closure, device, err): | 
|  | closure.deviceAvailable(device, err) | 
|  |  | 
|  |  | 
|  | @_IssueNOCChainCallbackPythonCallbackFunct | 
|  | def _IssueNOCChainCallbackPythonCallback(devCtrl, status: PyChipError, noc: c_void_p, nocLen: int, icac: c_void_p, | 
|  | icacLen: int, rcac: c_void_p, rcacLen: int, ipk: c_void_p, ipkLen: int, adminSubject: int): | 
|  |  | 
|  | nocChain = NOCChain(None, None, None, None, 0) | 
|  | if status.is_success: | 
|  | nocBytes = None | 
|  | if nocLen > 0: | 
|  | nocBytes = string_at(noc, nocLen)[:] | 
|  | icacBytes = None | 
|  | if icacLen > 0: | 
|  | icacBytes = string_at(icac, icacLen)[:] | 
|  | rcacBytes = None | 
|  | if rcacLen > 0: | 
|  | rcacBytes = string_at(rcac, rcacLen)[:] | 
|  | ipkBytes = None | 
|  | if ipkLen > 0: | 
|  | ipkBytes = string_at(ipk, ipkLen)[:] | 
|  | nocChain = NOCChain(nocBytes, icacBytes, rcacBytes, ipkBytes, adminSubject) | 
|  | else: | 
|  | LOGGER.error(f"Failure to generate NOC Chain: {status}. All NOCChain field will be None and commissioning will fail!") | 
|  | devCtrl.NOCChainCallback(nocChain) | 
|  |  | 
|  |  | 
|  | # Methods for ICD | 
|  | class ScopedNodeId(Structure): | 
|  | _fields_ = [("nodeId", c_uint64), ("fabricIndex", c_uint8)] | 
|  |  | 
|  | def __hash__(self): | 
|  | return self.nodeId << 8 | self.fabricIndex | 
|  |  | 
|  | def __str__(self): | 
|  | return f"({self.fabricIndex}:{self.nodeId:16x})" | 
|  |  | 
|  | def __eq__(self, other): | 
|  | return self.nodeId == other.nodeId and self.fabricIndex == other.fabricIndex | 
|  |  | 
|  |  | 
|  | _OnCheckInCompleteFunct = CFUNCTYPE(None, ScopedNodeId) | 
|  |  | 
|  | _OnCheckInCompleteWaitListLock = threading.Lock() | 
|  | _OnCheckInCompleteWaitList: typing.Dict[ScopedNodeId, set] = dict() | 
|  |  | 
|  |  | 
|  | @_OnCheckInCompleteFunct | 
|  | def _OnCheckInComplete(scopedNodeId: ScopedNodeId): | 
|  | callbacks = [] | 
|  | with _OnCheckInCompleteWaitListLock: | 
|  | callbacks = list(_OnCheckInCompleteWaitList.get(scopedNodeId, set())) | 
|  |  | 
|  | for callback in callbacks: | 
|  | callback(scopedNodeId) | 
|  |  | 
|  |  | 
|  | def RegisterOnActiveCallback(scopedNodeId: ScopedNodeId, callback: typing.Callable[[ScopedNodeId], None]): | 
|  | ''' Registers a callback when the device with given (fabric index, node id) becomes active. | 
|  |  | 
|  | Does nothing if the callback is already registered. | 
|  | ''' | 
|  | with _OnCheckInCompleteWaitListLock: | 
|  | waitList = _OnCheckInCompleteWaitList.get(scopedNodeId, set()) | 
|  | waitList.add(callback) | 
|  | _OnCheckInCompleteWaitList[scopedNodeId] = waitList | 
|  |  | 
|  |  | 
|  | def UnregisterOnActiveCallback(scopedNodeId: ScopedNodeId, callback: typing.Callable[[ScopedNodeId], None]): | 
|  | ''' Unregisters a callback when the device with given (fabric index, node id) becomes active. | 
|  |  | 
|  | Does nothing if the callback has not been registered. | 
|  | ''' | 
|  | with _OnCheckInCompleteWaitListLock: | 
|  | _OnCheckInCompleteWaitList.get(scopedNodeId, set()).remove(callback) | 
|  |  | 
|  |  | 
|  | async def WaitForCheckIn(scopedNodeId: ScopedNodeId, timeoutSeconds: float): | 
|  | ''' Waits for a device becomes active. | 
|  |  | 
|  | Returns: | 
|  | - A future, completes when the device becomes active. | 
|  | ''' | 
|  | eventLoop = asyncio.get_running_loop() | 
|  | future = eventLoop.create_future() | 
|  |  | 
|  | def OnCheckInCallback(nodeid): | 
|  | eventLoop.call_soon_threadsafe(lambda: future.done() or future.set_result(None)) | 
|  |  | 
|  | RegisterOnActiveCallback(scopedNodeId, OnCheckInCallback) | 
|  |  | 
|  | try: | 
|  | await asyncio.wait_for(future, timeout=timeoutSeconds) | 
|  | finally: | 
|  | UnregisterOnActiveCallback(scopedNodeId, OnCheckInCallback) | 
|  |  | 
|  | # This is a fix for WEAV-429. Jay Logue recommends revisiting this at a later | 
|  | # date to allow for truly multiple instances so this is temporary. | 
|  |  | 
|  |  | 
|  | def _singleton(cls): | 
|  | instance = [None] | 
|  |  | 
|  | def wrapper(*args, **kwargs): | 
|  | if instance[0] is None: | 
|  | instance[0] = cls(*args, **kwargs) | 
|  | return instance[0] | 
|  |  | 
|  | return wrapper | 
|  |  | 
|  |  | 
|  | class CallbackContext: | 
|  | """A context manager for handling callbacks that are expected to be called exactly once. | 
|  |  | 
|  | The context manager makes sure that no concurrent operations which use the same callback | 
|  | handlers are executed. | 
|  | """ | 
|  |  | 
|  | def __init__(self, lock: asyncio.Lock) -> None: | 
|  | self._lock = lock | 
|  | self._future = None | 
|  |  | 
|  | async def __aenter__(self): | 
|  | await self._lock.acquire() | 
|  | self._future = concurrent.futures.Future() | 
|  | return self | 
|  |  | 
|  | @property | 
|  | def future(self) -> typing.Optional[concurrent.futures.Future]: | 
|  | return self._future | 
|  |  | 
|  | async def __aexit__(self, exc_type, exc_value, traceback): | 
|  | if not self._future.done(): | 
|  | # In case the initial call (which sets up for the callback) fails, | 
|  | # the future will never be used actually. So just cancel it here | 
|  | # for completeness, in case somebody is expecting it to be completed. | 
|  | self._future.cancel() | 
|  | self._future = None | 
|  | self._lock.release() | 
|  |  | 
|  |  | 
|  | class CommissioningContext(CallbackContext): | 
|  | """A context manager for handling commissioning callbacks that are expected to be called exactly once. | 
|  |  | 
|  | This context also resets commissioning related device controller state. | 
|  | """ | 
|  |  | 
|  | def __init__(self, devCtrl: ChipDeviceControllerBase, lock: asyncio.Lock) -> None: | 
|  | super().__init__(lock) | 
|  | self._devCtrl = devCtrl | 
|  |  | 
|  | async def __aenter__(self): | 
|  | await super().__aenter__() | 
|  | self._devCtrl._fabricCheckNodeId = -1 | 
|  | return self | 
|  |  | 
|  | async def __aexit__(self, exc_type, exc_value, traceback): | 
|  | await super().__aexit__(exc_type, exc_value, traceback) | 
|  |  | 
|  |  | 
|  | class CommissionableNode(discovery.CommissionableNode): | 
|  | def SetDeviceController(self, devCtrl: 'ChipDeviceController'): | 
|  | self._devCtrl = devCtrl | 
|  |  | 
|  | def Commission(self, nodeId: int, setupPinCode: int) -> int: | 
|  | ''' Commission the device using the device controller discovered this device. | 
|  |  | 
|  | nodeId: The nodeId commissioned to the device | 
|  | setupPinCode: The setup pin code of the device | 
|  |  | 
|  | Returns: | 
|  | int: Effective Node ID of the device (as defined by the assigned NOC) | 
|  | ''' | 
|  | # mypy errors ignored due to coroutine returned without await. | 
|  | # Fixing this typing error risks affecting existing functionality. | 
|  | # TODO:  Explore proper typing for dynamic attributes in ChipDeviceCtrl.py #618 | 
|  | return self._devCtrl.CommissionOnNetwork(  # type: ignore[return-value] | 
|  | nodeId, setupPinCode, filterType=discovery.FilterType.INSTANCE_NAME, filter=self.instanceName) | 
|  |  | 
|  | def __rich_repr__(self): | 
|  | yield "(To Be Commissioned By)", self._devCtrl.name | 
|  |  | 
|  | for k in self.__dataclass_fields__.keys(): | 
|  | if k in self.__dict__: | 
|  | yield k, self.__dict__[k] | 
|  |  | 
|  |  | 
|  | class DeviceProxyWrapper(): | 
|  | ''' Encapsulates a pointer to OperationalDeviceProxy on the c++ side that needs to be | 
|  | freed when DeviceProxyWrapper goes out of scope. There is a potential issue where | 
|  | if this is copied around that a double free will occur, but how this is used today | 
|  | that is not an issue that needs to be accounted for and it will become very apparent | 
|  | if that happens. | 
|  | ''' | 
|  | class DeviceProxyType(enum.Enum): | 
|  | OPERATIONAL = enum.auto() | 
|  | COMMISSIONEE = enum.auto() | 
|  |  | 
|  | def __init__(self, deviceProxy: ctypes.c_void_p, proxyType, dmLib=None): | 
|  | self._deviceProxy = deviceProxy | 
|  | self._dmLib = dmLib | 
|  | self._proxyType = proxyType | 
|  |  | 
|  | def __del__(self): | 
|  | # Commissionee device proxies are owned by the DeviceCommissioner. See #33031 | 
|  | if (self._proxyType == self.DeviceProxyType.OPERATIONAL and self._dmLib is not None and hasattr(builtins, 'chipStack') and builtins.chipStack is not None): | 
|  | # This destructor is called from any threading context, including on the Matter threading context. | 
|  | # So, we cannot call chipStack.Call or chipStack.CallAsyncWithCompleteCallback which waits for the posted work to | 
|  | # actually be executed. Instead, we just post/schedule the work and move on. | 
|  | builtins.chipStack.PostTaskOnChipThread(lambda: self._dmLib.pychip_FreeOperationalDeviceProxy(self._deviceProxy)) | 
|  |  | 
|  | @property | 
|  | def deviceProxy(self) -> ctypes.c_void_p: | 
|  | return self._deviceProxy | 
|  |  | 
|  | @property | 
|  | def localSessionId(self) -> int: | 
|  | self._dmLib.pychip_GetLocalSessionId.argtypes = [ctypes.c_void_p, POINTER(ctypes.c_uint16)] | 
|  | self._dmLib.pychip_GetLocalSessionId.restype = PyChipError | 
|  |  | 
|  | localSessionId = ctypes.c_uint16(0) | 
|  |  | 
|  | builtins.chipStack.Call(        # type: ignore[attr-defined]  # 'chipStack' is dynamically added; referred to in DeviceProxyWrapper class __del__ method | 
|  | lambda: self._dmLib.pychip_GetLocalSessionId(self._deviceProxy, pointer(localSessionId)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return localSessionId.value | 
|  |  | 
|  | @property | 
|  | def numTotalSessions(self) -> int: | 
|  | self._dmLib.pychip_GetNumSessionsToPeer.argtypes = [ctypes.c_void_p, POINTER(ctypes.c_uint32)] | 
|  | self._dmLib.pychip_GetNumSessionsToPeer.restype = PyChipError | 
|  |  | 
|  | numSessions = ctypes.c_uint32(0) | 
|  |  | 
|  | builtins.chipStack.Call(        # type: ignore[attr-defined]  # 'chipStack' is dynamically added; referred to in DeviceProxyWrapper class __del__ method | 
|  | lambda: self._dmLib.pychip_GetNumSessionsToPeer(self._deviceProxy, pointer(numSessions)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return numSessions.value | 
|  |  | 
|  | @property | 
|  | def attestationChallenge(self) -> bytes: | 
|  | self._dmLib.pychip_GetAttestationChallenge.argtypes = (c_void_p, POINTER(c_uint8), POINTER(c_size_t)) | 
|  | self._dmLib.pychip_GetAttestationChallenge.restype = PyChipError | 
|  |  | 
|  | size = 64 | 
|  | buf = (ctypes.c_uint8 * size)() | 
|  | csize = ctypes.c_size_t(size) | 
|  | builtins.chipStack.Call(        # type: ignore[attr-defined]  # 'chipStack' is dynamically added; referred to in DeviceProxyWrapper class __del__ method | 
|  | lambda: self._dmLib.pychip_GetAttestationChallenge(self._deviceProxy, buf, ctypes.byref(csize)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return bytes(buf[:csize.value]) | 
|  |  | 
|  | @property | 
|  | def sessionAllowsLargePayload(self) -> bool: | 
|  | self._dmLib.pychip_SessionAllowsLargePayload.argtypes = [ctypes.c_void_p, POINTER(ctypes.c_bool)] | 
|  | self._dmLib.pychip_SessionAllowsLargePayload.restype = PyChipError | 
|  |  | 
|  | supportsLargePayload = ctypes.c_bool(False) | 
|  |  | 
|  | builtins.chipStack.Call(        # type: ignore[attr-defined]  # 'chipStack' is dynamically added; referred to in DeviceProxyWrapper class __del__ method | 
|  | lambda: self._dmLib.pychip_SessionAllowsLargePayload(self._deviceProxy, pointer(supportsLargePayload)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return supportsLargePayload.value | 
|  |  | 
|  | @property | 
|  | def isSessionOverTCPConnection(self) -> bool: | 
|  | self._dmLib.pychip_IsSessionOverTCPConnection.argtypes = [ctypes.c_void_p, POINTER(ctypes.c_bool)] | 
|  | self._dmLib.pychip_IsSessionOverTCPConnection.restype = PyChipError | 
|  |  | 
|  | isSessionOverTCP = ctypes.c_bool(False) | 
|  |  | 
|  | builtins.chipStack.Call(        # type: ignore[attr-defined]  # 'chipStack' is dynamically added; referred to in DeviceProxyWrapper class __del__ method | 
|  | lambda: self._dmLib.pychip_IsSessionOverTCPConnection(self._deviceProxy, pointer(isSessionOverTCP)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return isSessionOverTCP.value | 
|  |  | 
|  | @property | 
|  | def isActiveSession(self) -> bool: | 
|  | self._dmLib.pychip_IsActiveSession.argtypes = [ctypes.c_void_p, POINTER(ctypes.c_bool)] | 
|  | self._dmLib.pychip_IsActiveSession.restype = PyChipError | 
|  |  | 
|  | isActiveSession = ctypes.c_bool(False) | 
|  |  | 
|  | builtins.chipStack.Call(        # type: ignore[attr-defined]  # 'chipStack' is dynamically added; referred to in DeviceProxyWrapper class __del__ method | 
|  | lambda: self._dmLib.pychip_IsActiveSession(self._deviceProxy, pointer(isActiveSession)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return isActiveSession.value | 
|  |  | 
|  | def closeTCPConnectionWithPeer(self): | 
|  | self._dmLib.pychip_CloseTCPConnectionWithPeer.argtypes = [ctypes.c_void_p] | 
|  | self._dmLib.pychip_CloseTCPConnectionWithPeer.restype = PyChipError | 
|  |  | 
|  | builtins.chipStack.Call( | 
|  | lambda: self._dmLib.pychip_CloseTCPConnectionWithPeer(self._deviceProxy) | 
|  | ).raise_on_error() | 
|  |  | 
|  |  | 
|  | DiscoveryFilterType: typing.TypeAlias = discovery.FilterType | 
|  | DiscoveryType: typing.TypeAlias = discovery.DiscoveryType | 
|  |  | 
|  |  | 
|  | class ChipDeviceControllerBase(): | 
|  | activeList: typing.Set = set() | 
|  |  | 
|  | def __init__(self, name: str = ''): | 
|  | self.devCtrl = None | 
|  | # 'chipStack' is dynamically added; referred to in DeviceProxyWrapper class __del__ method | 
|  | self._ChipStack = builtins.chipStack  # type: ignore[attr-defined] | 
|  | self._dmLib: typing.Any = None | 
|  | self._isActive = False | 
|  |  | 
|  | self._InitLib() | 
|  |  | 
|  | pairingDelegate = c_void_p(None) | 
|  | devCtrl = c_void_p(None) | 
|  |  | 
|  | self.pairingDelegate = pairingDelegate | 
|  | self.devCtrl = devCtrl | 
|  | self.name = name | 
|  | self._fabricCheckNodeId = -1 | 
|  |  | 
|  | # 'chipStack' is dynamically added; referred to in DeviceProxyWrapper class __del__ method | 
|  | self._Cluster = ChipClusters(builtins.chipStack)  # type: ignore[attr-defined] | 
|  | self._commissioning_lock: asyncio.Lock = asyncio.Lock() | 
|  | self._commissioning_context: CommissioningContext = CommissioningContext(self, self._commissioning_lock) | 
|  | self._open_window_context: CallbackContext = CallbackContext(asyncio.Lock()) | 
|  | self._unpair_device_context: CallbackContext = CallbackContext(asyncio.Lock()) | 
|  | self._pase_establishment_context: CallbackContext = CallbackContext(self._commissioning_lock) | 
|  |  | 
|  | def _set_dev_ctrl(self, devCtrl, pairingDelegate): | 
|  | def HandleCommissioningComplete(nodeId: int, err: PyChipError): | 
|  | if err.is_success: | 
|  | LOGGER.info("Commissioning complete") | 
|  | else: | 
|  | LOGGER.warning("Failed to commission: {}".format(err)) | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetIcdRegistrationParameters(False, None) | 
|  |  | 
|  | if self._dmLib.pychip_TestCommissionerUsed(): | 
|  | err = self._dmLib.pychip_GetCompletionError() | 
|  |  | 
|  | if self._commissioning_context.future is None: | 
|  | LOGGER.exception("HandleCommissioningComplete called unexpectedly") | 
|  | return | 
|  |  | 
|  | if err.is_success: | 
|  | self._commissioning_context.future.set_result(nodeId) | 
|  |  | 
|  | else: | 
|  | self._commissioning_context.future.set_exception(err.to_exception()) | 
|  |  | 
|  | def HandleFabricCheck(nodeId): | 
|  | self._fabricCheckNodeId = nodeId | 
|  |  | 
|  | def HandleOpenWindowComplete(nodeid: int, setupPinCode: int, setupManualCode: bytes, | 
|  | setupQRCode: bytes, err: PyChipError) -> None: | 
|  | if err.is_success: | 
|  | LOGGER.info("Open Commissioning Window complete setting node ID 0x%016X pincode to %d", nodeid, setupPinCode) | 
|  | commissioningParameters = CommissioningParameters( | 
|  | setupPinCode=setupPinCode, setupManualCode=setupManualCode.decode(), setupQRCode=setupQRCode.decode()) | 
|  | else: | 
|  | LOGGER.warning("Failed to open commissioning window: {}".format(err)) | 
|  |  | 
|  | if self._open_window_context.future is None: | 
|  | LOGGER.exception("HandleOpenWindowComplete called unexpectedly") | 
|  | return | 
|  |  | 
|  | if err.is_success: | 
|  | self._open_window_context.future.set_result(commissioningParameters) | 
|  | else: | 
|  | self._open_window_context.future.set_exception(err.to_exception()) | 
|  |  | 
|  | def HandleUnpairDeviceComplete(nodeid: int, err: PyChipError): | 
|  | if err.is_success: | 
|  | LOGGER.info("Successfully unpaired device with node ID 0x%016X", nodeid) | 
|  | else: | 
|  | LOGGER.warning("Failed to unpair device: {}".format(err)) | 
|  |  | 
|  | if self._unpair_device_context.future is None: | 
|  | LOGGER.exception("HandleUnpairDeviceComplete called unexpectedly") | 
|  | return | 
|  |  | 
|  | if err.is_success: | 
|  | self._unpair_device_context.future.set_result(None) | 
|  | else: | 
|  | self._unpair_device_context.future.set_exception(err.to_exception()) | 
|  |  | 
|  | def HandlePASEEstablishmentComplete(err: PyChipError): | 
|  | if not err.is_success: | 
|  | LOGGER.warning("Failed to establish secure session to device: {}".format(err)) | 
|  | else: | 
|  | LOGGER.info("Established secure session with Device") | 
|  |  | 
|  | if self._commissioning_context.future is not None: | 
|  | # During Commissioning, HandlePASEEstablishmentComplete will also be called. | 
|  | # Only complete the future if PASE session establishment failed. | 
|  | if not err.is_success: | 
|  | self._commissioning_context.future.set_exception(err.to_exception()) | 
|  | return | 
|  |  | 
|  | if self._pase_establishment_context.future is None: | 
|  | LOGGER.exception("HandlePASEEstablishmentComplete called unexpectedly") | 
|  | return | 
|  |  | 
|  | if err.is_success: | 
|  | self._pase_establishment_context.future.set_result(None) | 
|  | else: | 
|  | self._pase_establishment_context.future.set_exception(err.to_exception()) | 
|  |  | 
|  | self.pairingDelegate = pairingDelegate | 
|  | self.devCtrl = devCtrl | 
|  |  | 
|  | self.cbHandlePASEEstablishmentCompleteFunct = _DevicePairingDelegate_OnPairingCompleteFunct( | 
|  | HandlePASEEstablishmentComplete) | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetKeyExchangeCallback( | 
|  | self.pairingDelegate, self.cbHandlePASEEstablishmentCompleteFunct) | 
|  |  | 
|  | self.cbHandleCommissioningCompleteFunct = _DevicePairingDelegate_OnCommissioningCompleteFunct( | 
|  | HandleCommissioningComplete) | 
|  |  | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetCommissioningCompleteCallback( | 
|  | self.pairingDelegate, self.cbHandleCommissioningCompleteFunct) | 
|  |  | 
|  | self.cbHandleFabricCheckFunct = _DevicePairingDelegate_OnFabricCheckFunct(HandleFabricCheck) | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetFabricCheckCallback(self.pairingDelegate, self.cbHandleFabricCheckFunct) | 
|  |  | 
|  | self.cbHandleOpenWindowCompleteFunct = _DevicePairingDelegate_OnOpenWindowCompleteFunct( | 
|  | HandleOpenWindowComplete) | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetOpenWindowCompleteCallback( | 
|  | self.pairingDelegate, self.cbHandleOpenWindowCompleteFunct) | 
|  |  | 
|  | self.cbHandleDeviceUnpairCompleteFunct = _DeviceUnpairingCompleteFunct(HandleUnpairDeviceComplete) | 
|  |  | 
|  | self._isActive = True | 
|  | # Validate FabricID/NodeID followed from NOC Chain | 
|  | self._fabricId = self.GetFabricIdInternal() | 
|  | self._fabricIndex = self.GetFabricIndexInternal() | 
|  | self._nodeId = self.GetNodeIdInternal() | 
|  | self._rootPublicKeyBytes = self.GetRootPublicKeyBytesInternal() | 
|  |  | 
|  | def _finish_init(self): | 
|  | self._isActive = True | 
|  |  | 
|  | ChipDeviceController.activeList.add(self) | 
|  |  | 
|  | def _enablePairingCompleteCallback(self, value: bool): | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetExpectingPairingComplete(self.pairingDelegate, value) | 
|  |  | 
|  | @property | 
|  | def nodeId(self) -> int: | 
|  | return self._nodeId | 
|  |  | 
|  | @property | 
|  | def fabricId(self) -> int: | 
|  | return self._fabricId | 
|  |  | 
|  | @property | 
|  | def rootPublicKeyBytes(self) -> bytes: | 
|  | return self._rootPublicKeyBytes | 
|  |  | 
|  | @property | 
|  | def name(self) -> str: | 
|  | return self._name | 
|  |  | 
|  | @name.setter | 
|  | def name(self, new_name: str): | 
|  | self._name = new_name | 
|  |  | 
|  | @property | 
|  | def isActive(self) -> bool: | 
|  | return self._isActive | 
|  |  | 
|  | def Shutdown(self): | 
|  | ''' | 
|  | Shuts down this controller and reclaims any used resources, including the bound | 
|  | C++ constructor instance in the SDK. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | ''' | 
|  | if not self._isActive: | 
|  | return | 
|  |  | 
|  | if self.devCtrl is not None: | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_DeleteDeviceController( | 
|  | self.devCtrl, self.pairingDelegate) | 
|  | ).raise_on_error() | 
|  | self.pairingDelegate = None | 
|  | self.devCtrl = None | 
|  |  | 
|  | ChipDeviceController.activeList.remove(self) | 
|  | self._isActive = False | 
|  |  | 
|  | def ShutdownAll(self): | 
|  | ''' | 
|  | Shut down all active controllers and reclaim any used resources. | 
|  | ''' | 
|  | # | 
|  | # We want a shallow copy here since it would other create new instances | 
|  | # of the controllers in the list. | 
|  | # | 
|  | # We need a copy since we're going to walk through the list and shutdown | 
|  | # each controller, which in turn, will remove themselves from the active list. | 
|  | # | 
|  | # We cannot do that while iterating through the original list. | 
|  | # | 
|  | activeList = copy.copy(ChipDeviceController.activeList) | 
|  |  | 
|  | for controller in activeList: | 
|  | controller.Shutdown() | 
|  |  | 
|  | ChipDeviceController.activeList.clear() | 
|  |  | 
|  | def CheckIsActive(self): | 
|  | ''' | 
|  | Checks if the device controller is active. | 
|  |  | 
|  | Raises: | 
|  | RuntimeError: On failure. | 
|  | ''' | 
|  | if (not self._isActive): | 
|  | raise RuntimeError( | 
|  | "DeviceCtrl instance was already shutdown previously!") | 
|  |  | 
|  | def __del__(self): | 
|  | self.Shutdown() | 
|  |  | 
|  | def IsConnected(self): | 
|  | ''' | 
|  | Checks if the device controller is connected. | 
|  |  | 
|  | Returns: | 
|  | bool: True if is connected, False if not connected. | 
|  |  | 
|  | Raises: | 
|  | RuntimeError: If '_isActive' is False (from the call to CheckIsActive). | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | return self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_IsConnected( | 
|  | self.devCtrl) | 
|  | ) | 
|  |  | 
|  | async def ConnectBLE(self, discriminator: int, setupPinCode: int, nodeid: int, isShortDiscriminator: bool = False) -> int: | 
|  | ''' | 
|  | Connect to a BLE device via PASE using the given discriminator and setup pin code. | 
|  |  | 
|  | Args: | 
|  | discriminator (int): The long discriminator for the DNS-SD advertisement. Valid range: 0-4095. | 
|  | setupPinCode (int): The setup pin code of the device. | 
|  | nodeid (int): Node id of the device. | 
|  | isShortDiscriminator (Optional[bool]): Optional short discriminator. | 
|  |  | 
|  | Returns: | 
|  | int: Effective Node ID of the device (as defined by the assigned NOC). | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | async with self._commissioning_context as ctx: | 
|  | self._enablePairingCompleteCallback(True) | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_ConnectBLE( | 
|  | self.devCtrl, discriminator, isShortDiscriminator, setupPinCode, nodeid) | 
|  | ) | 
|  |  | 
|  | return await asyncio.futures.wrap_future(ctx.future) | 
|  |  | 
|  | async def ConnectNFC(self, discriminator: int, setupPinCode: int, nodeid: int) -> int: | 
|  | ''' | 
|  | Connect to a NFC device via PASE using the given discriminator and setup pin code. | 
|  |  | 
|  | Args: | 
|  | discriminator (int): The long discriminator for the DNS-SD advertisement. Valid range: 0-4095. | 
|  | setupPinCode (int): The setup pin code of the device. | 
|  | nodeid (int): Node id of the device. | 
|  |  | 
|  | Returns: | 
|  | int: Effective Node ID of the device (as defined by the assigned NOC). | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | async with self._commissioning_context as ctx: | 
|  | self._enablePairingCompleteCallback(True) | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_ConnectNFC( | 
|  | self.devCtrl, discriminator, setupPinCode, nodeid) | 
|  | ) | 
|  |  | 
|  | return await asyncio.futures.wrap_future(ctx.future) | 
|  |  | 
|  | async def UnpairDevice(self, nodeid: int) -> None: | 
|  | ''' | 
|  | Unpairs the device with the specified node ID. | 
|  |  | 
|  | Args: | 
|  | nodeid (int): Node id of the device. | 
|  |  | 
|  | Returns: | 
|  | None. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | async with self._unpair_device_context as ctx: | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_UnpairDevice( | 
|  | self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct) | 
|  | ) | 
|  |  | 
|  | return await asyncio.futures.wrap_future(ctx.future) | 
|  |  | 
|  | def CloseBLEConnection(self): | 
|  | ''' | 
|  | Closes the BLE connection for the device controller. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceCommissioner_CloseBleConnection( | 
|  | self.devCtrl) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def ExpireSessions(self, nodeid): | 
|  | ''' | 
|  | Close all sessions with `nodeid` (if any existed) so that sessions get re-established. | 
|  |  | 
|  | This is needed to properly handle operations that invalidate a node's state, such as | 
|  | UpdateNOC. | 
|  |  | 
|  | WARNING: ONLY CALL THIS IF YOU UNDERSTAND THE SIDE-EFFECTS | 
|  |  | 
|  | Args: | 
|  | nodeid (int): Node id of the device. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | self._ChipStack.Call(lambda: self._dmLib.pychip_ExpireSessions(self.devCtrl, nodeid)).raise_on_error() | 
|  |  | 
|  | def MarkSessionDefunct(self, nodeid): | 
|  | ''' | 
|  | Marks a previously active session with the specified node as defunct to temporarily prevent it | 
|  | from being used with new exchanges to send or receive messages. This should be called when there | 
|  | is suspicion of a loss-of-sync with the session state on the associated peer, such as evidence | 
|  | of transport failure. | 
|  |  | 
|  | If messages are received thereafter on this session, the session will be put back into the Active state. | 
|  |  | 
|  | This function should only be called on an active session. | 
|  | This will NOT detach any existing SessionHolders. | 
|  |  | 
|  | Args: | 
|  | nodeid (int): The node ID of the device whose session should be marked as defunct. | 
|  |  | 
|  | Raises: | 
|  | RuntimeError: If the controller is not active. | 
|  | PyChipError: If the operation fails. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_MarkSessionDefunct( | 
|  | self.devCtrl, nodeid) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def MarkSessionForEviction(self, nodeid): | 
|  | ''' | 
|  | Marks the session with the specified node for eviction. It will first detach all SessionHolders | 
|  | attached to this session by calling 'OnSessionReleased' on each of them. This will force them | 
|  | to release their reference to the session. If there are no more references left, the session | 
|  | will then be de-allocated. | 
|  |  | 
|  | Once marked for eviction, the session SHALL NOT ever become active again. | 
|  |  | 
|  | Args: | 
|  | nodeid (int): The node ID of the device whose session should be marked for eviction. | 
|  |  | 
|  | Raises: | 
|  | RuntimeError: If the controller is not active. | 
|  | PyChipError: If the operation fails. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_MarkSessionForEviction( | 
|  | self.devCtrl, nodeid) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def DeleteAllSessionResumptionStorage(self): | 
|  | ''' | 
|  | Remove all session resumption information associated with the fabric index of the controller. | 
|  |  | 
|  | Raises: | 
|  | RuntimeError: If the controller is not active. | 
|  | PyChipError: If the operation fails. | 
|  | ''' | 
|  |  | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_DeleteAllSessionResumption( | 
|  | self.devCtrl)).raise_on_error() | 
|  |  | 
|  | async def _establishPASESession(self, callFunct): | 
|  | self.CheckIsActive() | 
|  |  | 
|  | async with self._pase_establishment_context as ctx: | 
|  | self._enablePairingCompleteCallback(True) | 
|  | await self._ChipStack.CallAsync(callFunct) | 
|  | await asyncio.futures.wrap_future(ctx.future) | 
|  |  | 
|  | async def EstablishPASESessionBLE(self, setupPinCode: int, discriminator: int, nodeid: int) -> None: | 
|  | ''' | 
|  | Establish a PASE session over BLE. | 
|  |  | 
|  | Warning: This method attempts to establish a new PASE session, even if an open session already exists. | 
|  | For safer session management that reuses existing sessions, see `FindOrEstablishPASESession`. | 
|  |  | 
|  | Args: | 
|  | discriminator (int): The long discriminator for the DNS-SD advertisement. Valid range: 0-4095. | 
|  | setupPinCode (int): The setup pin code of the device. | 
|  | nodeid (int): Node id of the device. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | ''' | 
|  | await self._establishPASESession( | 
|  | lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE( | 
|  | self.devCtrl, setupPinCode, discriminator, nodeid) | 
|  | ) | 
|  |  | 
|  | async def EstablishPASESessionIP(self, ipaddr: str, setupPinCode: int, nodeid: int, port: int = 0) -> None: | 
|  | ''' | 
|  | Establish a PASE session over IP. | 
|  |  | 
|  | Warning: This method attempts to establish a new PASE session, even if an open session already exists. | 
|  | For safer session management that reuses existing sessions, see `FindOrEstablishPASESession`. | 
|  |  | 
|  | Args: | 
|  | ipaddr (str): IP address. | 
|  | port (int): IP port to use (default is 0). | 
|  | setupPinCode (int): The setup pin code of the device. | 
|  | nodeid (int): Node id of the device. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | ''' | 
|  | await self._establishPASESession( | 
|  | lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP( | 
|  | self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port) | 
|  | ) | 
|  |  | 
|  | async def EstablishPASESession(self, setUpCode: str, nodeid: int) -> None: | 
|  | ''' | 
|  | Establish a PASE session using setUpCode. | 
|  |  | 
|  | Warning: This method attempts to establish a new PASE session, even if an open session already exists. | 
|  | For safer session management that reuses existing sessions, see `FindOrEstablishPASESession`. | 
|  |  | 
|  | Args: | 
|  | setUpCode (str): The setup code of the device. | 
|  | nodeid (int):  The node ID assigned to the device for the PASE session. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | ''' | 
|  | await self._establishPASESession( | 
|  | lambda: self._dmLib.pychip_DeviceController_EstablishPASESession( | 
|  | self.devCtrl, setUpCode.encode("utf-8"), nodeid) | 
|  | ) | 
|  |  | 
|  | def GetTestCommissionerUsed(self): | 
|  | ''' | 
|  | Get the status of test commissioner in use. | 
|  |  | 
|  | Returns: | 
|  | bool: True if the test commissioner is in use, False if not. | 
|  | ''' | 
|  | return self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_TestCommissionerUsed() | 
|  | ) | 
|  |  | 
|  | def ResetTestCommissioner(self): | 
|  | self._dmLib.pychip_ResetCommissioningTests() | 
|  |  | 
|  | def SetTestCommissionerSimulateFailureOnStage(self, stage: int): | 
|  | ''' | 
|  | Simulates a failure on a specific stage of the test commissioner. | 
|  |  | 
|  | Args: | 
|  | stage (int): The commissioning stage where failure will be simulated. | 
|  | This corresponds to the enum `CommissioningStage` (e.g. kError, kSecurePairing, etc.). For full details | 
|  | ref https://github.com/project-chip/connectedhomeip/blob/master/src/controller/CommissioningDelegate.h | 
|  |  | 
|  | Returns: | 
|  | bool: True if the failure simulate success, False if not. | 
|  | ''' | 
|  | return self._dmLib.pychip_SetTestCommissionerSimulateFailureOnStage( | 
|  | stage) | 
|  |  | 
|  | def SetTestCommissionerSimulateFailureOnReport(self, stage: int): | 
|  | ''' | 
|  | Simulates a failure on report of the test commissioner. | 
|  |  | 
|  | Args: | 
|  | stage (int): The commissioning stage where failure will be simulated. | 
|  | This corresponds to the enum `CommissioningStage` (e.g. kError, kSecurePairing, etc.). For full details | 
|  | ref https://github.com/project-chip/connectedhomeip/blob/master/src/controller/CommissioningDelegate.h | 
|  |  | 
|  | Returns: | 
|  | bool: True if the failure simulate success, False if not. | 
|  | ''' | 
|  | return self._dmLib.pychip_SetTestCommissionerSimulateFailureOnReport( | 
|  | stage) | 
|  |  | 
|  | def SetTestCommissionerPrematureCompleteAfter(self, stage: int): | 
|  | ''' | 
|  | Premature complete of the test commissioner. | 
|  |  | 
|  | Args: | 
|  | stage (int): The commissioning stage after a premature completion is simulated. | 
|  | This corresponds to the enum `CommissioningStage` (e.g. kError, kSecurePairing, etc.). For full details | 
|  | ref https://github.com/project-chip/connectedhomeip/blob/master/src/controller/CommissioningDelegate.h | 
|  |  | 
|  | Returns: | 
|  | bool: True if the premature complete success, False if not. | 
|  | ''' | 
|  | return self._dmLib.pychip_SetTestCommissionerPrematureCompleteAfter( | 
|  | stage) | 
|  |  | 
|  | def CheckTestCommissionerCallbacks(self): | 
|  | ''' | 
|  | Check the test commissioner callbacks. | 
|  |  | 
|  | Returns: | 
|  | bool: True if the test commissioner callbacks success, False if not. | 
|  | ''' | 
|  | return self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_TestCommissioningCallbacks() | 
|  | ) | 
|  |  | 
|  | def CheckStageSuccessful(self, stage: int): | 
|  | ''' | 
|  | Check the test commissioner stage sucess. | 
|  |  | 
|  | Args: | 
|  | stage (int): The commissioning to simulate. | 
|  |  | 
|  | Returns: | 
|  | bool: True if test commissioner stage success, False if not. | 
|  | ''' | 
|  | return self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_TestCommissioningStageSuccessful(stage) | 
|  | ) | 
|  |  | 
|  | def CheckTestCommissionerPaseConnection(self, nodeid): | 
|  | ''' | 
|  | Check the test commissioner Pase connection sucess. | 
|  |  | 
|  | Args: | 
|  | nodeid (int): Node ID of the device. | 
|  |  | 
|  | Returns: | 
|  | bool: True if test commissioner Pase connection success, False if not. | 
|  | ''' | 
|  | return self._dmLib.pychip_TestPaseConnection(nodeid) | 
|  |  | 
|  | def ResolveNode(self, nodeid): | 
|  | ''' | 
|  | Resolve node ID. | 
|  |  | 
|  | Args: | 
|  | nodeid (int): Node ID of the device. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | self.GetConnectedDeviceSync(nodeid, allowPASE=False) | 
|  |  | 
|  | def GetAddressAndPort(self, nodeid): | 
|  | ''' | 
|  | Get the address and port. | 
|  |  | 
|  | Args: | 
|  | nodeid (int): Node ID of the device. | 
|  |  | 
|  | Returns: | 
|  | tuple: The address and port if no error occurs or None on failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | address = create_string_buffer(64) | 
|  | port = c_uint16(0) | 
|  |  | 
|  | error = self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_GetAddressAndPort( | 
|  | self.devCtrl, nodeid, address, 64, pointer(port)) | 
|  | ) | 
|  |  | 
|  | # Intentionally return None instead of raising exceptions on error | 
|  | return (address.value.decode(), port.value) if error == 0 else None | 
|  |  | 
|  | async def DiscoverCommissionableNodes(self, filterType: discovery.FilterType = discovery.FilterType.NONE, filter: typing.Any = None, | 
|  | stopOnFirst: bool = False, timeoutSecond: int = 5) -> typing.Union[None, CommissionableNode, typing.List[CommissionableNode]]: | 
|  | ''' | 
|  | Discover commissionable nodes via DNS-SD with specified filters. | 
|  | Supported filters are: | 
|  |  | 
|  | discovery.FilterType.NONE | 
|  | discovery.FilterType.SHORT_DISCRIMINATOR | 
|  | discovery.FilterType.LONG_DISCRIMINATOR | 
|  | discovery.FilterType.VENDOR_ID | 
|  | discovery.FilterType.DEVICE_TYPE | 
|  | discovery.FilterType.COMMISSIONING_MODE | 
|  | discovery.FilterType.INSTANCE_NAME | 
|  | discovery.FilterType.COMMISSIONER | 
|  | discovery.FilterType.COMPRESSED_FABRIC_ID | 
|  |  | 
|  | This function will always return a list of CommissionableDevice. When stopOnFirst is set, | 
|  | this function will return when at least one device is discovered or on timeout. | 
|  |  | 
|  | Returns: | 
|  | list: A list of discovered devices. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | if isinstance(filter, int): | 
|  | filter = str(filter) | 
|  |  | 
|  | # Discovery is also used during commissioning. Make sure this manual discovery | 
|  | # and commissioning attempts do not interfere with each other. | 
|  | async with self._commissioning_lock: | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_DiscoverCommissionableNodes( | 
|  | self.devCtrl, int(filterType), str(filter).encode("utf-8"))) | 
|  |  | 
|  | async def _wait_discovery(): | 
|  | while not await self._ChipStack.CallAsyncWithResult( | 
|  | lambda: self._dmLib.pychip_DeviceController_HasDiscoveredCommissionableNode(self.devCtrl)): | 
|  | await asyncio.sleep(0.1) | 
|  | return | 
|  |  | 
|  | try: | 
|  | if stopOnFirst: | 
|  | await asyncio.wait_for(_wait_discovery(), timeoutSecond) | 
|  | else: | 
|  | await asyncio.sleep(timeoutSecond) | 
|  | except TimeoutError: | 
|  | # Expected timeout, do nothing | 
|  | pass | 
|  | finally: | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_StopCommissionableDiscovery(self.devCtrl)) | 
|  |  | 
|  | return await self.GetDiscoveredDevices() | 
|  |  | 
|  | async def GetDiscoveredDevices(self): | 
|  | ''' | 
|  | Get the discovered devices. | 
|  |  | 
|  | Returns: | 
|  | list: A list of discovered devices. | 
|  | ''' | 
|  | def GetDevices(devCtrl): | 
|  | devices = [] | 
|  |  | 
|  | @_ChipDeviceController_IterateDiscoveredCommissionableNodesFunct | 
|  | def HandleDevice(deviceJson, deviceJsonLen): | 
|  | jsonStr = ctypes.string_at(deviceJson, deviceJsonLen).decode("utf-8") | 
|  | device = dacite.from_dict(data_class=CommissionableNode, data=json.loads(jsonStr)) | 
|  | device.SetDeviceController(devCtrl) | 
|  | devices.append(device) | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_IterateDiscoveredCommissionableNodes(devCtrl.devCtrl, HandleDevice) | 
|  | return devices | 
|  |  | 
|  | return await self._ChipStack.CallAsyncWithResult(lambda: GetDevices(self)) | 
|  |  | 
|  | def GetIPForDiscoveredDevice(self, idx, addrStr, length): | 
|  | ''' | 
|  | Get the IP address for a discovered device. | 
|  |  | 
|  | Args: | 
|  | idx (int): Index of the discovered device. | 
|  | addrStr (str): Address of the device. | 
|  | length (int): Length of the address. | 
|  |  | 
|  | Returns: | 
|  | bool: True if IP for discovered device success, False if not. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | return self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_GetIPForDiscoveredDevice( | 
|  | self.devCtrl, idx, addrStr, length) | 
|  | ) | 
|  |  | 
|  | class CommissioningWindowPasscode(enum.IntEnum): | 
|  | kOriginalSetupCode = 0 | 
|  | kTokenWithRandomPin = 1 | 
|  |  | 
|  | async def OpenCommissioningWindow(self, nodeid: int, timeout: int, iteration: int, | 
|  | discriminator: int, option: CommissioningWindowPasscode) -> CommissioningParameters: | 
|  | ''' | 
|  | Opens a commissioning window on the device with the given nodeid. | 
|  |  | 
|  | Args: | 
|  | nodeid (int): Node id of the device. | 
|  | timeout (int): Command timeout | 
|  | iteration (int): The PAKE iteration count associated with the PAKE Passcode ID and ephemeral | 
|  | PAKE passcode verifier to be used for this commissioning. Valid range: 1000 - 100000 | 
|  | Ignored if option == 0 | 
|  | discriminator (int): The long discriminator for the DNS-SD advertisement. Valid range: 0-4095 | 
|  | Ignored if option == 0 | 
|  | option (int): | 
|  | 0 = kOriginalSetupCode | 
|  | 1 = kTokenWithRandomPIN | 
|  |  | 
|  | Returns: | 
|  | CommissioningParameters | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | async with self._open_window_context as ctx: | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow( | 
|  | self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option) | 
|  | ) | 
|  |  | 
|  | return await asyncio.futures.wrap_future(ctx.future) | 
|  |  | 
|  | async def OpenJointCommissioningWindow(self, nodeid: int, endpointId: int, timeout: int, iteration: int, | 
|  | discriminator: int) -> CommissioningParameters: | 
|  | ''' | 
|  | Opens a joint commissioning window on the device with the given nodeid. | 
|  |  | 
|  | Args: | 
|  | nodeid (int): Node id of the device. | 
|  | timeout (int): Command timeout | 
|  | iteration (int): The PAKE iteration count associated with the PAKE Passcode ID and ephemeral | 
|  | PAKE passcode verifier to be used for this commissioning. Valid range: 1000 - 100000 | 
|  | discriminator (int): The long discriminator for the DNS-SD advertisement. Valid range: 0-4095 | 
|  |  | 
|  | Returns: | 
|  | CommissioningParameters | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | async with self._open_window_context as ctx: | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_OpenJointCommissioningWindow( | 
|  | self.devCtrl, self.pairingDelegate, nodeid, endpointId, timeout, iteration, discriminator) | 
|  | ) | 
|  |  | 
|  | return await asyncio.futures.wrap_future(ctx.future) | 
|  |  | 
|  | def GetCompressedFabricId(self): | 
|  | ''' | 
|  | Get compressed fabric Id. | 
|  |  | 
|  | Returns: | 
|  | int: The compressed fabric ID as a 64-bit integer. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | fabricid = c_uint64(0) | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_GetCompressedFabricId( | 
|  | self.devCtrl, pointer(fabricid)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return fabricid.value | 
|  |  | 
|  | def GetFabricIdInternal(self) -> int: | 
|  | ''' | 
|  | Get the fabric ID from the object. Only used to validate cached value from property. | 
|  |  | 
|  | Returns: | 
|  | int: The raw fabric ID as a 64-bit integer. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | fabricid = c_uint64(0) | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_GetFabricId( | 
|  | self.devCtrl, pointer(fabricid)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return fabricid.value | 
|  |  | 
|  | def GetFabricIndexInternal(self) -> int: | 
|  | ''' | 
|  | Get the fabric index from the object. Only used to validate cached value from property. | 
|  |  | 
|  | Returns: | 
|  | int: fabric index in local fabric table associated with this controller. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | fabricindex = c_uint8(0) | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_GetFabricIndex( | 
|  | self.devCtrl, pointer(fabricindex)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return fabricindex.value | 
|  |  | 
|  | def GetNodeIdInternal(self) -> int: | 
|  | ''' | 
|  | Get the node ID from the object. Only used to validate cached value from property. | 
|  |  | 
|  | Returns: | 
|  | int: The Node ID as a 64 bit integer. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | nodeid = c_uint64(0) | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_GetNodeId( | 
|  | self.devCtrl, pointer(nodeid)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return nodeid.value | 
|  |  | 
|  | def GetRootPublicKeyBytesInternal(self) -> bytes: | 
|  | ''' | 
|  | Get the root public key associated with our fabric. | 
|  |  | 
|  | Returns: | 
|  | bytes: The root public key raw bytes in uncompressed point form. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | size = 128 | 
|  | buf = (ctypes.c_uint8 * size)() | 
|  | csize = ctypes.c_size_t(size) | 
|  | builtins.chipStack.Call(    # type: ignore[attr-defined]  # 'chipStack' is dynamically added; referred to in DeviceProxyWrapper class __del__ method | 
|  | lambda: self._dmLib.pychip_DeviceController_GetRootPublicKeyBytes(self.devCtrl, buf, ctypes.byref(csize)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | return bytes(buf[:csize.value]) | 
|  |  | 
|  | def GetClusterHandler(self): | 
|  | ''' | 
|  | Get cluster handler | 
|  |  | 
|  | Returns: | 
|  | ChipClusters: An instance of the ChipClusters class. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | return self._Cluster | 
|  |  | 
|  | async def FindOrEstablishPASESession(self, setupCode: str, nodeid: int, timeoutMs: typing.Optional[int] = None) -> typing.Optional[DeviceProxyWrapper]: | 
|  | ''' | 
|  | Find or establish a PASE session. | 
|  |  | 
|  | Args: | 
|  | setUpCode (str): The setup code of the device. | 
|  | nodeid (int): Node id of the device. | 
|  | timeoutMs (Optional[int]): Optional timeout in milliseconds. | 
|  |  | 
|  | Returns: | 
|  | DeviceProxyWrapper on success, if not is None. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | returnDevice = c_void_p(None) | 
|  | res = await self._ChipStack.CallAsyncWithResult(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( | 
|  | self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) | 
|  | if res.is_success: | 
|  | return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) | 
|  |  | 
|  | await self.EstablishPASESession(setupCode, nodeid) | 
|  |  | 
|  | res = await self._ChipStack.CallAsyncWithResult(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( | 
|  | self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) | 
|  | if res.is_success: | 
|  | return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) | 
|  |  | 
|  | return None | 
|  |  | 
|  | def GetConnectedDeviceSync(self, nodeid, allowPASE=True, timeoutMs: typing.Optional[int] = None, payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): | 
|  | ''' | 
|  | Gets an OperationalDeviceProxy or CommissioneeDeviceProxy for the specified Node. | 
|  |  | 
|  | Args: | 
|  | nodeid (int): Target's Node ID | 
|  | allowPASE (bool): Get a device proxy of a device being commissioned. | 
|  | timeoutMs (Optional[int]): Timeout for a timed invoke request. Omit or set to 'None' to indicate a non-timed request. | 
|  |  | 
|  | Returns: | 
|  | DeviceProxyWrapper on success. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | returnDevice = c_void_p(None) | 
|  | returnErr: typing.Any = None | 
|  | deviceAvailableCV = threading.Condition() | 
|  |  | 
|  | if allowPASE: | 
|  | res = self._ChipStack.Call(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( | 
|  | self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) | 
|  | if res.is_success: | 
|  | LOGGER.info('Using PASE connection') | 
|  | return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) | 
|  |  | 
|  | class DeviceAvailableClosure(): | 
|  | def deviceAvailable(self, device, err): | 
|  | nonlocal returnDevice | 
|  | nonlocal returnErr | 
|  | nonlocal deviceAvailableCV | 
|  | with deviceAvailableCV: | 
|  | returnDevice = c_void_p(device) | 
|  | returnErr = err | 
|  | deviceAvailableCV.notify_all() | 
|  | ctypes.pythonapi.Py_DecRef(ctypes.py_object(self)) | 
|  |  | 
|  | closure = DeviceAvailableClosure() | 
|  | ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure)) | 
|  | self._ChipStack.Call(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId( | 
|  | self.devCtrl, nodeid, ctypes.py_object(closure), _DeviceAvailableCallback, payloadCapability), | 
|  | timeoutMs).raise_on_error() | 
|  |  | 
|  | # The callback might have been received synchronously (during self._ChipStack.Call()). | 
|  | # Check if the device is already set before waiting for the callback. | 
|  | if returnDevice.value is None: | 
|  | with deviceAvailableCV: | 
|  | timeout = None | 
|  | if timeoutMs is not None: | 
|  | timeout = float(timeoutMs) / 1000 | 
|  |  | 
|  | ret = deviceAvailableCV.wait(timeout) | 
|  | if ret is False: | 
|  | raise TimeoutError("Timed out waiting for DNS-SD resolution") | 
|  |  | 
|  | if returnDevice.value is None: | 
|  | returnErr.raise_on_error() | 
|  |  | 
|  | return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.OPERATIONAL, self._dmLib) | 
|  |  | 
|  | async def WaitForActive(self, nodeid, *, timeoutSeconds=30.0, stayActiveDurationMs=30000): | 
|  | ''' | 
|  | Waits a LIT ICD device to become active. Will send a StayActive command to the device on active to allow human operations. | 
|  |  | 
|  | Args: | 
|  | nodeId: Node ID of the LID ICD. | 
|  | stayActiveDurationMs: The duration in the StayActive command, in milliseconds. | 
|  |  | 
|  | Returns: | 
|  | StayActiveResponse on success | 
|  | ''' | 
|  | await WaitForCheckIn(ScopedNodeId(nodeid, self._fabricIndex), timeoutSeconds=timeoutSeconds) | 
|  | return await self.SendCommand(nodeid, 0, Clusters.IcdManagement.Commands.StayActiveRequest(stayActiveDuration=stayActiveDurationMs)) | 
|  |  | 
|  | async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: typing.Optional[int] = None, | 
|  | payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): | 
|  | ''' | 
|  | Gets an OperationalDeviceProxy or CommissioneeDeviceProxy for the specified Node. | 
|  |  | 
|  | Args: | 
|  | nodeId (int): Target's Node ID. | 
|  | allowPASE (bool): Get a device proxy of a device being commissioned. | 
|  | timeoutMs (Optional[int]): Timeout for a timed invoke request. Omit or set to 'None' to indicate a non-timed request. | 
|  |  | 
|  | Returns: | 
|  | DeviceProxyWrapper on success. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | if allowPASE: | 
|  | returnDevice = c_void_p(None) | 
|  | res = await self._ChipStack.CallAsyncWithResult(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned( | 
|  | self.devCtrl, nodeid, byref(returnDevice)), timeoutMs) | 
|  | if res.is_success: | 
|  | LOGGER.info('Using PASE connection') | 
|  | return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib) | 
|  |  | 
|  | eventLoop = asyncio.get_running_loop() | 
|  | future = eventLoop.create_future() | 
|  |  | 
|  | class DeviceAvailableClosure(): | 
|  | def __init__(self, loop, future: asyncio.Future): | 
|  | self._returnDevice = c_void_p(None) | 
|  | self._returnErr = None | 
|  | self._event_loop = loop | 
|  | self._future = future | 
|  |  | 
|  | def _deviceAvailable(self): | 
|  | if self._future.cancelled(): | 
|  | return | 
|  | if self._returnDevice.value is not None: | 
|  | self._future.set_result(self._returnDevice) | 
|  | else: | 
|  | self._future.set_exception(self._returnErr.to_exception()) | 
|  |  | 
|  | def deviceAvailable(self, device, err): | 
|  | self._returnDevice = c_void_p(device) | 
|  | self._returnErr = err | 
|  | self._event_loop.call_soon_threadsafe(self._deviceAvailable) | 
|  | ctypes.pythonapi.Py_DecRef(ctypes.py_object(self)) | 
|  |  | 
|  | closure = DeviceAvailableClosure(eventLoop, future) | 
|  | ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure)) | 
|  | await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId( | 
|  | self.devCtrl, nodeid, ctypes.py_object(closure), _DeviceAvailableCallback, payloadCapability), | 
|  | timeoutMs) | 
|  |  | 
|  | # The callback might have been received synchronously (during self._ChipStack.CallAsync()). | 
|  | # In that case the Future has already been set it will return immediately | 
|  | if timeoutMs is not None: | 
|  | timeout = float(timeoutMs) / 1000 | 
|  | await asyncio.wait_for(future, timeout=timeout) | 
|  | else: | 
|  | await future | 
|  |  | 
|  | return DeviceProxyWrapper(future.result(), DeviceProxyWrapper.DeviceProxyType.OPERATIONAL, self._dmLib) | 
|  |  | 
|  | def ComputeRoundTripTimeout(self, nodeid, upperLayerProcessingTimeoutMs: int = 0): | 
|  | ''' | 
|  | Returns a computed timeout value based on the round-trip time it takes for the peer at the other end of the session to | 
|  | receive a message, process it and send it back. This is computed based on the session type, the type of transport, | 
|  | sleepy characteristics of the target and a caller-provided value for the time it takes to process a message | 
|  | at the upper layer on the target For group sessions. | 
|  |  | 
|  | This will result in a session being established if one wasn't already. | 
|  |  | 
|  | Returns: | 
|  | int: The computed timeout value in milliseconds, representing the round-trip time. | 
|  | ''' | 
|  | device = self.GetConnectedDeviceSync(nodeid) | 
|  | res = self._ChipStack.Call(lambda: self._dmLib.pychip_DeviceProxy_ComputeRoundTripTimeout( | 
|  | device.deviceProxy, upperLayerProcessingTimeoutMs)) | 
|  | return res | 
|  |  | 
|  | def GetRemoteSessionParameters(self, nodeid) -> typing.Optional[SessionParameters]: | 
|  | ''' | 
|  | Returns the SessionParameters of reported by the remote node associated with `nodeid`. | 
|  | If there is some error in getting SessionParameters None is returned. | 
|  |  | 
|  | This will result in a session being established if one wasn't already established. | 
|  |  | 
|  | Returns: | 
|  | Optional[SessionParameters]: The session parameters. | 
|  | ''' | 
|  |  | 
|  | # First creating the struct to make building the ByteArray to be sent to CFFI easier. | 
|  | sessionParametersStruct = SessionParametersStruct.parse(b'\x00' * SessionParametersStruct.sizeof()) | 
|  | sessionParametersByteArray = SessionParametersStruct.build(sessionParametersStruct) | 
|  | device = self.GetConnectedDeviceSync(nodeid) | 
|  | self._ChipStack.Call(lambda: self._dmLib.pychip_DeviceProxy_GetRemoteSessionParameters( | 
|  | device.deviceProxy, ctypes.c_char_p(sessionParametersByteArray))).raise_on_error() | 
|  |  | 
|  | sessionParametersStruct = SessionParametersStruct.parse(sessionParametersByteArray) | 
|  | return SessionParameters( | 
|  | sessionIdleInterval=sessionParametersStruct.SessionIdleInterval if sessionParametersStruct.SessionIdleInterval != 0 else None, | 
|  | sessionActiveInterval=sessionParametersStruct.SessionActiveInterval if sessionParametersStruct.SessionActiveInterval != 0 else None, | 
|  | sessionActiveThreshold=sessionParametersStruct.SessionActiveThreshold if sessionParametersStruct.SessionActiveThreshold != 0 else None, | 
|  | dataModelRevision=sessionParametersStruct.DataModelRevision if sessionParametersStruct.DataModelRevision != 0 else None, | 
|  | interactionModelRevision=sessionParametersStruct.InteractionModelRevision if sessionParametersStruct.InteractionModelRevision != 0 else None, | 
|  | specficiationVersion=sessionParametersStruct.SpecificationVersion if sessionParametersStruct.SpecificationVersion != 0 else None, | 
|  | maxPathsPerInvoke=sessionParametersStruct.MaxPathsPerInvoke) | 
|  |  | 
|  | async def TestOnlySendBatchCommands(self, nodeid: int, commands: typing.List[ClusterCommand.InvokeRequestInfo], | 
|  | timedRequestTimeoutMs: typing.Optional[int] = None, | 
|  | interactionTimeoutMs: typing.Optional[int] = None, busyWaitMs: typing.Optional[int] = None, | 
|  | suppressResponse: typing.Optional[bool] = None, remoteMaxPathsPerInvoke: typing.Optional[int] = None, | 
|  | suppressTimedRequestMessage: bool = False, commandRefsOverride: typing.Optional[typing.List[int]] = None): | 
|  | ''' | 
|  | Please see SendBatchCommands for description. | 
|  | TestOnly overridable arguments: | 
|  | remoteMaxPathsPerInvoke: Overrides the number of batch commands we think can be sent to remote node. | 
|  | suppressTimedRequestMessage: When set to true, we suppress sending Timed Request Message. | 
|  | commandRefsOverride: List of commandRefs to use for each command with the same index in `commands`. | 
|  |  | 
|  | Returns: | 
|  | TestOnlyBatchCommandResponse | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | eventLoop = asyncio.get_running_loop() | 
|  | future = eventLoop.create_future() | 
|  |  | 
|  | device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs) | 
|  |  | 
|  | ClusterCommand.TestOnlySendBatchCommands( | 
|  | future, eventLoop, device.deviceProxy, commands, | 
|  | timedRequestTimeoutMs=timedRequestTimeoutMs, | 
|  | interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse, | 
|  | remoteMaxPathsPerInvoke=remoteMaxPathsPerInvoke, suppressTimedRequestMessage=suppressTimedRequestMessage, | 
|  | commandRefsOverride=commandRefsOverride).raise_on_error() | 
|  | return await future | 
|  |  | 
|  | async def TestOnlySendCommandTimedRequestFlagWithNoTimedInvoke(self, nodeid: int, endpoint: int, | 
|  | payload: ClusterObjects.ClusterCommand, responseType=None): | 
|  | ''' | 
|  | Please see SendCommand for description. | 
|  |  | 
|  | Returns: | 
|  | Command response. The type of the response is defined by the command. | 
|  |  | 
|  | Raises: | 
|  | InteractionModelError on error | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | eventLoop = asyncio.get_running_loop() | 
|  | future = eventLoop.create_future() | 
|  |  | 
|  | device = await self.GetConnectedDevice(nodeid, timeoutMs=None) | 
|  | ClusterCommand.TestOnlySendCommandTimedRequestFlagWithNoTimedInvoke( | 
|  | future, eventLoop, responseType, device.deviceProxy, ClusterCommand.CommandPath( | 
|  | EndpointId=endpoint, | 
|  | ClusterId=payload.cluster_id, | 
|  | CommandId=payload.command_id, | 
|  | ), payload).raise_on_error() | 
|  | return await future | 
|  |  | 
|  | async def SendCommand(self, nodeid: int, endpoint: int, payload: ClusterObjects.ClusterCommand, responseType=None, | 
|  | timedRequestTimeoutMs: typing.Optional[int] = None, | 
|  | interactionTimeoutMs: typing.Optional[int] = None, busyWaitMs: typing.Optional[int] = None, | 
|  | suppressResponse: typing.Optional[bool] = None, | 
|  | payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): | 
|  | ''' | 
|  | Send a cluster-object encapsulated command to a node and get returned a future that can be awaited upon to receive | 
|  | the response. If a valid responseType is passed in, that will be used to de-serialize the object. If not, | 
|  | the type will be automatically deduced from the metadata received over the wire. | 
|  |  | 
|  | timedWriteTimeoutMs: Timeout for a timed invoke request. Omit or set to 'None' to indicate a non-timed request. | 
|  | interactionTimeoutMs: Overall timeout for the interaction. Omit or set to 'None' to have the SDK automatically compute the | 
|  | right timeout value based on transport characteristics as well as the responsiveness of the target. | 
|  |  | 
|  | Returns: | 
|  | command response. The type of the response is defined by the command. | 
|  |  | 
|  | Raises: | 
|  | InteractionModelError on error | 
|  | ''' | 
|  | LOGGER.debug("Sending command %s to node ID 0x%016X", payload, nodeid) | 
|  | self.CheckIsActive() | 
|  |  | 
|  | eventLoop = asyncio.get_running_loop() | 
|  | future = eventLoop.create_future() | 
|  |  | 
|  | device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs, payloadCapability=payloadCapability) | 
|  | allow_large_payload = payloadCapability == TransportPayloadCapability.LARGE_PAYLOAD or payloadCapability == TransportPayloadCapability.MRP_OR_TCP_PAYLOAD | 
|  | res = await ClusterCommand.SendCommand( | 
|  | future, eventLoop, responseType, device.deviceProxy, ClusterCommand.CommandPath( | 
|  | EndpointId=endpoint, | 
|  | ClusterId=payload.cluster_id, | 
|  | CommandId=payload.command_id, | 
|  | ), payload, timedRequestTimeoutMs=timedRequestTimeoutMs, | 
|  | interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse, allowLargePayload=allow_large_payload) | 
|  | res.raise_on_error() | 
|  | return await future | 
|  |  | 
|  | async def SendBatchCommands(self, nodeid: int, commands: typing.List[ClusterCommand.InvokeRequestInfo], | 
|  | timedRequestTimeoutMs: typing.Optional[int] = None, | 
|  | interactionTimeoutMs: typing.Optional[int] = None, busyWaitMs: typing.Optional[int] = None, | 
|  | suppressResponse: typing.Optional[bool] = None, | 
|  | payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): | 
|  | ''' | 
|  | Send a batch of cluster-object encapsulated commands to a node and get returned a future that can be awaited upon to receive | 
|  | the responses. If a valid responseType is passed in, that will be used to de-serialize the object. If not, | 
|  | the type will be automatically deduced from the metadata received over the wire. | 
|  |  | 
|  | nodeId: Target's Node ID | 
|  | commands: A list of InvokeRequestInfo containing the commands to invoke. | 
|  | timedWriteTimeoutMs: Timeout for a timed invoke request. Omit or set to 'None' to indicate a non-timed request. | 
|  | interactionTimeoutMs: Overall timeout for the interaction. Omit or set to 'None' to have the SDK automatically compute the | 
|  | right timeout value based on transport characteristics as well as the responsiveness of the target. | 
|  | busyWaitMs: How long to wait in ms after sending command to device before performing any other operations. | 
|  | suppressResponse: Do not send a response to this action | 
|  |  | 
|  | Returns: | 
|  | - List of command responses in the same order as what was given in `commands`. The type of the response is defined by the command. | 
|  | - A value of `None` indicates success. | 
|  | - If only a single command fails, for example with `UNSUPPORTED_COMMAND`, the corresponding index associated with the command will, | 
|  | contain `interaction_model.Status.UnsupportedCommand`. | 
|  | - If a command is not responded to by server, command will contain `interaction_model.Status.NoCommandResponse` | 
|  | Raises: | 
|  | - InteractionModelError if error with sending of InvokeRequestMessage fails as a whole. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | eventLoop = asyncio.get_running_loop() | 
|  | future = eventLoop.create_future() | 
|  |  | 
|  | device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs, payloadCapability=payloadCapability) | 
|  |  | 
|  | res = await ClusterCommand.SendBatchCommands( | 
|  | future, eventLoop, device.deviceProxy, commands, | 
|  | timedRequestTimeoutMs=timedRequestTimeoutMs, | 
|  | interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, suppressResponse=suppressResponse) | 
|  | res.raise_on_error() | 
|  | return await future | 
|  |  | 
|  | def SendGroupCommand(self, groupid: int, payload: ClusterObjects.ClusterCommand, busyWaitMs: typing.Optional[int] = None): | 
|  | ''' | 
|  | Send a group cluster-object encapsulated command to a group_id and get returned a future | 
|  | that can be awaited upon to get confirmation command was sent. | 
|  |  | 
|  | Returns: | 
|  | None: responses are not sent to group commands. | 
|  |  | 
|  | Raises: | 
|  | InteractionModelError on error. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | assert self.devCtrl is not None | 
|  | ClusterCommand.SendGroupCommand( | 
|  | groupid, self.devCtrl, payload, busyWaitMs=busyWaitMs).raise_on_error() | 
|  |  | 
|  | # None is the expected return for sending group commands. | 
|  | return None | 
|  |  | 
|  | async def WriteAttribute(self, nodeid: int, | 
|  | attributes: typing.List[typing.Tuple[int, ClusterObjects.ClusterAttributeDescriptor]], | 
|  | timedRequestTimeoutMs: typing.Optional[int] = None, | 
|  | interactionTimeoutMs: typing.Optional[int] = None, busyWaitMs: typing.Optional[int] = None, | 
|  | payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): | 
|  | ''' | 
|  | Write a list of attributes on a target node. | 
|  |  | 
|  | nodeId: Target's Node ID | 
|  | timedWriteTimeoutMs: Timeout for a timed write request. Omit or set to 'None' to indicate a non-timed request. | 
|  | attributes: A list of tuples of type (endpoint, cluster-object): | 
|  | interactionTimeoutMs: Overall timeout for the interaction. Omit or set to 'None' to have the SDK automatically compute the | 
|  | right timeout value based on transport characteristics as well as the responsiveness of the target. | 
|  |  | 
|  | E.g | 
|  | (1, Clusters.UnitTesting.Attributes.XYZAttribute('hello')) -- Write 'hello' | 
|  | to the XYZ attribute on the test cluster to endpoint 1 | 
|  |  | 
|  | Returns: | 
|  | [AttributeStatus] (list - one for each path). | 
|  |  | 
|  | Raises: | 
|  | InteractionModelError on error. | 
|  | ''' | 
|  |  | 
|  | return await self._WriteAttribute(nodeid=nodeid, | 
|  | attributes=attributes, | 
|  | timedRequestTimeoutMs=timedRequestTimeoutMs, | 
|  | interactionTimeoutMs=interactionTimeoutMs, | 
|  | busyWaitMs=busyWaitMs, | 
|  | payloadCapability=payloadCapability, | 
|  | forceLegacyListEncoding=False) | 
|  |  | 
|  | async def _WriteAttribute(self, nodeid: int, | 
|  | attributes: typing.List[typing.Tuple[int, ClusterObjects.ClusterAttributeDescriptor]], | 
|  | timedRequestTimeoutMs: typing.Optional[int] = None, | 
|  | interactionTimeoutMs: typing.Optional[int] = None, busyWaitMs: typing.Optional[int] = None, | 
|  | payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD, forceLegacyListEncoding: bool = False): | 
|  |  | 
|  | self.CheckIsActive() | 
|  |  | 
|  | eventLoop = asyncio.get_running_loop() | 
|  | future = eventLoop.create_future() | 
|  |  | 
|  | device = await self.GetConnectedDevice(nodeid, timeoutMs=interactionTimeoutMs, payloadCapability=payloadCapability) | 
|  |  | 
|  | attrs = [] | 
|  | for v in attributes: | 
|  | if len(v) == 2: | 
|  | attrs.append(ClusterAttribute.AttributeWriteRequest( | 
|  | v[0], v[1], 0, 0, v[1].value))  # type: ignore[attr-defined]  # 'value' added dynamically to ClusterAttributeDescriptor | 
|  | else: | 
|  | attrs.append(ClusterAttribute.AttributeWriteRequest( | 
|  | v[0], v[1], v[2], 1, v[1].value)) | 
|  |  | 
|  | ClusterAttribute.WriteAttributes( | 
|  | future, eventLoop, device.deviceProxy, attrs, timedRequestTimeoutMs=timedRequestTimeoutMs, | 
|  | interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs, forceLegacyListEncoding=forceLegacyListEncoding).raise_on_error() | 
|  | return await future | 
|  |  | 
|  | async def TestOnlyWriteAttributeWithLegacyList(self, nodeid: int, | 
|  | attributes: typing.List[typing.Tuple[int, ClusterObjects.ClusterAttributeDescriptor]], | 
|  | timedRequestTimeoutMs: typing.Optional[int] = None, | 
|  | interactionTimeoutMs: typing.Optional[int] = None, busyWaitMs: typing.Optional[int] = None, | 
|  | payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): | 
|  | ''' | 
|  | Please see WriteAttribute for description. | 
|  | This is a test-only wrapper for _WriteAttribute that sets forceLegacyListEncoding to True. | 
|  |  | 
|  | The purpose of this method is to write attributes using the legacy encoding format for list data types, to ensure that end devices support legacy WriteClients. | 
|  |  | 
|  |  | 
|  | Returns: | 
|  | [AttributeStatus] (list - one for each path). | 
|  |  | 
|  | Raises: | 
|  | InteractionModelError on error. | 
|  | ''' | 
|  |  | 
|  | return await self._WriteAttribute(nodeid=nodeid, | 
|  | attributes=attributes, | 
|  | timedRequestTimeoutMs=timedRequestTimeoutMs, | 
|  | interactionTimeoutMs=interactionTimeoutMs, | 
|  | busyWaitMs=busyWaitMs, | 
|  | payloadCapability=payloadCapability, | 
|  | forceLegacyListEncoding=True) | 
|  |  | 
|  | def WriteGroupAttribute( | 
|  | self, groupid: int, attributes: typing.List[typing.Tuple[ClusterObjects.ClusterAttributeDescriptor, int]], busyWaitMs: typing.Optional[int] = None): | 
|  | ''' | 
|  | Write a list of attributes on a target group. | 
|  |  | 
|  | groupid: Group ID to send write attribute to. | 
|  | attributes: A list of tuples of type (cluster-object, data-version). The data-version can be omitted. | 
|  |  | 
|  | E.g | 
|  | (Clusters.UnitTesting.Attributes.XYZAttribute('hello'), 1) -- Group Write 'hello' with data version 1. | 
|  |  | 
|  | Returns: | 
|  | list = An empty list | 
|  |  | 
|  | Raises: | 
|  | InteractionModelError on error. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | attrs = [] | 
|  | invalid_endpoint = 0xFFFF | 
|  | for v in attributes: | 
|  | if len(v) == 2: | 
|  | attrs.append(ClusterAttribute.AttributeWriteRequest( | 
|  | # 'value' added dynamically to ClusterAttributeDescriptor | 
|  | invalid_endpoint, v[0], v[1], 1, v[0].value))  # type: ignore[attr-defined] | 
|  | else: | 
|  | attrs.append(ClusterAttribute.AttributeWriteRequest( | 
|  | invalid_endpoint, v[0], 0, 0, v[0].value)) | 
|  |  | 
|  | assert self.devCtrl is not None | 
|  | ClusterAttribute.WriteGroupAttributes( | 
|  | groupid, self.devCtrl, attrs, busyWaitMs=busyWaitMs).raise_on_error() | 
|  |  | 
|  | # An empty list is the expected return for sending group write attribute. | 
|  | return [] | 
|  |  | 
|  | def TestOnlyPrepareToReceiveBdxData(self) -> asyncio.Future: | 
|  | ''' | 
|  | Sets up the system to expect a node to initiate a BDX transfer. The transfer will send data here. | 
|  |  | 
|  | If no BDX transfer is initiated, the caller must cancel the returned future to avoid interfering with other BDX transfers. | 
|  | For example, the Diagnostic Logs clusters won't start a BDX transfer when the log is small so the future must be cancelled to allow later | 
|  | attempts to retrieve logs to succeed. | 
|  |  | 
|  | Returns: | 
|  | a future that will yield a BdxTransfer with the init message from the transfer. | 
|  |  | 
|  | Raises: | 
|  | InteractionModelError on error. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | eventLoop = asyncio.get_running_loop() | 
|  | future = eventLoop.create_future() | 
|  |  | 
|  | Bdx.PrepareToReceiveBdxData(future).raise_on_error() | 
|  | return future | 
|  |  | 
|  | def TestOnlyPrepareToSendBdxData(self, data: bytes) -> asyncio.Future: | 
|  | ''' | 
|  | Sets up the system to expect a node to initiate a BDX transfer. The transfer will send data to the node. | 
|  |  | 
|  | If no BDX transfer is initiated, the caller must cancel the returned future to avoid interfering with other BDX transfers. | 
|  |  | 
|  | Returns: | 
|  | A future that will yield a BdxTransfer with the init message from the transfer. | 
|  |  | 
|  | Raises: | 
|  | InteractionModelError on error. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | eventLoop = asyncio.get_running_loop() | 
|  | future = eventLoop.create_future() | 
|  |  | 
|  | Bdx.PrepareToSendBdxData(future, data).raise_on_error() | 
|  | return future | 
|  |  | 
|  | # mypy errors ignored due to valid use of dynamic types and flexible tuple formats | 
|  | # Fixing these typing errors is a high risk to affect existing functionality. | 
|  | # these mismatches are intentional and safe within the current logic | 
|  | # TODO:  Explore proper typing for dynamic attributes in ChipDeviceCtrl.py #618 | 
|  | def _parseAttributePathTuple(self, pathTuple: typing.Union[ | 
|  | None,  # Empty tuple, all wildcard | 
|  | typing.Tuple[int],  # Endpoint | 
|  | # Wildcard endpoint, Cluster id present | 
|  | typing.Tuple[typing.Type[ClusterObjects.Cluster]], | 
|  | # Wildcard endpoint, Cluster + Attribute present | 
|  | typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], | 
|  | # Wildcard attribute id | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], | 
|  | # Concrete path | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], | 
|  | # Directly specified attribute path | 
|  | ClusterAttribute.AttributePath | 
|  | ]): | 
|  | if isinstance(pathTuple, ClusterAttribute.AttributePath): | 
|  | return pathTuple | 
|  | if pathTuple == ('*') or pathTuple == (): | 
|  | # Wildcard | 
|  | return ClusterAttribute.AttributePath() | 
|  | elif not isinstance(pathTuple, tuple): | 
|  | if isinstance(pathTuple, int): | 
|  | return ClusterAttribute.AttributePath(EndpointId=pathTuple) | 
|  | elif issubclass(pathTuple, ClusterObjects.Cluster):  # type: ignore[misc, arg-type] | 
|  | return ClusterAttribute.AttributePath.from_cluster(EndpointId=None, Cluster=pathTuple)  # type: ignore[arg-type] | 
|  | elif issubclass(pathTuple, ClusterObjects.ClusterAttributeDescriptor):  # type: ignore[arg-type] | 
|  | return ClusterAttribute.AttributePath.from_attribute(EndpointId=None, Attribute=pathTuple)  # type: ignore[arg-type] | 
|  | else: | 
|  | raise ValueError("Unsupported Attribute Path") | 
|  | else: | 
|  | # endpoint + (cluster) attribute / endpoint + cluster | 
|  | if issubclass(pathTuple[1], ClusterObjects.Cluster):  # type: ignore[misc] | 
|  | return ClusterAttribute.AttributePath.from_cluster( | 
|  | EndpointId=pathTuple[0],  # type: ignore[arg-type] | 
|  | Cluster=pathTuple[1]  # type: ignore[arg-type, misc] | 
|  | ) | 
|  | elif issubclass(pathTuple[1], ClusterAttribute.ClusterAttributeDescriptor):  # type: ignore[arg-type, misc] | 
|  | return ClusterAttribute.AttributePath.from_attribute( | 
|  | EndpointId=pathTuple[0],    # type: ignore[arg-type] | 
|  | Attribute=pathTuple[1]  # type: ignore[arg-type, misc] | 
|  | ) | 
|  | else: | 
|  | raise ValueError("Unsupported Attribute Path") | 
|  |  | 
|  | def _parseDataVersionFilterTuple(self, pathTuple: typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]): | 
|  | endpoint = None | 
|  | cluster = None | 
|  |  | 
|  | # endpoint + (cluster) attribute / endpoint + cluster | 
|  | endpoint = pathTuple[0] | 
|  | # mypy errors ignored due to valid use of dynamic types (e.g., int, str, or class types). | 
|  | # Fixing these typing errors is a high risk to affect existing functionality. | 
|  | # These mismatches are intentional and safe within the current logic. | 
|  | # TODO:  Explore proper typing for dynamic attributes in ChipDeviceCtrl.py #618 | 
|  | if issubclass(pathTuple[1], ClusterObjects.Cluster):  # type: ignore[arg-type] | 
|  | cluster = pathTuple[1] | 
|  | else: | 
|  | raise ValueError("Unsupported Cluster Path") | 
|  | dataVersion = pathTuple[2] | 
|  | return ClusterAttribute.DataVersionFilter.from_cluster( | 
|  | EndpointId=endpoint, Cluster=cluster, DataVersion=dataVersion)  # type: ignore[arg-type] | 
|  |  | 
|  | def _parseEventPathTuple(self, pathTuple: typing.Union[ | 
|  | None,  # Empty tuple, all wildcard | 
|  | typing.Tuple[str, int],  # all wildcard with urgency set | 
|  | typing.Tuple[int, int],  # Endpoint, | 
|  | # Wildcard endpoint, Cluster id present | 
|  | typing.Tuple[typing.Type[ClusterObjects.Cluster], int], | 
|  | # Wildcard endpoint, Cluster + Event present | 
|  | typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int], | 
|  | # Wildcard event id | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int], | 
|  | # Concrete path | 
|  | typing.Tuple[int, | 
|  | typing.Type[ClusterObjects.ClusterEvent], int] | 
|  | ]): | 
|  | if pathTuple in [('*'), ()]: | 
|  | # Wildcard | 
|  | return ClusterAttribute.EventPath() | 
|  | elif not isinstance(pathTuple, tuple): | 
|  | # mypy refactor (PR https://github.com/project-chip/connectedhomeip/pull/39827): | 
|  | # instantiate class types before passing to from_cluster/from_event | 
|  | # because these expect instances, not classes. | 
|  | if isinstance(pathTuple, int): | 
|  | return ClusterAttribute.EventPath(EndpointId=pathTuple) | 
|  | elif isinstance(pathTuple, type) and issubclass(pathTuple, ClusterObjects.Cluster): | 
|  | return ClusterAttribute.EventPath.from_cluster(EndpointId=None, Cluster=pathTuple) | 
|  | elif isinstance(pathTuple, type) and issubclass(pathTuple, ClusterObjects.ClusterEvent): | 
|  | return ClusterAttribute.EventPath.from_event(EndpointId=None, Event=pathTuple) | 
|  | else: | 
|  | raise ValueError("Unsupported Event Path") | 
|  | else: | 
|  | if pathTuple[0] == '*': | 
|  | return ClusterAttribute.EventPath(Urgent=pathTuple[-1]) | 
|  | else: | 
|  | urgent = bool(pathTuple[-1]) if len(pathTuple) > 2 else False | 
|  | # endpoint + (cluster) event / endpoint + cluster | 
|  | # mypy errors ignored due to valid use of dynamic types (e.g., int, str, or class types). | 
|  | # Fixing these typing errors is a high risk to affect existing functionality. | 
|  | # These mismatches are intentional and safe within the current logic. | 
|  | if issubclass(pathTuple[1], ClusterObjects.Cluster):  # type: ignore[arg-type] | 
|  | return ClusterAttribute.EventPath.from_cluster( | 
|  | EndpointId=pathTuple[0],    # type: ignore[arg-type] | 
|  | Cluster=pathTuple[1], Urgent=urgent  # type: ignore[arg-type] | 
|  | ) | 
|  | elif issubclass(pathTuple[1], ClusterAttribute.ClusterEvent):  # type: ignore[arg-type] | 
|  | return ClusterAttribute.EventPath.from_event( | 
|  | EndpointId=pathTuple[0],    # type: ignore[arg-type] | 
|  | Event=pathTuple[1],  # type: ignore[arg-type] | 
|  | Urgent=urgent | 
|  | ) | 
|  | else: | 
|  | raise ValueError("Unsupported Attribute Path") | 
|  |  | 
|  | async def Read( | 
|  | self, | 
|  | nodeid: int, | 
|  | attributes: typing.Optional[typing.List[typing.Union[ | 
|  | None,  # Empty tuple, all wildcard | 
|  | typing.Tuple[int],  # Endpoint | 
|  | # Wildcard endpoint, Cluster id present | 
|  | typing.Tuple[typing.Type[ClusterObjects.Cluster]], | 
|  | # Wildcard endpoint, Cluster + Attribute present | 
|  | typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], | 
|  | # Wildcard attribute id | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], | 
|  | # Concrete path | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], | 
|  | # Directly specified attribute path | 
|  | ClusterAttribute.AttributePath | 
|  | ]]] = None, | 
|  | dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, events: typing.Optional[typing.List[ | 
|  | typing.Union[ | 
|  | None,  # Empty tuple, all wildcard | 
|  | typing.Tuple[str, int],  # all wildcard with urgency set | 
|  | typing.Tuple[int, int],  # Endpoint, | 
|  | # Wildcard endpoint, Cluster id present | 
|  | typing.Tuple[typing.Type[ClusterObjects.Cluster], int], | 
|  | # Wildcard endpoint, Cluster + Event present | 
|  | typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int], | 
|  | # Wildcard event id | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int], | 
|  | # Concrete path | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] | 
|  | ]]] = None, | 
|  | eventNumberFilter: typing.Optional[int] = None, | 
|  | returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None, | 
|  | fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True, | 
|  | payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD | 
|  | ): | 
|  | ''' | 
|  | Read a list of attributes and/or events from a target node | 
|  |  | 
|  | nodeId: Target's Node ID | 
|  | attributes: A list of tuples of varying types depending on the type of read being requested: | 
|  | (endpoint, Clusters.ClusterA.AttributeA):   Endpoint = specific,    Cluster = specific,   Attribute = specific | 
|  | (endpoint, Clusters.ClusterA):              Endpoint = specific,    Cluster = specific,   Attribute = * | 
|  | (Clusters.ClusterA.AttributeA):             Endpoint = *,           Cluster = specific,   Attribute = specific | 
|  | endpoint:                                   Endpoint = specific,    Cluster = *,          Attribute = * | 
|  | Clusters.ClusterA:                          Endpoint = *,           Cluster = specific,   Attribute = * | 
|  | '*' or ():                                  Endpoint = *,           Cluster = *,          Attribute = * | 
|  |  | 
|  | The cluster and attributes specified above are to be selected from the generated cluster objects. | 
|  |  | 
|  | e.g. | 
|  | ReadAttribute(1, [ 1 ] ) -- case 4 above. | 
|  | ReadAttribute(1, [ Clusters.BasicInformation ] ) -- case 5 above. | 
|  | ReadAttribute(1, [ (1, Clusters.BasicInformation.Attributes.Location ] ) -- case 1 above. | 
|  |  | 
|  | An AttributePath can also be specified directly by [matter.cluster.Attribute.AttributePath(...)] | 
|  |  | 
|  | dataVersionFilters: A list of tuples of (endpoint, cluster, data version). | 
|  |  | 
|  | events: A list of tuples of varying types depending on the type of read being requested: | 
|  | (endpoint, Clusters.ClusterA.EventA, urgent):       Endpoint = specific, | 
|  | Cluster = specific,   Event = specific, Urgent = True/False | 
|  | (endpoint, Clusters.ClusterA, urgent):              Endpoint = specific, | 
|  | Cluster = specific,   Event = *, Urgent = True/False | 
|  | (Clusters.ClusterA.EventA, urgent):                 Endpoint = *, | 
|  | Cluster = specific,   Event = specific, Urgent = True/False | 
|  | endpoint:                                   Endpoint = specific,    Cluster = *,          Event = *, Urgent = True/False | 
|  | Clusters.ClusterA:                          Endpoint = *,          Cluster = specific,    Event = *, Urgent = True/False | 
|  | '*' or ():                                  Endpoint = *,          Cluster = *,          Event = *, Urgent = True/False | 
|  |  | 
|  | eventNumberFilter: Optional minimum event number filter. | 
|  |  | 
|  | returnClusterObject: This returns the data as consolidated cluster objects, with all attributes for a cluster inside | 
|  | a single cluster-wide cluster object. | 
|  |  | 
|  | reportInterval: A tuple of two int-s for (MinIntervalFloor, MaxIntervalCeiling). Used by establishing subscriptions. | 
|  | When not provided, a read request will be sent. | 
|  | fabricFiltered: If True (default), the read/subscribe is fabric-filtered and will only see things associated with the fabric | 
|  | of the reader/subscriber. Relevant for attributes with fabric-scoped data. | 
|  | keepSubscriptions: Keep existing subscriptions. If set to False, existing subscriptions with this node will get cancelled | 
|  | and a new one gets setup. | 
|  | autoResubscribe: Automatically resubscribe to the subscription if subscription is lost. The automatic re-subscription only | 
|  | applies if the subscription establishes on first try. If the first subscription establishment attempt fails the function | 
|  | returns right away. | 
|  |  | 
|  | Returns: | 
|  | - AsyncReadTransaction.ReadResponse. Please see ReadAttribute and ReadEvent for examples of how to access data. | 
|  |  | 
|  | Raises: | 
|  | - InteractionModelError (matter.interaction_model) on error | 
|  |  | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | # mypy errors ignored due to valid use of dynamic types. | 
|  | # A single tuple is passed intentionally (not a list), as expected by the method logic. | 
|  | # Fixing these typing errors is a high risk to affect existing functionality. | 
|  | # These mismatches are intentional and safe within the current logic. | 
|  | # TODO:  Explore proper typing for dynamic attributes in ChipDeviceCtrl.py #618 | 
|  |  | 
|  | eventLoop = asyncio.get_running_loop() | 
|  | future = eventLoop.create_future() | 
|  |  | 
|  | device = await self.GetConnectedDevice(nodeid, payloadCapability=payloadCapability) | 
|  | attributePaths = [self._parseAttributePathTuple( | 
|  | v) for v in attributes] if attributes else None | 
|  | clusterDataVersionFilters = [self._parseDataVersionFilterTuple( | 
|  | v) for v in dataVersionFilters] if dataVersionFilters else None  # type: ignore[arg-type] | 
|  | eventPaths = [self._parseEventPathTuple( | 
|  | v) for v in events] if events else None | 
|  |  | 
|  | allowLargePayload = payloadCapability in (TransportPayloadCapability.LARGE_PAYLOAD, | 
|  | TransportPayloadCapability.MRP_OR_TCP_PAYLOAD) | 
|  | transaction = ClusterAttribute.AsyncReadTransaction(future, eventLoop, self, returnClusterObject) | 
|  | ClusterAttribute.Read(transaction, device=device.deviceProxy, | 
|  | attributes=attributePaths, dataVersionFilters=clusterDataVersionFilters, events=eventPaths, | 
|  | eventNumberFilter=eventNumberFilter, | 
|  | subscriptionParameters=ClusterAttribute.SubscriptionParameters( | 
|  | reportInterval[0], reportInterval[1]) if reportInterval else None, | 
|  | fabricFiltered=fabricFiltered, | 
|  | keepSubscriptions=keepSubscriptions, autoResubscribe=autoResubscribe, allowLargePayload=allowLargePayload).raise_on_error() | 
|  | await future | 
|  |  | 
|  | if result := transaction.GetSubscriptionHandler(): | 
|  | return result | 
|  | return transaction.GetReadResponse() | 
|  |  | 
|  | async def ReadAttribute( | 
|  | self, | 
|  | nodeid: int, | 
|  | attributes: typing.Optional[typing.List[typing.Union[ | 
|  | None,  # Empty tuple, all wildcard | 
|  | typing.Tuple[int],  # Endpoint | 
|  | # Wildcard endpoint, Cluster id present | 
|  | typing.Tuple[typing.Type[ClusterObjects.Cluster]], | 
|  | # Wildcard endpoint, Cluster + Attribute present | 
|  | typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], | 
|  | # Wildcard attribute id | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], | 
|  | # Concrete path | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], | 
|  | # Directly specified attribute path | 
|  | ClusterAttribute.AttributePath | 
|  | ]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, | 
|  | returnClusterObject: bool = False, | 
|  | reportInterval: typing.Optional[typing.Tuple[int, int]] = None, | 
|  | fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True, | 
|  | payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD | 
|  | ): | 
|  | ''' | 
|  | Read a list of attributes from a target node, this is a wrapper of DeviceController.Read() | 
|  |  | 
|  | nodeId: Target's Node ID | 
|  | attributes: A list of tuples of varying types depending on the type of read being requested: | 
|  | (endpoint, Clusters.ClusterA.AttributeA):   Endpoint = specific,    Cluster = specific,   Attribute = specific | 
|  | (endpoint, Clusters.ClusterA):              Endpoint = specific,    Cluster = specific,   Attribute = * | 
|  | (Clusters.ClusterA.AttributeA):             Endpoint = *,           Cluster = specific,   Attribute = specific | 
|  | endpoint:                                   Endpoint = specific,    Cluster = *,          Attribute = * | 
|  | Clusters.ClusterA:                          Endpoint = *,           Cluster = specific,   Attribute = * | 
|  | '*' or ():                                  Endpoint = *,           Cluster = *,          Attribute = * | 
|  |  | 
|  | The cluster and attributes specified above are to be selected from the generated cluster objects. | 
|  |  | 
|  | e.g. | 
|  | ReadAttribute(1, [ 1 ] ) -- case 4 above. | 
|  | ReadAttribute(1, [ Clusters.BasicInformation ] ) -- case 5 above. | 
|  | ReadAttribute(1, [ (1, Clusters.BasicInformation.Attributes.Location ] ) -- case 1 above. | 
|  |  | 
|  | An AttributePath can also be specified directly by [matter.cluster.Attribute.AttributePath(...)] | 
|  |  | 
|  | returnClusterObject: This returns the data as consolidated cluster objects, with all attributes for a cluster inside | 
|  | a single cluster-wide cluster object. | 
|  |  | 
|  | reportInterval: A tuple of two int-s for (MinIntervalFloor, MaxIntervalCeiling). Used by establishing subscriptions. | 
|  | When not provided, a read request will be sent. | 
|  | fabricFiltered: If True (default), the read/subscribe is fabric-filtered and will only see things associated with the fabric | 
|  | of the reader/subscriber. Relevant for attributes with fabric-scoped data. | 
|  | keepSubscriptions: Keep existing subscriptions. If set to False, existing subscriptions with this node will get cancelled | 
|  | and a new one gets setup. | 
|  | autoResubscribe: Automatically resubscribe to the subscription if subscription is lost. The automatic re-subscription only | 
|  | applies if the subscription establishes on first try. If the first subscription establishment attempt fails the function | 
|  | returns right away. | 
|  |  | 
|  | Returns: | 
|  | - subscription request: ClusterAttribute.SubscriptionTransaction | 
|  | To get notified on attribute change use SetAttributeUpdateCallback on the returned | 
|  | SubscriptionTransaction. This is used to set a callback function, which is a callable of | 
|  | type Callable[[TypedAttributePath, SubscriptionTransaction], None] | 
|  | Get the attribute value from the change path using GetAttribute on the SubscriptionTransaction | 
|  | You can await changes in the main loop using a trigger mechanism from the callback. | 
|  | ex. queue.SimpleQueue | 
|  |  | 
|  | - read request: AsyncReadTransaction.ReadResponse.attributes. | 
|  | This is of type AttributeCache.attributeCache (Attribute.py), | 
|  | which is a dict mapping endpoints to a list of Cluster (ClusterObjects.py) classes | 
|  | (dict[int, List[Cluster]]) | 
|  | Access as returned_object[endpoint_id][<Cluster class>][<Attribute class>] | 
|  | Ex. To access the OnTime attribute from the OnOff cluster on endpoint 1 | 
|  | returned_object[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnTime] | 
|  |  | 
|  | Raises: | 
|  | - InteractionModelError (matter.interaction_model) on error | 
|  | ''' | 
|  | res = await self.Read(nodeid, | 
|  | attributes=attributes, | 
|  | dataVersionFilters=dataVersionFilters, | 
|  | returnClusterObject=returnClusterObject, | 
|  | reportInterval=reportInterval, | 
|  | fabricFiltered=fabricFiltered, | 
|  | keepSubscriptions=keepSubscriptions, | 
|  | autoResubscribe=autoResubscribe, | 
|  | payloadCapability=payloadCapability) | 
|  | if isinstance(res, ClusterAttribute.SubscriptionTransaction): | 
|  | return res | 
|  | else: | 
|  | return res.attributes | 
|  |  | 
|  | async def ReadEvent( | 
|  | self, | 
|  | nodeid: int, | 
|  | events: typing.List[typing.Union[ | 
|  | None,  # Empty tuple, all wildcard | 
|  | typing.Tuple[str, int],  # all wildcard with urgency set | 
|  | typing.Tuple[int, int],  # Endpoint, | 
|  | # Wildcard endpoint, Cluster id present | 
|  | typing.Tuple[typing.Type[ClusterObjects.Cluster], int], | 
|  | # Wildcard endpoint, Cluster + Event present | 
|  | typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int], | 
|  | # Wildcard event id | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int], | 
|  | # Concrete path | 
|  | typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] | 
|  | ]], eventNumberFilter: typing.Optional[int] = None, | 
|  | fabricFiltered: bool = True, | 
|  | reportInterval: typing.Optional[typing.Tuple[int, int]] = None, | 
|  | keepSubscriptions: bool = False, | 
|  | autoResubscribe: bool = True, | 
|  | payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD | 
|  | ): | 
|  | ''' | 
|  | Read a list of events from a target node, this is a wrapper of DeviceController.Read() | 
|  |  | 
|  | nodeId: Target's Node ID | 
|  | events: A list of tuples of varying types depending on the type of read being requested: | 
|  | (endpoint, Clusters.ClusterA.EventA, urgent):       Endpoint = specific, | 
|  | Cluster = specific,   Event = specific, Urgent = True/False | 
|  | (endpoint, Clusters.ClusterA, urgent):              Endpoint = specific, | 
|  | Cluster = specific,   Event = *, Urgent = True/False | 
|  | (Clusters.ClusterA.EventA, urgent):                 Endpoint = *, | 
|  | Cluster = specific,   Event = specific, Urgent = True/False | 
|  | endpoint:                                   Endpoint = specific,    Cluster = *,          Event = *, Urgent = True/False | 
|  | Clusters.ClusterA:                          Endpoint = *,          Cluster = specific,    Event = *, Urgent = True/False | 
|  | '*' or ():                                  Endpoint = *,          Cluster = *,          Event = *, Urgent = True/False | 
|  |  | 
|  | The cluster and events specified above are to be selected from the generated cluster objects. | 
|  |  | 
|  | e.g. | 
|  | ReadEvent(1, [ 1 ] ) -- case 4 above. | 
|  | ReadEvent(1, [ Clusters.BasicInformation ] ) -- case 5 above. | 
|  | ReadEvent(1, [ (1, Clusters.BasicInformation.Events.Location ] ) -- case 1 above. | 
|  |  | 
|  | eventNumberFilter: Optional minimum event number filter. | 
|  | reportInterval: A tuple of two int-s for (MinIntervalFloor, MaxIntervalCeiling). Used by establishing subscriptions. | 
|  | When not provided, a read request will be sent. | 
|  | keepSubscriptions: Keep existing subscriptions. If set to False, existing subscriptions with this node will get cancelled | 
|  | and a new one gets setup. | 
|  | autoResubscribe: Automatically resubscribe to the subscription if subscription is lost. The automatic re-subscription only | 
|  | applies if the subscription establishes on first try. If the first subscription establishment attempt fails the function | 
|  | returns right away. | 
|  |  | 
|  | Returns: | 
|  | - subscription request: ClusterAttribute.SubscriptionTransaction | 
|  | To get notified on event subscriptions, use the SetEventUpdateCallback function on the | 
|  | returned  SubscriptionTransaction. This is a callable of type | 
|  | Callable[[EventReadResult, SubscriptionTransaction], None] | 
|  | You can await events using a trigger mechanism in the callback. ex. queue.SimpleQueue | 
|  |  | 
|  | - read request: AsyncReadTransaction.ReadResponse.events. | 
|  | This is a List[ClusterEvent]. | 
|  |  | 
|  | Raises: | 
|  | - InteractionModelError (matter.interaction_model) on error | 
|  | ''' | 
|  | res = await self.Read(nodeid=nodeid, events=events, eventNumberFilter=eventNumberFilter, | 
|  | fabricFiltered=fabricFiltered, reportInterval=reportInterval, keepSubscriptions=keepSubscriptions, | 
|  | autoResubscribe=autoResubscribe, payloadCapability=payloadCapability) | 
|  | if isinstance(res, ClusterAttribute.SubscriptionTransaction): | 
|  | return res | 
|  | else: | 
|  | return res.events | 
|  |  | 
|  | def SetIpk(self, ipk: bytes): | 
|  | ''' | 
|  | Sets the Identity Protection Key (IPK) for the device controller. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetIpk(self.devCtrl, ipk, len(ipk)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def InitGroupTestingData(self): | 
|  | ''' | 
|  | Populates the Device Controller's GroupDataProvider with known test group info and keys. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_OpCreds_InitGroupTestingData( | 
|  | self.devCtrl) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def CreateManualCode(self, discriminator: int, passcode: int) -> str: | 
|  | ''' | 
|  | Creates a standard flow manual code from the given discriminator and passcode. | 
|  |  | 
|  | Args: | 
|  | discriminator (int): The long discriminator for the DNS-SD advertisement. Valid range: 0-4095. | 
|  | passcode (int): The setup passcode of the device. | 
|  |  | 
|  | Returns: | 
|  | str: The decoded string from the buffer. | 
|  |  | 
|  | Raises: | 
|  | MemoryError: If the output size is invalid during manual code creation. | 
|  | ''' | 
|  | # 64 bytes is WAY more than required, but let's be safe | 
|  | in_size = 64 | 
|  | out_size = c_size_t(0) | 
|  | buf = create_string_buffer(in_size) | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_CreateManualCode(discriminator, passcode, buf, in_size, pointer(out_size)) | 
|  | ).raise_on_error() | 
|  | if out_size.value == 0 or out_size.value > in_size: | 
|  | raise MemoryError("Invalid output size for manual code") | 
|  | return buf.value.decode() | 
|  |  | 
|  | # ----- Private Members ----- | 
|  | def _InitLib(self): | 
|  | if self._dmLib is None: | 
|  | self._dmLib = CDLL(self._ChipStack.LocateChipDLL()) | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_DeleteDeviceController.argtypes = [ | 
|  | c_void_p, c_void_p] | 
|  | self._dmLib.pychip_DeviceController_DeleteDeviceController.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_ConnectBLE.argtypes = [ | 
|  | c_void_p, c_uint16, c_bool, c_uint32, c_uint64] | 
|  | self._dmLib.pychip_DeviceController_ConnectBLE.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_ConnectNFC.argtypes = [ | 
|  | c_void_p, c_uint16, c_uint32, c_uint64] | 
|  | self._dmLib.pychip_DeviceController_ConnectNFC.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetThreadOperationalDataset.argtypes = [ | 
|  | c_char_p, c_uint32] | 
|  | self._dmLib.pychip_DeviceController_SetThreadOperationalDataset.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetWiFiCredentials.argtypes = [ | 
|  | c_char_p, c_char_p] | 
|  | self._dmLib.pychip_DeviceController_SetWiFiCredentials.restype = PyChipError | 
|  |  | 
|  | # Currently only supports 1 list item | 
|  | self._dmLib.pychip_DeviceController_SetTimeZone.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceController_SetTimeZone.argtypes = [ | 
|  | c_int32, c_uint64, c_char_p] | 
|  |  | 
|  | # Currently only supports 1 list item | 
|  | self._dmLib.pychip_DeviceController_SetDSTOffset.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceController_SetDSTOffset.argtypes = [ | 
|  | c_int32, c_uint64, c_uint64] | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetDefaultNtp.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceController_SetDefaultNtp.argtypes = [c_char_p] | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetTrustedTimeSource.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceController_SetTrustedTimeSource.argtypes = [c_uint64, c_uint16] | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetCheckMatchingFabric.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceController_SetCheckMatchingFabric.argtypes = [c_bool] | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetIcdRegistrationParameters.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceController_SetIcdRegistrationParameters.argtypes = [ | 
|  | c_bool, c_void_p | 
|  | ] | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_ResetCommissioningParameters.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceController_ResetCommissioningParameters.argtypes = [] | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_Commission.argtypes = [ | 
|  | c_void_p, c_uint64] | 
|  | self._dmLib.pychip_DeviceController_Commission.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_OnNetworkCommission.argtypes = [ | 
|  | c_void_p, c_void_p, c_uint64, c_uint32, c_uint8, c_char_p, c_uint32] | 
|  | self._dmLib.pychip_DeviceController_OnNetworkCommission.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_DiscoverCommissionableNodes.argtypes = [ | 
|  | c_void_p, c_uint8, c_char_p] | 
|  | self._dmLib.pychip_DeviceController_DiscoverCommissionableNodes.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_StopCommissionableDiscovery.argtypes = [ | 
|  | c_void_p] | 
|  | self._dmLib.pychip_DeviceController_StopCommissionableDiscovery.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_EstablishPASESessionIP.argtypes = [ | 
|  | c_void_p, c_char_p, c_uint32, c_uint64, c_uint16] | 
|  | self._dmLib.pychip_DeviceController_EstablishPASESessionIP.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_EstablishPASESessionBLE.argtypes = [ | 
|  | c_void_p, c_uint32, c_uint16, c_uint64] | 
|  | self._dmLib.pychip_DeviceController_EstablishPASESessionBLE.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_EstablishPASESession.argtypes = [ | 
|  | c_void_p, c_char_p, c_uint64] | 
|  | self._dmLib.pychip_DeviceController_EstablishPASESession.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_HasDiscoveredCommissionableNode.argtypes = [c_void_p] | 
|  | self._dmLib.pychip_DeviceController_HasDiscoveredCommissionableNode.restype = c_bool | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_GetIPForDiscoveredDevice.argtypes = [ | 
|  | c_void_p, c_int, c_char_p, c_uint32] | 
|  | self._dmLib.pychip_DeviceController_GetIPForDiscoveredDevice.restype = c_bool | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_ConnectIP.argtypes = [ | 
|  | c_void_p, c_char_p, c_uint32, c_uint64] | 
|  | self._dmLib.pychip_DeviceController_ConnectIP.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_ConnectWithCode.argtypes = [ | 
|  | c_void_p, c_char_p, c_uint64, c_uint8] | 
|  | self._dmLib.pychip_DeviceController_ConnectWithCode.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_UnpairDevice.argtypes = [ | 
|  | c_void_p, c_uint64, _DeviceUnpairingCompleteFunct] | 
|  | self._dmLib.pychip_DeviceController_UnpairDevice.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_MarkSessionDefunct.argtypes = [ | 
|  | c_void_p, c_uint64] | 
|  | self._dmLib.pychip_DeviceController_MarkSessionDefunct.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_MarkSessionForEviction.argtypes = [ | 
|  | c_void_p, c_uint64] | 
|  | self._dmLib.pychip_DeviceController_MarkSessionForEviction.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_DeleteAllSessionResumption.argtypes = [ | 
|  | c_void_p] | 
|  | self._dmLib.pychip_DeviceController_DeleteAllSessionResumption.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_GetAddressAndPort.argtypes = [ | 
|  | c_void_p, c_uint64, c_char_p, c_uint64, POINTER(c_uint16)] | 
|  | self._dmLib.pychip_DeviceController_GetAddressAndPort.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetKeyExchangeCallback.argtypes = [ | 
|  | c_void_p, _DevicePairingDelegate_OnPairingCompleteFunct] | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetKeyExchangeCallback.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetCommissioningCompleteCallback.argtypes = [ | 
|  | c_void_p, _DevicePairingDelegate_OnCommissioningCompleteFunct] | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetCommissioningCompleteCallback.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetOpenWindowCompleteCallback.argtypes = [ | 
|  | c_void_p, _DevicePairingDelegate_OnOpenWindowCompleteFunct] | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetOpenWindowCompleteCallback.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetCommissioningStatusUpdateCallback.argtypes = [ | 
|  | c_void_p, _DevicePairingDelegate_OnCommissioningStatusUpdateFunct] | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetCommissioningStatusUpdateCallback.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetFabricCheckCallback.argtypes = [ | 
|  | c_void_p, _DevicePairingDelegate_OnFabricCheckFunct] | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetFabricCheckCallback.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetExpectingPairingComplete.argtypes = [ | 
|  | c_void_p, c_bool] | 
|  | self._dmLib.pychip_ScriptDevicePairingDelegate_SetExpectingPairingComplete.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_GetConnectedDeviceByNodeId.argtypes = [ | 
|  | c_void_p, c_uint64, py_object, _DeviceAvailableCallbackFunct, c_int] | 
|  | self._dmLib.pychip_GetConnectedDeviceByNodeId.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_FreeOperationalDeviceProxy.argtypes = [ | 
|  | c_void_p] | 
|  | self._dmLib.pychip_FreeOperationalDeviceProxy.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_GetDeviceBeingCommissioned.argtypes = [ | 
|  | c_void_p, c_uint64, c_void_p] | 
|  | self._dmLib.pychip_GetDeviceBeingCommissioned.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_ExpireSessions.argtypes = [c_void_p, c_uint64] | 
|  | self._dmLib.pychip_ExpireSessions.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceCommissioner_CloseBleConnection.argtypes = [ | 
|  | c_void_p] | 
|  | self._dmLib.pychip_DeviceCommissioner_CloseBleConnection.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_GetCommandSenderHandle.argtypes = [c_void_p] | 
|  | self._dmLib.pychip_GetCommandSenderHandle.restype = c_uint64 | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_GetCompressedFabricId.argtypes = [ | 
|  | c_void_p, POINTER(c_uint64)] | 
|  | self._dmLib.pychip_DeviceController_GetCompressedFabricId.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_OpenCommissioningWindow.argtypes = [ | 
|  | c_void_p, c_void_p, c_uint64, c_uint16, c_uint32, c_uint16, c_uint8] | 
|  | self._dmLib.pychip_DeviceController_OpenCommissioningWindow.restype = PyChipError | 
|  |  | 
|  | try: | 
|  | # NOTE: Joint Fabric is an optional feature in the Matter SDK core library. | 
|  | #       Build with CHIP_DEVICE_CONFIG_ENABLE_JOINT_FABRIC=1 to enable it. | 
|  | self._dmLib.pychip_DeviceController_OpenJointCommissioningWindow.argtypes = [ | 
|  | c_void_p, c_void_p, c_uint64, c_uint16, c_uint16, c_uint32, c_uint16] | 
|  | self._dmLib.pychip_DeviceController_OpenJointCommissioningWindow.restype = PyChipError | 
|  | except AttributeError: | 
|  | def _unsupported_joint_fabric(*args, **kwargs): | 
|  | raise NotImplementedError("Joint Fabric support is not available in this Matter SDK build.") | 
|  | self._dmLib.pychip_DeviceController_OpenJointCommissioningWindow = _unsupported_joint_fabric | 
|  |  | 
|  | self._dmLib.pychip_TestCommissionerUsed.argtypes = [] | 
|  | self._dmLib.pychip_TestCommissionerUsed.restype = c_bool | 
|  |  | 
|  | self._dmLib.pychip_TestCommissioningCallbacks.argtypes = [] | 
|  | self._dmLib.pychip_TestCommissioningCallbacks.restype = c_bool | 
|  |  | 
|  | self._dmLib.pychip_TestCommissioningStageSuccessful.argtypes = [c_uint8] | 
|  | self._dmLib.pychip_TestCommissioningStageSuccessful.restype = c_bool | 
|  |  | 
|  | self._dmLib.pychip_ResetCommissioningTests.argtypes = [] | 
|  | self._dmLib.pychip_TestPaseConnection.argtypes = [c_uint64] | 
|  |  | 
|  | self._dmLib.pychip_SetTestCommissionerSimulateFailureOnStage.argtypes = [ | 
|  | c_uint8] | 
|  | self._dmLib.pychip_SetTestCommissionerSimulateFailureOnStage.restype = c_bool | 
|  |  | 
|  | self._dmLib.pychip_SetTestCommissionerSimulateFailureOnReport.argtypes = [ | 
|  | c_uint8] | 
|  | self._dmLib.pychip_SetTestCommissionerSimulateFailureOnReport.restype = c_bool | 
|  |  | 
|  | self._dmLib.pychip_SetTestCommissionerPrematureCompleteAfter.argtypes = [ | 
|  | c_uint8] | 
|  | self._dmLib.pychip_SetTestCommissionerPrematureCompleteAfter.restype = c_bool | 
|  |  | 
|  | self._dmLib.pychip_GetCompletionError.argtypes = [] | 
|  | self._dmLib.pychip_GetCompletionError.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_GetCommissioningRCACData.argtypes = [ctypes.POINTER( | 
|  | ctypes.c_uint8), ctypes.POINTER(ctypes.c_size_t), ctypes.c_size_t] | 
|  | self._dmLib.pychip_GetCommissioningRCACData.restype = None | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_IssueNOCChain.argtypes = [ | 
|  | c_void_p, py_object, c_char_p, c_size_t, c_uint64] | 
|  | self._dmLib.pychip_DeviceController_IssueNOCChain.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_OpCreds_InitGroupTestingData.argtypes = [ | 
|  | c_void_p] | 
|  | self._dmLib.pychip_OpCreds_InitGroupTestingData.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetIssueNOCChainCallbackPythonCallback.argtypes = [ | 
|  | _IssueNOCChainCallbackPythonCallbackFunct] | 
|  | self._dmLib.pychip_DeviceController_SetIssueNOCChainCallbackPythonCallback.restype = None | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_GetNodeId.argtypes = [c_void_p, POINTER(c_uint64)] | 
|  | self._dmLib.pychip_DeviceController_GetNodeId.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_GetFabricId.argtypes = [c_void_p, POINTER(c_uint64)] | 
|  | self._dmLib.pychip_DeviceController_GetFabricId.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_GetFabricIndex.argtypes = [c_void_p, POINTER(c_uint8)] | 
|  | self._dmLib.pychip_DeviceController_GetFabricIndex.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_GetLogFilter = [None] | 
|  | self._dmLib.pychip_DeviceController_GetLogFilter = c_uint8 | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_GetRootPublicKeyBytes.argtypes = [c_void_p, POINTER(c_uint8), POINTER(c_size_t)] | 
|  | self._dmLib.pychip_DeviceController_GetRootPublicKeyBytes.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_OpCreds_AllocateController.argtypes = [c_void_p, POINTER( | 
|  | c_void_p), POINTER(c_void_p), c_uint64, c_uint64, c_uint16, c_char_p, c_bool, c_bool, POINTER(c_uint32), c_uint32, c_void_p] | 
|  | self._dmLib.pychip_OpCreds_AllocateController.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_OpCreds_AllocateControllerForPythonCommissioningFLow.argtypes = [ | 
|  | POINTER(c_void_p), POINTER(c_void_p), c_void_p, POINTER(c_char), c_uint32, POINTER(c_char), c_uint32, POINTER(c_char), c_uint32, POINTER(c_char), c_uint32, c_uint16, c_bool] | 
|  | self._dmLib.pychip_OpCreds_AllocateControllerForPythonCommissioningFLow.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetIpk.argtypes = [c_void_p, POINTER(c_char), c_size_t] | 
|  | self._dmLib.pychip_DeviceController_SetIpk.restype = PyChipError | 
|  |  | 
|  | self._dmLib.pychip_CheckInDelegate_SetOnCheckInCompleteCallback.restype = None | 
|  | self._dmLib.pychip_CheckInDelegate_SetOnCheckInCompleteCallback.argtypes = [_OnCheckInCompleteFunct] | 
|  |  | 
|  | self._dmLib.pychip_CheckInDelegate_SetOnCheckInCompleteCallback(_OnCheckInComplete) | 
|  |  | 
|  | self._dmLib.pychip_DeviceProxy_GetRemoteSessionParameters.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceProxy_GetRemoteSessionParameters.argtypes = [c_void_p, c_char_p] | 
|  |  | 
|  | self._dmLib.pychip_CreateManualCode.restype = PyChipError | 
|  | self._dmLib.pychip_CreateManualCode.argtypes = [c_uint16, c_uint32, c_char_p, c_size_t, POINTER(c_size_t)] | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete.argtypes = [c_bool] | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetTermsAcknowledgements.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceController_SetTermsAcknowledgements.argtypes = [c_uint16, c_uint16] | 
|  |  | 
|  | self._dmLib.pychip_DeviceController_SetDACRevocationSetPath.restype = PyChipError | 
|  | self._dmLib.pychip_DeviceController_SetDACRevocationSetPath.argtypes = [c_char_p] | 
|  |  | 
|  |  | 
|  | class ChipDeviceController(ChipDeviceControllerBase): | 
|  | ''' | 
|  | The ChipDeviceCommissioner binding, named as ChipDeviceController | 
|  | ''' | 
|  |  | 
|  | def __init__(self, | 
|  | opCredsContext: ctypes.c_void_p, | 
|  | fabricId: int, | 
|  | nodeId: int, | 
|  | adminVendorId: int, | 
|  | fabricAdmin: FabricAdmin.FabricAdmin, | 
|  | catTags: typing.List[int] = [], | 
|  | paaTrustStorePath: str = "", | 
|  | useTestCommissioner: bool = False, | 
|  | name: str = '', | 
|  | keypair: typing.Optional[p256keypair.P256Keypair] = None): | 
|  | super().__init__( | 
|  | name or | 
|  | f"caIndex({fabricAdmin.caIndex:x})/fabricId(0x{fabricId:016X})/nodeId(0x{nodeId:016X})" | 
|  | ) | 
|  |  | 
|  | self._issue_node_chain_context: CallbackContext = CallbackContext(asyncio.Lock()) | 
|  | self._dmLib.pychip_DeviceController_SetIssueNOCChainCallbackPythonCallback(_IssueNOCChainCallbackPythonCallback) | 
|  |  | 
|  | pairingDelegate = c_void_p(None) | 
|  | devCtrl = c_void_p(None) | 
|  |  | 
|  | c_catTags = (c_uint32 * len(catTags))() | 
|  |  | 
|  | for i, item in enumerate(catTags): | 
|  | c_catTags[i] = item | 
|  |  | 
|  | # TODO(erjiaqing@): Figure out how to control enableServerInteractions for a single device controller (node) | 
|  | self._externalKeyPair = keypair | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_OpCreds_AllocateController(cast( | 
|  | opCredsContext, c_void_p), pointer(devCtrl), pointer(pairingDelegate), fabricId, nodeId, adminVendorId, c_char_p(None if len(paaTrustStorePath) == 0 else str.encode(paaTrustStorePath)), useTestCommissioner, self._ChipStack.enableServerInteractions, c_catTags, len(catTags), None if keypair is None else keypair.native_object) | 
|  | ).raise_on_error() | 
|  |  | 
|  | self._fabricAdmin = fabricAdmin | 
|  | self._fabricId = fabricId | 
|  | self._nodeId = nodeId | 
|  | self._caIndex = fabricAdmin.caIndex | 
|  |  | 
|  | self._set_dev_ctrl(devCtrl=devCtrl, pairingDelegate=pairingDelegate) | 
|  |  | 
|  | self._finish_init() | 
|  |  | 
|  | assert self._fabricId == fabricId | 
|  | assert self._nodeId == nodeId | 
|  |  | 
|  | @property | 
|  | def caIndex(self) -> int: | 
|  | return self._caIndex | 
|  |  | 
|  | @property | 
|  | def fabricAdmin(self) -> typing.Optional[FabricAdmin.FabricAdmin]: | 
|  | return self._fabricAdmin | 
|  |  | 
|  | async def Commission(self, nodeid) -> int: | 
|  | ''' | 
|  | Start the auto-commissioning process on a node after establishing a PASE connection. | 
|  | This function is intended to be used in conjunction with one of the EstablishPASESession | 
|  | functions. It can be called either before or after the DevicePairingDelegate | 
|  | receives the OnPairingComplete call. Commissioners that want to perform simple | 
|  | auto-commissioning should use the supplied "CommissionWithCode" function, which will | 
|  | establish the PASE connection and commission automatically. | 
|  |  | 
|  | Args: | 
|  | nodeid (int): Node id of the device. | 
|  |  | 
|  | Raises: | 
|  | A ChipStackError on failure. | 
|  |  | 
|  | Returns: | 
|  | int: Effective Node ID of the device (as defined by the assigned NOC). | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | async with self._commissioning_context as ctx: | 
|  | self._enablePairingCompleteCallback(False) | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_Commission( | 
|  | self.devCtrl, nodeid) | 
|  | ) | 
|  |  | 
|  | return await asyncio.futures.wrap_future(ctx.future) | 
|  |  | 
|  | async def CommissionBleThread(self, discriminator, setupPinCode, nodeId, threadOperationalDataset: bytes, isShortDiscriminator: bool = False) -> int: | 
|  | ''' | 
|  | Commissions a Thread device over BLE. | 
|  |  | 
|  | Args: | 
|  | discriminator (int): The long discriminator for the DNS-SD advertisement. Valid range: 0-4095. | 
|  | setupPinCode (int): The setup pin code of the device. | 
|  | nodeId (int): Node id of the device. | 
|  | threadOperationalDataset (bytes): The Thread operational dataset for commissioning. | 
|  | isShortDiscriminator (bool): Short discriminator. | 
|  |  | 
|  | Returns: | 
|  | int: Effective Node ID of the device (as defined by the assigned NOC). | 
|  | ''' | 
|  | self.SetThreadOperationalDataset(threadOperationalDataset) | 
|  | return await self.ConnectBLE(discriminator, setupPinCode, nodeId, isShortDiscriminator) | 
|  |  | 
|  | async def CommissionNfcThread(self, discriminator, setupPinCode, nodeId, threadOperationalDataset: bytes) -> int: | 
|  | ''' | 
|  | Commissions a Thread device over NFC. | 
|  |  | 
|  | Args: | 
|  | discriminator (int): The long discriminator for the DNS-SD advertisement. Valid range: 0-4095. | 
|  | setupPinCode (int): The setup pin code of the device. | 
|  | nodeId (int): Node id of the device. | 
|  | threadOperationalDataset (bytes): The Thread operational dataset for commissioning. | 
|  |  | 
|  | Returns: | 
|  | int: Effective Node ID of the device (as defined by the assigned NOC). | 
|  | ''' | 
|  | self.SetThreadOperationalDataset(threadOperationalDataset) | 
|  | return await self.ConnectNFC(discriminator, setupPinCode, nodeId) | 
|  |  | 
|  | async def CommissionBleWiFi(self, discriminator, setupPinCode, nodeId, ssid: str, credentials: str, isShortDiscriminator: bool = False) -> int: | 
|  | ''' | 
|  | Commissions a Wi-Fi device over BLE. | 
|  |  | 
|  | Args: | 
|  | discriminator (int): The long discriminator for the DNS-SD advertisement. Valid range: 0-4095. | 
|  | setupPinCode (int): The setup pin code of the device. | 
|  | nodeId (int): Node id of the device. | 
|  | ssid (str): SSID of the WiFi  network. | 
|  | credentials (str): WiFi network password. | 
|  | isShortDiscriminator (bool): Short discriminator. | 
|  |  | 
|  | Returns: | 
|  | int: Effective Node ID of the device (as defined by the assigned NOC). | 
|  | ''' | 
|  | self.SetWiFiCredentials(ssid, credentials) | 
|  | return await self.ConnectBLE(discriminator, setupPinCode, nodeId, isShortDiscriminator) | 
|  |  | 
|  | def SetWiFiCredentials(self, ssid: str, credentials: str): | 
|  | ''' | 
|  | Set the Wi-Fi credentials to set during commissioning. | 
|  |  | 
|  | Args: | 
|  | ssid (str): SSID of the WiFi  network. | 
|  | credentials (str): WiFi network password. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetWiFiCredentials( | 
|  | ssid.encode("utf-8"), credentials.encode("utf-8")) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def SetThreadOperationalDataset(self, threadOperationalDataset): | 
|  | ''' | 
|  | Set the Thread operational dataset to set during commissioning. | 
|  |  | 
|  | Args: | 
|  | threadOperationalDataset (bytes): The Thread operational dataset for commissioning. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetThreadOperationalDataset( | 
|  | threadOperationalDataset, len(threadOperationalDataset)) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def ResetCommissioningParameters(self): | 
|  | ''' | 
|  | Sets the commissioning parameters back to the default values. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_ResetCommissioningParameters() | 
|  | ).raise_on_error() | 
|  |  | 
|  | def SetTimeZone(self, offset: int, validAt: int, name: str = ""): | 
|  | ''' | 
|  | Set the time zone to set during commissioning. Currently only one time zone entry is supported. | 
|  |  | 
|  | Args: | 
|  | offset (int): Timezone offset. | 
|  | validAt (int): Timestamp of the timezone. | 
|  | name (str):  Name or label of the timezone. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetTimeZone(offset, validAt, name.encode("utf-8")) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def SetDSTOffset(self, offset: int, validStarting: int, validUntil: int): | 
|  | ''' | 
|  | Set the DST offset to set during commissioning. Currently only one DST entry is supported. | 
|  |  | 
|  | Args: | 
|  | offset (int): Timezone offset. | 
|  | validStarting (int): The start timestamp. | 
|  | validUntil (int): The end timestamp | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetDSTOffset(offset, validStarting, validUntil) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def SetTCAcknowledgements(self, tcAcceptedVersion: int, tcUserResponse: int): | 
|  | ''' | 
|  | Set the TC acknowledgements to set during commissioning. | 
|  |  | 
|  | Args: | 
|  | tcAcceptedVersion (int): TC accepted version. | 
|  | tcUserResponse (int):  TC user responde. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetTermsAcknowledgements(tcAcceptedVersion, tcUserResponse) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def SetSkipCommissioningComplete(self, skipCommissioningComplete: bool): | 
|  | ''' | 
|  | Set whether to skip the commissioning complete callback. | 
|  |  | 
|  | Args: | 
|  | skipCommissioningComplete (bool): The value skip the commissioning complete. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetSkipCommissioningComplete(skipCommissioningComplete) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def SetDefaultNTP(self, defaultNTP: str): | 
|  | ''' | 
|  | Set the DefaultNTP to set during commissioning. | 
|  |  | 
|  | Args: | 
|  | defaultNTP (str): The default NTP. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetDefaultNtp(defaultNTP.encode("utf-8")) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def SetTrustedTimeSource(self, nodeId: int, endpoint: int): | 
|  | ''' | 
|  | Set the trusted time source nodeId to set during commissioning. This must be a node on the commissioner fabric. | 
|  |  | 
|  | Args: | 
|  | nodeId (int): Node id of the device. | 
|  | endpoint (int): endpoint of the device. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetTrustedTimeSource(nodeId, endpoint) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def SetCheckMatchingFabric(self, check: bool): | 
|  | ''' | 
|  | Instructs the auto-commissioner to perform a matching fabric check before commissioning. | 
|  |  | 
|  | Args: | 
|  | check (bool): Validation fabric before commissioning. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetCheckMatchingFabric(check) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def GenerateICDRegistrationParameters(self): | 
|  | ''' | 
|  | Generates ICD registration parameters for this controller. | 
|  |  | 
|  | Returns: | 
|  | ICDRegistrationParameters: An object containing the generated parameters | 
|  | including symmetricKey, checkInNodeId, monitoredSubject, stayActiveMs, | 
|  | and clientType. | 
|  | ''' | 
|  | return ICDRegistrationParameters( | 
|  | secrets.token_bytes(16), | 
|  | self._nodeId, | 
|  | self._nodeId, | 
|  | 30, | 
|  | Clusters.IcdManagement.Enums.ClientTypeEnum.kPermanent) | 
|  |  | 
|  | def EnableICDRegistration(self, parameters: ICDRegistrationParameters): | 
|  | ''' Enables ICD registration for the following commissioning session. | 
|  |  | 
|  | Args: | 
|  | parameters: A ICDRegistrationParameters for the parameters used for ICD registration, or None for default arguments. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | if parameters is None: | 
|  | raise ValueError("ICD registration parameter required.") | 
|  | if parameters.symmetricKey is None or len(parameters.symmetricKey) != 16: | 
|  | raise ValueError("symmetricKey should be 16 bytes") | 
|  |  | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetIcdRegistrationParameters( | 
|  | True, pointer(parameters.to_c())) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def DisableICDRegistration(self): | 
|  | ''' | 
|  | Disables ICD registration. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetIcdRegistrationParameters(False, None) | 
|  | ).raise_on_error() | 
|  |  | 
|  | def GetFabricCheckResult(self) -> int: | 
|  | ''' | 
|  | Returns the fabric check result if SetCheckMatchingFabric was used. | 
|  |  | 
|  | Returns: | 
|  | int: The fabric check result, or `-1` if no check was performed. | 
|  | ''' | 
|  | return self._fabricCheckNodeId | 
|  |  | 
|  | async def CommissionOnNetwork(self, nodeId: int, setupPinCode: int, | 
|  | filterType: DiscoveryFilterType = DiscoveryFilterType.NONE, | 
|  | filter: typing.Any = None, | 
|  | discoveryTimeoutMsec: int = 30000) -> int: | 
|  | ''' | 
|  | Does the routine for OnNetworkCommissioning, with a filter for mDNS discovery. | 
|  | Supported filters are: | 
|  |  | 
|  | DiscoveryFilterType.NONE | 
|  | DiscoveryFilterType.SHORT_DISCRIMINATOR | 
|  | DiscoveryFilterType.LONG_DISCRIMINATOR | 
|  | DiscoveryFilterType.VENDOR_ID | 
|  | DiscoveryFilterType.DEVICE_TYPE | 
|  | DiscoveryFilterType.COMMISSIONING_MODE | 
|  | DiscoveryFilterType.INSTANCE_NAME | 
|  | DiscoveryFilterType.COMMISSIONER | 
|  | DiscoveryFilterType.COMPRESSED_FABRIC_ID | 
|  |  | 
|  | The filter can be an integer, a string or None depending on the actual type of selected filter. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  |  | 
|  | Returns: | 
|  | - Effective Node ID of the device (as defined by the assigned NOC) | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | # Convert numerical filters to string for passing down to binding. | 
|  | if isinstance(filter, int): | 
|  | filter = str(filter) | 
|  |  | 
|  | async with self._commissioning_context as ctx: | 
|  | self._enablePairingCompleteCallback(True) | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission( | 
|  | self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") if filter is not None else None, discoveryTimeoutMsec) | 
|  | ) | 
|  |  | 
|  | return await asyncio.futures.wrap_future(ctx.future) | 
|  |  | 
|  | def get_rcac(self): | 
|  | ''' | 
|  | Passes captured RCAC data back to Python test modules for validation | 
|  | - Setting buffer size to max size mentioned in spec: | 
|  | - Ref: https://github.com/CHIP-Specifications/connectedhomeip-spec/blob/06c4d55962954546ecf093c221fe1dab57645028/policies/matter_certificate_policy.adoc#615-key-sizes | 
|  |  | 
|  | Returns: | 
|  | bytes: A bytes sentence representing the RCAC, or None if no data. | 
|  | ''' | 
|  | rcac_size = 400 | 
|  | rcac_buffer = (ctypes.c_uint8 * rcac_size)()  # Allocate buffer | 
|  |  | 
|  | actual_rcac_size = ctypes.c_size_t() | 
|  |  | 
|  | # Now calling the C++ function with the buffer size set as an additional parameter | 
|  | self._dmLib.pychip_GetCommissioningRCACData( | 
|  | ctypes.cast(rcac_buffer, ctypes.POINTER(ctypes.c_uint8)), | 
|  | ctypes.byref(actual_rcac_size), | 
|  | ctypes.c_size_t(rcac_size)  # Pass the buffer size | 
|  | ) | 
|  |  | 
|  | # Check if data is available | 
|  | if actual_rcac_size.value > 0: | 
|  | # Convert the data to a Python bytes object | 
|  | rcac_data = bytearray(rcac_buffer[:actual_rcac_size.value]) | 
|  | rcac_bytes = bytes(rcac_data) | 
|  | else: | 
|  | LOGGER.exception("RCAC returned from C++ did not contain any data") | 
|  | return None | 
|  | return rcac_bytes | 
|  |  | 
|  | async def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType: DiscoveryType = DiscoveryType.DISCOVERY_ALL) -> int: | 
|  | ''' | 
|  | Commission with the given nodeid from the setupPayload. | 
|  | setupPayload may be a QR or manual code. | 
|  |  | 
|  | Args: | 
|  | setupPayload (str): The setup payload (QR or manual code). | 
|  | nodeid (int): Node id of the device. | 
|  | discoveryType (DiscoveryType.DISCOVERY_ALL): The discovery type to use. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  |  | 
|  | Returns: | 
|  | int: Effective Node ID of the device (as defined by the assigned NOC) | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | async with self._commissioning_context as ctx: | 
|  | self._enablePairingCompleteCallback(True) | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_ConnectWithCode( | 
|  | self.devCtrl, setupPayload.encode("utf-8"), nodeid, discoveryType.value) | 
|  | ) | 
|  |  | 
|  | return await asyncio.futures.wrap_future(ctx.future) | 
|  |  | 
|  | def NOCChainCallback(self, nocChain): | 
|  | ''' | 
|  | Callback function for handling the NOC chain result. | 
|  |  | 
|  | Args: | 
|  | nocChain (nocChain): The object NOC chain data received. | 
|  |  | 
|  | Returns: | 
|  | None | 
|  | ''' | 
|  | if self._issue_node_chain_context.future is None: | 
|  | LOGGER.exception("NOCChainCallback while not expecting a callback") | 
|  | return | 
|  | self._issue_node_chain_context.future.set_result(nocChain) | 
|  | return | 
|  |  | 
|  | async def IssueNOCChain(self, csr: Clusters.OperationalCredentials.Commands.CSRResponse, nodeId: int): | 
|  | ''' | 
|  | Issue an NOC chain using the associated OperationalCredentialsDelegate. | 
|  | The NOC chain will be provided in TLV cert format. | 
|  |  | 
|  | Args: | 
|  | crs (cluster): Certificate Signing Request response | 
|  | nodeId (int): Node id of the device. | 
|  |  | 
|  | Returns: | 
|  | asyncio.Future: A future object that is the result of the NOC Chain operation. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  |  | 
|  | async with self._issue_node_chain_context as ctx: | 
|  | await self._ChipStack.CallAsync( | 
|  | lambda: self._dmLib.pychip_DeviceController_IssueNOCChain( | 
|  | self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId) | 
|  | ) | 
|  |  | 
|  | return await asyncio.futures.wrap_future(ctx.future) | 
|  |  | 
|  | def SetDACRevocationSetPath(self, dacRevocationSetPath: typing.Optional[str]): | 
|  | ''' | 
|  | Set the path to the device attestation revocation set JSON file. | 
|  |  | 
|  | Args: | 
|  | dacRevocationSetPath (Optional[str]): Path to the JSON file containing the device attestation revocation set. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure. | 
|  | ''' | 
|  | self.CheckIsActive() | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_DeviceController_SetDACRevocationSetPath( | 
|  | c_char_p(str.encode(dacRevocationSetPath) if dacRevocationSetPath else ""))  # type: ignore[arg-type] | 
|  | ).raise_on_error() | 
|  |  | 
|  |  | 
|  | class BareChipDeviceController(ChipDeviceControllerBase): | 
|  | ''' | 
|  | A bare device controller without AutoCommissioner support. | 
|  | ''' | 
|  |  | 
|  | def __init__(self, operationalKey: p256keypair.P256Keypair, noc: bytes, | 
|  | icac: typing.Union[bytes, None], rcac: bytes, ipk: typing.Union[bytes, None], adminVendorId: int, name: typing.Optional[str] = None): | 
|  | ''' | 
|  | Creates a controller without AutoCommissioner. | 
|  |  | 
|  | The allocated controller uses the noc, icac, rcac and ipk instead of the default, | 
|  | random generated certificates / keys. Which is suitable for creating a controller | 
|  | for manually signing certificates for testing. | 
|  |  | 
|  | Args: | 
|  | operationalKey: A P256Keypair object for the operational key of the controller. | 
|  | noc (bytes): The NOC for the controller, in bytes. | 
|  | icac (Optional[bytes]): The optional ICAC for the controller. | 
|  | rcac (bytes): The RCAC for the controller. | 
|  | ipk (Optional[bytes]): The optional IPK for the controller, when None is provided, the defaultIpk will be used. | 
|  | adminVendorId (int): The adminVendorId of the controller. | 
|  | name (str): The name of the controller, for debugging use only. | 
|  |  | 
|  | Raises: | 
|  | ChipStackError: On failure | 
|  | ''' | 
|  | super().__init__(name or f"ctrl(v/{adminVendorId})") | 
|  |  | 
|  | pairingDelegate = c_void_p(None) | 
|  | devCtrl = c_void_p(None) | 
|  |  | 
|  | # Device should hold a reference to the key to avoid it being GC-ed. | 
|  | self._externalKeyPair = operationalKey | 
|  | nativeKey = operationalKey._create_native_object() | 
|  |  | 
|  | self._ChipStack.Call( | 
|  | lambda: self._dmLib.pychip_OpCreds_AllocateControllerForPythonCommissioningFLow( | 
|  | cast(devCtrl, c_void_p), cast(pairingDelegate, c_void_p), nativeKey, noc, len(noc), icac, len(icac) if icac else 0, rcac, len(rcac), ipk, len(ipk) if ipk else 0, adminVendorId, self._ChipStack.enableServerInteractions) | 
|  | ).raise_on_error() | 
|  |  | 
|  | self._set_dev_ctrl(devCtrl, pairingDelegate) | 
|  |  | 
|  | self._finish_init() |