[Python] Align the return values of all commission interfaces (#27064)
* [Python] Align the return values of all commission interfaces
* Restyled by autopep8
* fix CI
* Restyled by autopep8
* [Python] Commission return value alignment as PyChipError
* assign Commission return value
* restyled by autopep8 & defined CHIP_ERROR_TIMEOUT instead of 50
---------
Co-authored-by: Restyled.io <commits@restyled.io>
diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py
index d9f3eab..3cf99f6 100644
--- a/src/controller/python/chip/ChipDeviceCtrl.py
+++ b/src/controller/python/chip/ChipDeviceCtrl.py
@@ -59,6 +59,9 @@
__all__ = ["ChipDeviceController"]
+# Defined in $CHIP_ROOT/src/lib/core/CHIPError.h
+CHIP_ERROR_TIMEOUT: int = 50
+
_DevicePairingDelegate_OnPairingCompleteFunct = CFUNCTYPE(None, PyChipError)
_DeviceUnpairingCompleteFunct = CFUNCTYPE(None, c_uint64, PyChipError)
_DevicePairingDelegate_OnCommissioningCompleteFunct = CFUNCTYPE(
@@ -89,7 +92,8 @@
@_IssueNOCChainCallbackPythonCallbackFunct
-def _IssueNOCChainCallbackPythonCallback(devCtrl, status: PyChipError, noc: c_void_p, nocLen: int, icac: c_void_p, icacLen: int, rcac: c_void_p, rcacLen: int, ipk: c_void_p, ipkLen: int, adminSubject: int):
+def _IssueNOCChainCallbackPythonCallback(devCtrl, status: PyChipError, noc: c_void_p, nocLen: int, icac: c_void_p,
+ icacLen: int, rcac: c_void_p, rcacLen: int, ipk: c_void_p, ipkLen: int, adminSubject: int):
nocChain = NOCChain(None, None, None, None, 0)
if status.is_success:
nocBytes = None
@@ -135,13 +139,13 @@
def SetDeviceController(self, devCtrl: 'ChipDeviceController'):
self._devCtrl = devCtrl
- def Commission(self, nodeId: int, setupPinCode: int):
+ def Commission(self, nodeId: int, setupPinCode: int) -> PyChipError:
''' Commission the device using the device controller discovered this device.
nodeId: The nodeId commissioned to the device
setupPinCode: The setup pin code of the device
'''
- self._devCtrl.CommissionOnNetwork(
+ return self._devCtrl.CommissionOnNetwork(
nodeId, setupPinCode, filterType=discovery.FilterType.INSTANCE_NAME, filter=self.instanceName)
def __rich_repr__(self):
@@ -354,7 +358,7 @@
C++ constructor instance in the SDK.
'''
if (self._isActive):
- if self.devCtrl != None:
+ if self.devCtrl is not None:
self._ChipStack.Call(
lambda: self._dmLib.pychip_DeviceController_DeleteDeviceController(
self.devCtrl)
@@ -399,7 +403,7 @@
self.devCtrl)
)
- def ConnectBLE(self, discriminator, setupPinCode, nodeid):
+ def ConnectBLE(self, discriminator, setupPinCode, nodeid) -> PyChipError:
self.CheckIsActive()
self._ChipStack.commissioningCompleteEvent.clear()
@@ -411,8 +415,8 @@
).raise_on_error()
if not self._ChipStack.commissioningCompleteEvent.isSet():
# Error 50 is a timeout
- return False
- return self._ChipStack.commissioningEventRes.is_success
+ return PyChipError(CHIP_ERROR_TIMEOUT)
+ return self._ChipStack.commissioningEventRes
def UnpairDevice(self, nodeid: int):
self.CheckIsActive()
@@ -521,7 +525,8 @@
return (address.value.decode(), port.value) if error == 0 else None
- def DiscoverCommissionableNodes(self, filterType: discovery.FilterType = discovery.FilterType.NONE, filter: typing.Any = None, stopOnFirst: bool = False, timeoutSecond: int = 5) -> typing.Union[None, CommissionableNode, typing.List[CommissionableNode]]:
+ def DiscoverCommissionableNodes(self, filterType: discovery.FilterType = discovery.FilterType.NONE, filter: typing.Any = None,
+ stopOnFirst: bool = False, timeoutSecond: int = 5) -> typing.Union[None, CommissionableNode, typing.List[CommissionableNode]]:
''' Discover commissionable nodes via DNS-SD with specified filters.
Supported filters are:
@@ -550,7 +555,8 @@
if stopOnFirst:
target = time.time() + timeoutSecond
while time.time() < target:
- if self._ChipStack.Call(lambda: self._dmLib.pychip_DeviceController_HasDiscoveredCommissionableNode(self.devCtrl)):
+ if self._ChipStack.Call(
+ lambda: self._dmLib.pychip_DeviceController_HasDiscoveredCommissionableNode(self.devCtrl)):
break
time.sleep(0.1)
else:
@@ -831,7 +837,8 @@
future, eventLoop, device.deviceProxy, attrs, timedRequestTimeoutMs=timedRequestTimeoutMs, interactionTimeoutMs=interactionTimeoutMs, busyWaitMs=busyWaitMs).raise_on_error()
return await future
- def WriteGroupAttribute(self, groupid: int, attributes: typing.List[typing.Tuple[ClusterObjects.ClusterAttributeDescriptor, int]], busyWaitMs: typing.Union[None, int] = None):
+ def WriteGroupAttribute(
+ self, groupid: int, attributes: typing.List[typing.Tuple[ClusterObjects.ClusterAttributeDescriptor, int]], busyWaitMs: typing.Union[None, int] = None):
'''
Write a list of attributes on a target group.
@@ -878,8 +885,8 @@
if pathTuple == ('*') or pathTuple == ():
# Wildcard
pass
- elif type(pathTuple) is not tuple:
- if type(pathTuple) is int:
+ elif not isinstance(pathTuple, tuple):
+ if isinstance(pathTuple, int):
endpoint = pathTuple
elif issubclass(pathTuple, ClusterObjects.Cluster):
cluster = pathTuple
@@ -934,9 +941,9 @@
if pathTuple in [('*'), ()]:
# Wildcard
pass
- elif type(pathTuple) is not tuple:
+ elif not isinstance(pathTuple, tuple):
print(type(pathTuple))
- if type(pathTuple) is int:
+ if isinstance(pathTuple, int):
endpoint = pathTuple
elif issubclass(pathTuple, ClusterObjects.Cluster):
cluster = pathTuple
@@ -1133,7 +1140,7 @@
try:
req = eval(
f"GeneratedObjects.{cluster}.Commands.{command}")(**args)
- except:
+ except BaseException:
raise UnknownCommand(cluster, command)
try:
res = asyncio.run(self.SendCommand(nodeid, endpoint, req))
@@ -1151,21 +1158,22 @@
try:
attributeType = eval(
f"GeneratedObjects.{cluster}.Attributes.{attribute}")
- except:
+ except BaseException:
raise UnknownAttribute(cluster, attribute)
result = asyncio.run(self.ReadAttribute(
nodeid, [(endpoint, attributeType)]))
path = ClusterAttribute.AttributePath(
EndpointId=endpoint, Attribute=attributeType)
- return im.AttributeReadResult(path=im.AttributePath(nodeId=nodeid, endpointId=path.EndpointId, clusterId=path.ClusterId, attributeId=path.AttributeId), status=0, value=result[endpoint][clusterType][attributeType], dataVersion=result[endpoint][clusterType][ClusterAttribute.DataVersion])
+ return im.AttributeReadResult(path=im.AttributePath(nodeId=nodeid, endpointId=path.EndpointId, clusterId=path.ClusterId, attributeId=path.AttributeId),
+ status=0, value=result[endpoint][clusterType][attributeType], dataVersion=result[endpoint][clusterType][ClusterAttribute.DataVersion])
def ZCLWriteAttribute(self, cluster: str, attribute: str, nodeid, endpoint, groupid, value, dataVersion=0, blocking=True):
req = None
try:
req = eval(
f"GeneratedObjects.{cluster}.Attributes.{attribute}")(value)
- except:
+ except BaseException:
raise UnknownAttribute(cluster, attribute)
return asyncio.run(self.WriteAttribute(nodeid, [(endpoint, req, dataVersion)]))
@@ -1176,7 +1184,7 @@
req = None
try:
req = eval(f"GeneratedObjects.{cluster}.Attributes.{attribute}")
- except:
+ except BaseException:
raise UnknownAttribute(cluster, attribute)
return asyncio.run(self.ReadAttribute(nodeid, [(endpoint, req)], None, False, reportInterval=(minInterval, maxInterval)))
@@ -1430,7 +1438,8 @@
TODO: This class contains DEPRECATED functions, we should update the test scripts to avoid the usage of those functions.
'''
- def __init__(self, opCredsContext: ctypes.c_void_p, fabricId: int, nodeId: int, adminVendorId: int, catTags: typing.List[int] = [], paaTrustStorePath: str = "", useTestCommissioner: bool = False, fabricAdmin: FabricAdmin = None, name: str = None, keypair: p256keypair.P256Keypair = None):
+ def __init__(self, opCredsContext: ctypes.c_void_p, fabricId: int, nodeId: int, adminVendorId: int, catTags: typing.List[int] = [
+ ], paaTrustStorePath: str = "", useTestCommissioner: bool = False, fabricAdmin: FabricAdmin = None, name: str = None, keypair: p256keypair.P256Keypair = None):
super().__init__(
name or
f"caIndex({fabricAdmin.caIndex:x})/fabricId(0x{fabricId:016X})/nodeId(0x{nodeId:016X})"
@@ -1472,7 +1481,7 @@
def fabricAdmin(self) -> FabricAdmin:
return self._fabricAdmin
- def Commission(self, nodeid) -> bool:
+ def Commission(self, nodeid) -> PyChipError:
'''
Start the auto-commissioning process on a node after establishing a PASE connection.
This function is intended to be used in conjunction with `EstablishPASESessionBLE` or
@@ -1492,15 +1501,18 @@
lambda: self._dmLib.pychip_DeviceController_Commission(
self.devCtrl, nodeid)
)
- return (self._ChipStack.commissioningCompleteEvent.isSet() and (self._ChipStack.commissioningEventRes == 0))
+ if not self._ChipStack.commissioningCompleteEvent.isSet():
+ # Error 50 is a timeout
+ return PyChipError(CHIP_ERROR_TIMEOUT)
+ return self._ChipStack.commissioningEventRes
- def CommissionThread(self, discriminator, setupPinCode, nodeId, threadOperationalDataset: bytes):
+ def CommissionThread(self, discriminator, setupPinCode, nodeId, threadOperationalDataset: bytes) -> PyChipError:
''' Commissions a Thread device over BLE
'''
self.SetThreadOperationalDataset(threadOperationalDataset)
return self.ConnectBLE(discriminator, setupPinCode, nodeId)
- def CommissionWiFi(self, discriminator, setupPinCode, nodeId, ssid: str, credentials: str):
+ def CommissionWiFi(self, discriminator, setupPinCode, nodeId, ssid: str, credentials: str) -> PyChipError:
''' Commissions a WiFi device over BLE
'''
self.SetWiFiCredentials(ssid, credentials)
@@ -1522,7 +1534,8 @@
threadOperationalDataset, len(threadOperationalDataset))
).raise_on_error()
- def CommissionOnNetwork(self, nodeId: int, setupPinCode: int, filterType: DiscoveryFilterType = DiscoveryFilterType.NONE, filter: typing.Any = None):
+ def CommissionOnNetwork(self, nodeId: int, setupPinCode: int,
+ filterType: DiscoveryFilterType = DiscoveryFilterType.NONE, filter: typing.Any = None) -> PyChipError:
'''
Does the routine for OnNetworkCommissioning, with a filter for mDNS discovery.
Supported filters are:
@@ -1556,10 +1569,11 @@
self.devCtrl, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None)
)
if not self._ChipStack.commissioningCompleteEvent.isSet():
- return False, -1
- return self._ChipStack.commissioningEventRes == 0, self._ChipStack.commissioningEventRes
+ # Error 50 is a timeout
+ return PyChipError(CHIP_ERROR_TIMEOUT)
+ return self._ChipStack.commissioningEventRes
- def CommissionWithCode(self, setupPayload: str, nodeid: int):
+ def CommissionWithCode(self, setupPayload: str, nodeid: int) -> PyChipError:
self.CheckIsActive()
setupPayload = setupPayload.encode() + b'\0'
@@ -1575,10 +1589,11 @@
self.devCtrl, setupPayload, nodeid)
)
if not self._ChipStack.commissioningCompleteEvent.isSet():
- return False
- return self._ChipStack.commissioningEventRes == 0
+ # Error 50 is a timeout
+ return PyChipError(CHIP_ERROR_TIMEOUT)
+ return self._ChipStack.commissioningEventRes
- def CommissionIP(self, ipaddr: str, setupPinCode: int, nodeid: int):
+ def CommissionIP(self, ipaddr: str, setupPinCode: int, nodeid: int) -> PyChipError:
""" DEPRECATED, DO NOT USE! Use `CommissionOnNetwork` or `CommissionWithCode` """
self.CheckIsActive()
@@ -1593,8 +1608,9 @@
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid)
)
if not self._ChipStack.commissioningCompleteEvent.isSet():
- return False
- return self._ChipStack.commissioningEventRes == 0
+ # Error 50 is a timeout
+ return PyChipError(CHIP_ERROR_TIMEOUT)
+ return self._ChipStack.commissioningEventRes
def IssueNOCChain(self, csr: Clusters.OperationalCredentials.Commands.CSRResponse, nodeId: int):
"""Issue an NOC chain using the associated OperationalCredentialsDelegate.
@@ -1611,7 +1627,8 @@
''' A bare device controller without AutoCommissioner support.
'''
- def __init__(self, operationalKey: p256keypair.P256Keypair, noc: bytes, icac: typing.Union[bytes, None], rcac: bytes, ipk: typing.Union[bytes, None], adminVendorId: int, name: str = None):
+ def __init__(self, operationalKey: p256keypair.P256Keypair, noc: bytes,
+ icac: typing.Union[bytes, None], rcac: bytes, ipk: typing.Union[bytes, None], adminVendorId: int, name: str = None):
'''Creates a controller without autocommissioner.
The allocated controller uses the noc, icac, rcac and ipk instead of the default,
diff --git a/src/controller/python/chip/native/__init__.py b/src/controller/python/chip/native/__init__.py
index e838212..26e394e 100644
--- a/src/controller/python/chip/native/__init__.py
+++ b/src/controller/python/chip/native/__init__.py
@@ -103,6 +103,9 @@
GetLibraryHandle().pychip_FormatError(ctypes.pointer(self), buf, 256)
return buf.value.decode()
+ def __bool__(self):
+ return self.is_success
+
def __eq__(self, other):
if isinstance(other, int):
return self.code == other
diff --git a/src/python_testing/TC_CGEN_2_4.py b/src/python_testing/TC_CGEN_2_4.py
index 682e4b0..56db66b 100644
--- a/src/python_testing/TC_CGEN_2_4.py
+++ b/src/python_testing/TC_CGEN_2_4.py
@@ -49,11 +49,11 @@
# This will run the commissioning up to the point where stage x is run and the
# response is sent before the test commissioner simulates a failure
self.th2.SetTestCommissionerPrematureCompleteAfter(stage)
- success, errcode = self.th2.CommissionOnNetwork(
+ errcode = self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=pin,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.matter_test_config.discriminators[0])
- logging.info('Commissioning complete done. Successful? {}, errorcode = {}'.format(success, errcode))
- asserts.assert_false(success, 'Commissioning complete did not error as expected')
+ logging.info('Commissioning complete done. Successful? {}, errorcode = {}'.format(errcode.is_success, errcode))
+ asserts.assert_false(errcode.is_success, 'Commissioning complete did not error as expected')
asserts.assert_true(errcode.sdk_part == expectedErrorPart, 'Unexpected error type returned from CommissioningComplete')
asserts.assert_true(errcode.sdk_code == expectedErrCode, 'Unexpected error code returned from CommissioningComplete')
revokeCmd = Clusters.AdministratorCommissioning.Commands.RevokeCommissioning()
@@ -89,10 +89,10 @@
logging.info('Step 16 - TH2 fully commissions the DUT')
self.th2.ResetTestCommissioner()
- success, errcode = self.th2.CommissionOnNetwork(
+ errcode = self.th2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=pin,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=self.matter_test_config.discriminators[0])
- logging.info('Commissioning complete done. Successful? {}, errorcode = {}'.format(success, errcode))
+ logging.info('Commissioning complete done. Successful? {}, errorcode = {}'.format(errcode.is_success, errcode))
logging.info('Step 17 - TH1 sends an arm failsafe')
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=900, breadcrumb=0)
diff --git a/src/python_testing/TC_DA_1_5.py b/src/python_testing/TC_DA_1_5.py
index f27d2f1..5634f53 100644
--- a/src/python_testing/TC_DA_1_5.py
+++ b/src/python_testing/TC_DA_1_5.py
@@ -170,10 +170,10 @@
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
TH2 = new_fabric_admin.NewController(nodeId=112233)
- success, _ = TH2.CommissionOnNetwork(
+ errcode = TH2.CommissionOnNetwork(
nodeId=self.dut_node_id, setupPinCode=pin,
filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234)
- asserts.assert_true(success, 'Commissioning on TH2 did not complete successfully')
+ asserts.assert_true(errcode.is_success, 'Commissioning on TH2 did not complete successfully')
self.print_step(15, "Read NOCs list for TH1")
temp = await self.read_single_attribute_check_success(