# blob: 9d6e1e53bee1bb228005a5f17a8fa3ddb9a44472 [file] [log] [blame]
#
# Copyright (c) 2021 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
from dataclasses import dataclass
from inspect import Attribute
import inspect
from typing import Any
import typing
from chip import ChipDeviceCtrl
from chip import ChipCommissionableNodeCtrl
import chip.interaction_model as IM
import threading
import os
import sys
import logging
import time
import ctypes
import chip.clusters as Clusters
import chip.clusters.Attribute as Attribute
from chip.utils import CommissioningBuildingBlocks
from chip.ChipStack import *
import chip.native
import chip.FabricAdmin
import chip.CertificateAuthority
import chip.discovery
import copy
import secrets
import faulthandler
import ipdb
# Module-wide logger for the controller tests. Records at INFO and above are
# written to stdout as "<time> [<logger name>] <level> <message>".
sh = logging.StreamHandler()
sh.setStream(sys.stdout)
sh.setFormatter(
    logging.Formatter('%(asctime)s [%(name)s] %(levelname)s %(message)s'))

logger = logging.getLogger('PythonMatterControllerTEST')
logger.setLevel(logging.INFO)
logger.addHandler(sh)
def TestFail(message, doCrash=False):
    """Log a fatal test failure and bring the process down.

    Args:
        message: Description of the failure; logged as "Testfail: <message>".
        doCrash: When falsy the process exits immediately with status 1 via
            os._exit. When truthy, the Python tracebacks of all threads are
            dumped and the native library is asked to crash the process.
    """
    logger.fatal("Testfail: {}".format(message))

    if not doCrash:
        os._exit(1)
        return

    for banner_line in ("--------------------------------",
                        "Backtrace of all Python threads:",
                        "--------------------------------"):
        logger.fatal(banner_line)

    # Dump the Python backtrace of every thread first: a backtrace taken from
    # gdb (if one is attached) won't carry good Python symbol information.
    faulthandler.dump_traceback()

    # Cause a crash so that a meaningful native backtrace is available when
    # the test is run through GDB.
    chip.native.GetLibraryHandle().pychip_CauseCrash()
def FailIfNot(cond, message):
    """Fail the test run with ``message`` unless ``cond`` is truthy."""
    if cond:
        return
    TestFail(message)
# Qualified names of every function decorated with @test_case.
_configurable_tests = set()
# Qualified names of every class decorated with @test_set.
_configurable_test_sets = set()
# Test-name prefixes (or the literal 'all') selecting what is enabled and
# disabled. Both lists are replaced by SetTestSet() and read by TestIsEnabled().
_enabled_tests = []
_disabled_tests = []
def SetTestSet(enabled_tests, disabled_tests):
    """Replace the enabled/disabled selections consulted by TestIsEnabled.

    Args:
        enabled_tests: Iterable of test-name prefixes (or 'all') to enable.
        disabled_tests: Iterable of test-name prefixes (or 'all') to disable.

    Each argument is copied into a new list, so later changes to the caller's
    object do not affect the stored selection. Any iterable is accepted
    (list, tuple, set, generator); a bare string is treated as one selection
    rather than being split into characters.
    """
    global _enabled_tests, _disabled_tests

    def as_selection(items):
        if isinstance(items, str):
            return [items]
        return list(items)

    # Build both copies before touching module state so that a bad argument
    # cannot leave the selection half-updated.
    enabled_copy = as_selection(enabled_tests)
    disabled_copy = as_selection(disabled_tests)
    _enabled_tests = enabled_copy
    _disabled_tests = disabled_copy
def TestIsEnabled(test_name: str):
    """Return True when ``test_name`` is selected by the current test set.

    The longest entry of the enabled list that is a prefix of ``test_name`` is
    compared with the longest such entry of the disabled list; the test is
    enabled only when the enabled match is strictly longer. The literal entry
    'all' counts as a match of length 0 for every name, and a list with no
    match at all scores -1, so a test matched by neither list is disabled.
    """
    def longest_match(selection):
        floor = 0 if 'all' in selection else -1
        matched = [len(prefix) for prefix in selection
                   if test_name.startswith(prefix)]
        return max([floor] + matched)

    return longest_match(_enabled_tests) > longest_match(_disabled_tests)
def test_set(cls):
    """Class decorator that registers ``cls`` as a configurable test set.

    The class's ``__qualname__`` is added to the registry reported by
    configurable_tests(); the class itself is returned unchanged.
    """
    set_name = cls.__qualname__
    _configurable_test_sets.add(set_name)
    return cls
def test_case(func):
    """Decorator that registers ``func`` as a configurable test case.

    The function's ``__qualname__`` is added to the registry reported by
    configurable_test_cases(). The returned wrapper consults TestIsEnabled on
    every call:

    * enabled: ``func`` is called and its result returned;
    * disabled coroutine function: a no-op awaitable is returned, so callers
      can still ``await`` the call;
    * disabled plain function: ``None`` is returned.

    The wrapper carries ``func``'s name, qualified name and docstring.
    """
    import functools

    test_name = func.__qualname__
    _configurable_tests.add(test_name)

    @functools.wraps(func)
    def CheckEnableBeforeRun(*args, **kwargs):
        if TestIsEnabled(test_name=test_name):
            return func(*args, **kwargs)
        if inspect.iscoroutinefunction(func):
            # noop, so users can use await as usual
            return asyncio.sleep(0)
        return None

    return CheckEnableBeforeRun
def configurable_tests():
    """Return the names of all registered test sets as a sorted list."""
    return sorted(_configurable_test_sets)
def configurable_test_cases():
    """Return the names of all registered test cases as a sorted list."""
    return sorted(_configurable_tests)
class TestTimeout(threading.Thread):
    """Watchdog thread that fails the test run unless stopped in time.

    After start(), the thread waits up to ``timeout`` seconds. If stop() has
    not been called by then, TestFail("Timeout", doCrash=True) is invoked.

    Args:
        timeout: Number of seconds to wait before declaring a timeout.
    """

    def __init__(self, timeout: int):
        threading.Thread.__init__(self)
        self._timeout = timeout
        self._should_stop = False
        # Guards _should_stop and lets stop() wake the waiting thread early.
        self._cv = threading.Condition()

    def stop(self):
        """Cancel the watchdog and wait for its thread to finish."""
        with self._cv:
            self._should_stop = True
            self._cv.notify_all()
        self.join()

    def run(self):
        """Wait for stop() or the deadline, failing the test on the latter."""
        # Use the monotonic clock: wall-clock adjustments must neither
        # shorten nor extend the timeout.
        deadline = time.monotonic() + self._timeout
        logger.info("Test timeout set to {} seconds".format(self._timeout))
        with self._cv:
            remaining = deadline - time.monotonic()
            while remaining > 0 and not self._should_stop:
                self._cv.wait(remaining)
                remaining = deadline - time.monotonic()
            # Decide while still holding the lock, from the reason the loop
            # ended. Re-reading the clock afterwards could report a timeout
            # for a watchdog that was stopped just before the deadline, or
            # miss one when the clock has not ticked past the deadline yet.
            timed_out = not self._should_stop
        if timed_out:
            TestFail("Timeout", doCrash=True)
class TestResult:
    """Pairs an operation name with its result and offers chained checks.

    ``result`` is expected to expose ``status`` and ``value`` attributes, or
    to be None when the operation produced nothing. Each assert* method
    raises Exception with a message prefixed by the operation name on
    mismatch and returns ``self`` on success so calls can be chained.
    """

    def __init__(self, operationName, result):
        self.operationName = operationName
        self.result = result

    def _requireResult(self):
        # Shared guard: a missing result is reported the same way everywhere.
        if self.result is None:
            raise Exception(f"{self.operationName}: no result got")

    def assertStatusEqual(self, expected):
        """Check that a result exists and its status equals ``expected``."""
        self._requireResult()
        actual = self.result.status
        if actual != expected:
            raise Exception(
                f"{self.operationName}: expected status {expected}, got {actual}")
        return self

    def assertValueEqual(self, expected):
        """Check that the status is 0 and the value equals ``expected``."""
        self.assertStatusEqual(0)
        self._requireResult()
        actual = self.result.value
        if actual != expected:
            raise Exception(
                f"{self.operationName}: expected value {expected}, got {actual}")
        return self
class BaseTestHelper:
def __init__(self, nodeid: int, paaTrustStorePath: str, testCommissioner: bool = False):
    """Initialise the native library and build the primary controller.

    Creates a ChipStack backed by '/tmp/repl_storage.json', a certificate
    authority manager, one certificate authority, a fabric admin (vendor
    0xFFF1, fabric 1) and a controller on that fabric. Finally the root
    logger's level is set to DEBUG.

    Args:
        nodeid: Node id given to the new controller; kept as controllerNodeId.
        paaTrustStorePath: Passed to NewController and kept for controllers
            created later.
        testCommissioner: Forwarded to NewController as its third argument.
    """
    self.logger = logger
    self.controllerNodeId = nodeid
    self.paaTrustStorePath = paaTrustStorePath

    chip.native.Init()
    self.chipStack = ChipStack('/tmp/repl_storage.json')

    caManager = chip.CertificateAuthority.CertificateAuthorityManager(chipStack=self.chipStack)
    self.certificateAuthorityManager = caManager
    self.certificateAuthority = caManager.NewCertificateAuthority()
    self.fabricAdmin = self.certificateAuthority.NewFabricAdmin(vendorId=0xFFF1, fabricId=1)
    self.devCtrl = self.fabricAdmin.NewController(
        nodeid, paaTrustStorePath, testCommissioner)

    logging.getLogger().setLevel(logging.DEBUG)
async def _GetCommissonedFabricCount(self, nodeid: int):
    """Read node ``nodeid``'s CommissionedFabrics attribute and return it.

    The value is taken from the first entry of the read response.
    """
    opCreds = Clusters.OperationalCredentials
    fabricsAttribute = opCreds.Attributes.CommissionedFabrics
    response = await self.devCtrl.ReadAttribute(nodeid, [fabricsAttribute])
    return response[0][opCreds][fabricsAttribute]
def _WaitForOneDiscoveredDevice(self, timeoutSeconds: int = 2):
    """Poll the controller until the first discovered device has an address.

    GetIPForDiscoveredDevice(0, ...) is polled every 0.2 seconds. The device
    is always polled at least once, even with a zero or negative timeout.

    Args:
        timeoutSeconds: How long to keep polling before giving up.

    Returns:
        The address written into the buffer, decoded as UTF-8, as soon as a
        poll succeeds; None if no poll succeeded before the deadline.
    """
    print("Waiting for device responses...")
    strlen = 100
    addrStrStorage = ctypes.create_string_buffer(strlen)
    # Monotonic clock: a wall-clock adjustment must not distort the timeout.
    deadline = time.monotonic() + timeoutSeconds
    while True:
        # A successful poll wins even if the deadline passed while it ran;
        # deciding from the clock afterwards would discard a found device.
        if self.devCtrl.GetIPForDiscoveredDevice(0, addrStrStorage, strlen):
            return ctypes.string_at(addrStrStorage).decode("utf-8")
        if time.monotonic() > deadline:
            return None
        time.sleep(0.2)
def TestDiscovery(self, discriminator: int):
    """Discover a commissionable node by its long discriminator.

    Discovery stops on the first match and is given a 3 second timeout.

    Returns:
        The first discovery result, or False when nothing was found.
    """
    self.logger.info(
        f"Discovering commissionable nodes with discriminator {discriminator}")
    res = self.devCtrl.DiscoverCommissionableNodes(
        chip.discovery.FilterType.LONG_DISCRIMINATOR, discriminator, stopOnFirst=True, timeoutSecond=3)
    if res:
        self.logger.info(f"Found device {res[0]}")
        return res[0]
    self.logger.info(
        f"Device not found")
    return False
def TestPaseOnly(self, ip: str, setuppin: int, nodeid: int):
    """Establish a PASE session with the device at ``ip``.

    The attempt counts as successful only when EstablishPASESessionIP
    returns None; any other return value is logged as a failure.

    Returns:
        True on success, False otherwise.
    """
    nodeLabel = str(nodeid)
    self.logger.info(
        "Attempting to establish PASE session with device id: {} addr: {}".format(nodeLabel, ip))
    outcome = self.devCtrl.EstablishPASESessionIP(ip, setuppin, nodeid)
    established = outcome is None
    if established:
        self.logger.info(
            "Successfully established PASE session with device id: {} addr: {}".format(nodeLabel, ip))
    else:
        self.logger.info(
            "Failed to establish PASE session with device id: {} addr: {}".format(nodeLabel, ip))
    return established
def TestCommissionOnly(self, nodeid: int):
    """Commission node ``nodeid`` through the controller's Commission call.

    Returns:
        True when Commission returns a truthy value, False otherwise.
    """
    self.logger.info(
        "Commissioning device with id {}".format(nodeid))
    if self.devCtrl.Commission(nodeid):
        self.logger.info(
            "Successfully commissioned device with id {}".format(str(nodeid)))
        return True
    self.logger.info(
        "Failed to commission device with id {}".format(str(nodeid)))
    return False
def TestKeyExchangeBLE(self, discriminator: int, setuppin: int, nodeid: int):
    """Run the key exchange with a device through the controller's ConnectBLE.

    Returns:
        True when ConnectBLE returns a truthy value, False otherwise.
    """
    self.logger.info(
        "Conducting key exchange with device {}".format(discriminator))
    if self.devCtrl.ConnectBLE(discriminator, setuppin, nodeid):
        self.logger.info("Device finished key exchange.")
        return True
    self.logger.info(
        "Failed to finish key exchange with device {}".format(discriminator))
    return False
def TestCommissionFailure(self, nodeid: int, failAfter: int):
    """Commission with a failure simulated after stage ``failAfter``.

    The test commissioner is reset and asked to simulate the failure. If it
    declines (falsy return), the stage is not going to be hit during
    commissioning, so the check is skipped and reported as passing.

    Returns:
        True when skipped; otherwise the combined result of
        CheckTestCommissionerCallbacks() and
        CheckTestCommissionerPaseConnection(nodeid).
    """
    self.devCtrl.ResetTestCommissioner()
    stageWillBeHit = self.devCtrl.SetTestCommissionerSimulateFailureOnStage(failAfter)
    if stageWillBeHit:
        self.logger.info(
            "Commissioning device, expecting failure after stage {}".format(failAfter))
        self.devCtrl.Commission(nodeid)
        return self.devCtrl.CheckTestCommissionerCallbacks() and self.devCtrl.CheckTestCommissionerPaseConnection(nodeid)
    # We're not going to hit this stage during commissioning so no sense trying, just say it was fine.
    return True
def TestCommissionFailureOnReport(self, nodeid: int, failAfter: int):
    """Commission with a failure simulated on the report for ``failAfter``.

    The test commissioner is reset and asked to simulate the failure. If it
    declines (falsy return), the stage is not going to be hit during
    commissioning, so the check is skipped and reported as passing.

    Returns:
        True when skipped; otherwise the combined result of
        CheckTestCommissionerCallbacks() and
        CheckTestCommissionerPaseConnection(nodeid).
    """
    self.devCtrl.ResetTestCommissioner()
    stageWillBeHit = self.devCtrl.SetTestCommissionerSimulateFailureOnReport(failAfter)
    if stageWillBeHit:
        self.logger.info(
            "Commissioning device, expecting failure on report for stage {}".format(failAfter))
        self.devCtrl.Commission(nodeid)
        return self.devCtrl.CheckTestCommissionerCallbacks() and self.devCtrl.CheckTestCommissionerPaseConnection(nodeid)
    # We're not going to hit this stage during commissioning so no sense trying, just say it was fine.
    return True
def TestCommissioning(self, ip: str, setuppin: int, nodeid: int):
    """Commission the device at ``ip`` through the controller's CommissionIP.

    Returns:
        True when CommissionIP returns a truthy value, False otherwise.
    """
    self.logger.info("Commissioning device {}".format(ip))
    if self.devCtrl.CommissionIP(ip, setuppin, nodeid):
        self.logger.info("Commissioning finished.")
        return True
    self.logger.info(
        "Failed to finish commissioning device {}".format(ip))
    return False
def TestCommissioningWithSetupPayload(self, setupPayload: str, nodeid: int):
    """Commission a device through the controller's CommissionWithCode.

    Returns:
        True when CommissionWithCode returns a truthy value, False otherwise.
    """
    self.logger.info("Commissioning device with setup payload {}".format(setupPayload))
    if self.devCtrl.CommissionWithCode(setupPayload, nodeid):
        self.logger.info("Commissioning finished.")
        return True
    self.logger.info(
        "Failed to finish commissioning device {}".format(setupPayload))
    return False
def TestUsedTestCommissioner(self):
    """Return whatever the controller's GetTestCommissionerUsed() reports."""
    controller = self.devCtrl
    return controller.GetTestCommissionerUsed()
def TestFailsafe(self, nodeid: int):
    """Check how the fail-safe timer and commissioning windows interact.

    Steps, in order; any unexpected outcome logs an error and returns False:
      1. Arm the fail-safe for 60 seconds; the response must report kOk.
      2. Try to open a basic commissioning window; the command must raise.
      3. Try to open an enhanced commissioning window using a random salt
         and a random (invalid) verifier; the command must raise.
      4. Disarm the fail-safe (expiry 0); the send must succeed.
      5. Open a basic commissioning window; the command must not raise.
      6. Arm the fail-safe again; the send must succeed.

    Returns:
        True only when the response to step 6 reports kBusyWithOtherAdmin.
    """
    sendFailureFormat = "Failed to send arm failsafe command error is {} with im response{}"

    def armFailSafe(expirySeconds):
        return self.devCtrl.ZCLSend("GeneralCommissioning", "ArmFailSafe", nodeid,
                                    0, 0, dict(expiryLengthSeconds=expirySeconds, breadcrumb=1), blocking=True)

    def runTimedCommand(makeCommand):
        # The command object is built here, inside the caller's try block.
        return asyncio.run(self.devCtrl.SendCommand(
            nodeid, 0, makeCommand(), timedRequestTimeoutMs=10000))

    self.logger.info("Testing arm failsafe")

    self.logger.info("Setting failsafe on CASE connection")
    err, resp = armFailSafe(60)
    if err != 0:
        self.logger.error(sendFailureFormat.format(err, resp))
        return False
    if resp.errorCode is not Clusters.GeneralCommissioning.Enums.CommissioningError.kOk:
        self.logger.error(
            "Incorrect response received from arm failsafe - wanted OK, received {}".format(resp))
        return False

    self.logger.info(
        "Attempting to open basic commissioning window - this should fail since the failsafe is armed")
    try:
        runTimedCommand(
            lambda: Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180))
    except Exception:
        # An exception is the outcome we want: the window must not open.
        pass
    else:
        self.logger.error(
            'Incorrectly succeeded in opening basic commissioning window')
        return False

    # TODO: pipe through the commissioning window opener so we can test enhanced properly. The pake verifier is
    # just garbage because none of the functions to calculate it or serialize it are available right now.
    # However, this command should fail BEFORE that becomes an issue.
    discriminator = 1111
    salt = secrets.token_bytes(16)
    iterations = 2000
    # not the right size or the right contents, but it won't matter
    verifier = secrets.token_bytes(32)
    self.logger.info(
        "Attempting to open enhanced commissioning window - this should fail since the failsafe is armed")
    try:
        runTimedCommand(
            lambda: Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow(
                commissioningTimeout=180, PAKEVerifier=verifier, discriminator=discriminator,
                iterations=iterations, salt=salt))
    except Exception:
        # An exception is the outcome we want: the window must not open.
        pass
    else:
        self.logger.error(
            'Incorrectly succeeded in opening enhanced commissioning window')
        return False

    self.logger.info("Disarming failsafe on CASE connection")
    err, resp = armFailSafe(0)
    if err != 0:
        self.logger.error(sendFailureFormat.format(err, resp))
        return False

    self.logger.info(
        "Opening Commissioning Window - this should succeed since the failsafe was just disarmed")
    try:
        runTimedCommand(
            lambda: Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180))
    except Exception:
        self.logger.error(
            'Failed to open commissioning window after disarming failsafe')
        return False

    self.logger.info(
        "Attempting to arm failsafe over CASE - this should fail since the commissioning window is open")
    err, resp = armFailSafe(60)
    if err != 0:
        self.logger.error(sendFailureFormat.format(err, resp))
        return False

    return resp.errorCode is Clusters.GeneralCommissioning.Enums.CommissioningError.kBusyWithOtherAdmin
async def TestControllerCATValues(self, nodeid: int):
    """Check ACL access for a controller identified by a CAT tag.

    A controller with node id 300 and CAT tag 0x0001_0001 is created on the
    existing fabric with no privilege. Reading the ACL attribute on endpoint 0
    must first report UnsupportedAccess. After Administer privilege is
    granted to the CAT tag, the same read must return AccessControlEntry
    data. The privilege is then reset and the controller shut down.

    Returns:
        True when both reads behave as described, False otherwise.
    """
    catTag = 0x0001_0001

    async def readAcl(controller):
        return await controller.ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.AccessControl.Attributes.Acl)])

    def aclOf(response):
        return response[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl]

    # Allocate a new controller instance with a CAT tag.
    newControllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(
        fabricAdmin=self.fabricAdmin, adminDevCtrl=self.devCtrl, controllerNodeIds=[300],
        targetNodeId=nodeid, privilege=None, catTags=[catTag])
    catController = newControllers[0]

    # The new controller has no privileges, so this read should fail with UnsupportedAccess.
    res = await readAcl(catController)
    if aclOf(res).Reason.status != IM.Status.UnsupportedAccess:
        self.logger.error(f"1: Received data instead of an error:{res}")
        return False

    # Grant the new controller privilege by adding the CAT tag to the subject.
    await CommissioningBuildingBlocks.GrantPrivilege(
        adminCtrl=self.devCtrl, grantedCtrl=catController,
        privilege=Clusters.AccessControl.Enums.Privilege.kAdminister,
        targetNodeId=nodeid, targetCatTags=[catTag])

    # Read out the attribute again - this time, it should succeed.
    res = await readAcl(catController)
    if type(aclOf(res)[0]) != Clusters.AccessControl.Structs.AccessControlEntry:
        self.logger.error(f"2: Received something other than data:{res}")
        return False

    # Reset the privilege back to pre-test.
    await CommissioningBuildingBlocks.GrantPrivilege(
        adminCtrl=self.devCtrl, grantedCtrl=catController, privilege=None, targetNodeId=nodeid)

    catController.Shutdown()
    return True
async def TestMultiControllerFabric(self, nodeid: int):
    """Check several controller instances sharing one fabric.

    Two controllers (node ids 100 and 200) are created with no privilege on
    the target node. ACL reads on endpoint 0 are then used to verify, step by
    step, that each controller sees exactly what its current privilege
    allows and that the existing admin controller is unaffected.

    Returns:
        True when every step matches, False (after logging the numbered
        step) on the first mismatch.
    """
    async def readAcl(controller):
        return await controller.ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.AccessControl.Attributes.Acl)])

    def aclOf(response):
        return response[0][Clusters.AccessControl][Clusters.AccessControl.Attributes.Acl]

    async def grant(controller, privilege):
        await CommissioningBuildingBlocks.GrantPrivilege(
            adminCtrl=self.devCtrl, grantedCtrl=controller, privilege=privilege, targetNodeId=nodeid)

    # Create two new controllers on the same fabric with no privilege on the target node.
    newControllers = await CommissioningBuildingBlocks.CreateControllersOnFabric(
        fabricAdmin=self.fabricAdmin, adminDevCtrl=self.devCtrl, controllerNodeIds=[100, 200],
        targetNodeId=nodeid, privilege=None)
    first, second = newControllers[0], newControllers[1]

    # A newly minted controller has no access, so reading the ACL list should return an IM error.
    res = await readAcl(first)
    if aclOf(res).Reason.status != IM.Status.UnsupportedAccess:
        self.logger.error(f"1: Received data instead of an error:{res}")
        return False

    # The existing admin controller should get valid data back. Doing this
    # ensures that we're not somehow aliasing the CASE sessions.
    res = await readAcl(self.devCtrl)
    if type(aclOf(res)[0]) != Clusters.AccessControl.Structs.AccessControlEntry:
        self.logger.error(f"2: Received something other than data:{res}")
        return False

    # ABA test: repeat the unprivileged read to prove the CASE sessions
    # have not been switched underneath.
    res = await readAcl(first)
    if aclOf(res).Reason.status != IM.Status.UnsupportedAccess:
        self.logger.error(f"3: Received data instead of an error:{res}")
        return False

    # Grant the new controller admin privileges. Reading out the ACL cluster should now yield data.
    await grant(first, Clusters.AccessControl.Enums.Privilege.kAdminister)
    res = await readAcl(first)
    if type(aclOf(res)[0]) != Clusters.AccessControl.Structs.AccessControlEntry:
        self.logger.error(f"4: Received something other than data:{res}")
        return False

    # Grant the second new controller admin privileges as well; its ACL read should yield data too.
    await grant(second, Clusters.AccessControl.Enums.Privilege.kAdminister)
    res = await readAcl(second)
    if type(aclOf(res)[0]) != Clusters.AccessControl.Structs.AccessControlEntry:
        self.logger.error(f"5: Received something other than data:{res}")
        return False

    # Grant the second new controller just view privilege. Reading out the ACL cluster should return no data.
    await grant(second, Clusters.AccessControl.Enums.Privilege.kView)
    res = await readAcl(second)
    if aclOf(res).Reason.status != IM.Status.UnsupportedAccess:
        self.logger.error(f"6: Received data5 instead of an error:{res}")
        return False

    # Read the Basic cluster from the 2nd controller. This is possible with just view privileges.
    res = await second.ReadAttribute(nodeid=nodeid, attributes=[(0, Clusters.Basic.Attributes.ClusterRevision)])
    revision = res[0][Clusters.Basic][Clusters.Basic.Attributes.ClusterRevision]
    if type(revision) != Clusters.Basic.Attributes.ClusterRevision.attribute_type.Type:
        self.logger.error(f"7: Received something other than data:{res}")
        return False

    first.Shutdown()
    second.Shutdown()
    return True
async def TestAddUpdateRemoveFabric(self, nodeid: int):
    """Exercise AddNOC, UpdateNOC and RemoveFabric against node ``nodeid``.

    A temporary certificate authority, fabric admin and controller are
    created and used to add a new fabric to the node. The NOC is then updated
    twice (same node id, then ``nodeid + 1``) and finally the fabric is
    removed. The node's CommissionedFabrics value is checked after every step.

    Returns:
        True when every step succeeds and the fabric count ends where it
        started, False (after logging) otherwise.
    """
    async def fabricCountDiffersFrom(expected):
        return expected != await self._GetCommissonedFabricCount(nodeid)

    logger.info("Testing AddNOC, UpdateNOC and RemoveFabric")

    self.logger.info("Waiting for attribute read for CommissionedFabrics")
    startOfTestFabricCount = await self._GetCommissonedFabricCount(nodeid)

    tempCertificateAuthority = self.certificateAuthorityManager.NewCertificateAuthority()
    tempFabric = tempCertificateAuthority.NewFabricAdmin(vendorId=0xFFF1, fabricId=1)
    tempDevCtrl = tempFabric.NewController(self.controllerNodeId, self.paaTrustStorePath)

    self.logger.info("Starting AddNOC using same node ID")
    added = await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(self.devCtrl, tempDevCtrl, nodeid, nodeid)
    if not added:
        self.logger.error("AddNOC failed")
        return False

    expectedFabricCountUntilRemoveFabric = startOfTestFabricCount + 1
    if await fabricCountDiffersFrom(expectedFabricCountUntilRemoveFabric):
        self.logger.error("Expected commissioned fabric count to change after AddNOC")
        return False

    self.logger.info("Starting UpdateNOC using same node ID")
    updated = await CommissioningBuildingBlocks.UpdateNOC(tempDevCtrl, nodeid, nodeid)
    if not updated:
        self.logger.error("UpdateNOC using same node ID failed")
        return False

    if await fabricCountDiffersFrom(expectedFabricCountUntilRemoveFabric):
        self.logger.error("Expected commissioned fabric count to remain unchanged after UpdateNOC")
        return False

    self.logger.info("Starting UpdateNOC using different node ID")
    newNodeIdForUpdateNoc = nodeid + 1
    updated = await CommissioningBuildingBlocks.UpdateNOC(tempDevCtrl, nodeid, newNodeIdForUpdateNoc)
    if not updated:
        self.logger.error("UpdateNOC using different node ID failed")
        return False

    if await fabricCountDiffersFrom(expectedFabricCountUntilRemoveFabric):
        self.logger.error("Expected commissioned fabric count to remain unchanged after UpdateNOC with new node ID")
        return False

    # TODO Read using old node ID and expect that it fails.

    opCreds = Clusters.OperationalCredentials
    currentFabricIndexResponse = await tempDevCtrl.ReadAttribute(
        newNodeIdForUpdateNoc, [opCreds.Attributes.CurrentFabricIndex])
    updatedNOCFabricIndex = currentFabricIndexResponse[0][opCreds][opCreds.Attributes.CurrentFabricIndex]
    await tempDevCtrl.SendCommand(
        newNodeIdForUpdateNoc, 0, opCreds.Commands.RemoveFabric(updatedNOCFabricIndex))

    if await fabricCountDiffersFrom(startOfTestFabricCount):
        self.logger.error("Expected fabric count to be the same at the end of test as when it started")
        return False

    tempDevCtrl.Shutdown()
    tempFabric.Shutdown()
    return True
async def TestCaseEviction(self, nodeid: int):
    """Exercise CASE session eviction, defunct sessions and fabric isolation.

    Uses both self.devCtrl and self.devCtrl2. Four phases run in order:
      1. Repeatedly close the session and read, more times than
         (minimumSupportedFabrics * minimumCASESessionsPerFabric).
      2. Subscribe on devCtrl, close its session, write from devCtrl2 and
         expect the subscription to still report the change.
      3. Subscribe on devCtrl, churn sessions on devCtrl2, write from
         devCtrl2 and expect the report.
      4. The same with the roles of the two controllers reversed.

    Returns:
        True when every phase sees its expected report, False (after
        logging) as soon as one does not.
    """
    self.logger.info("Testing CASE eviction")

    minimumCASESessionsPerFabric = 3
    minimumSupportedFabrics = 16
    # Going beyond (minimumSupportedFabrics * minimumCASESessionsPerFabric)
    # allocates more sessions than the pool configuration supports, which
    # starts to exercise the eviction logic. The specifics of eviction are
    # not validated, only that allocation and CASE establishment keep
    # succeeding on both the controller and the target.
    churnRounds = minimumSupportedFabrics * minimumCASESessionsPerFabric * 2

    sawValueChange = False

    def OnValueChange(path: Attribute.TypedAttributePath, transaction: Attribute.SubscriptionTransaction) -> None:
        nonlocal sawValueChange
        self.logger.info("Saw value change!")
        if (path.AttributeType == Clusters.UnitTesting.Attributes.Int8u and path.Path.EndpointId == 1):
            sawValueChange = True

    async def churnSessions(controller):
        for _ in range(churnRounds):
            controller.CloseSession(nodeid)
            await controller.ReadAttribute(nodeid, [(Clusters.Basic.Attributes.ClusterRevision)])

    async def subscribeToInt8u(controller):
        subscription = await controller.ReadAttribute(
            nodeid, [(Clusters.UnitTesting.Attributes.Int8u)], reportInterval=(0, 1))
        subscription.SetAttributeUpdateCallback(OnValueChange)
        return subscription

    async def writeThenShutdown(writer, subscription):
        # Write the attribute, give the report some time to arrive, then
        # tear the subscription down.
        await writer.WriteAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.Int8u(4))])
        time.sleep(2)
        subscription.Shutdown()

    await churnSessions(self.devCtrl)

    self.logger.info("Testing CASE defunct logic")

    # Establish a subscription on a CASE session, then mark the session
    # defunct (to simulate a transport timeout). A write from a *different*
    # fabric should bring the session back to active and deliver a report on
    # the subscription established previously.
    sawValueChange = False
    self.logger.info("Testing CASE defunct logic")

    sub = await subscribeToInt8u(self.devCtrl)

    # This marks the session defunct.
    self.devCtrl.CloseSession(nodeid)

    # Now write the attribute from fabric2.
    await writeThenShutdown(self.devCtrl2, sub)

    if not sawValueChange:
        self.logger.error("Didn't see value change in time, likely because sub got terminated due to unexpected session eviction!")
        return False

    # Set up a subscription on fabric1 through devCtrl, then keep evicting
    # sessions on fabric2 (devCtrl2) by closing sessions and issuing reads.
    # That should cause evictions on the server for fabric2 without affecting
    # fabric1. Seeing the later write on the subscription proves the fabric1
    # session is still valid and untouched.
    self.logger.info("Testing fabric-isolated CASE eviction")

    sawValueChange = False
    sub = await subscribeToInt8u(self.devCtrl)
    await churnSessions(self.devCtrl2)
    await writeThenShutdown(self.devCtrl2, sub)

    if not sawValueChange:
        self.logger.error("Didn't see value change in time, likely because sub got terminated due to other fabric (fabric1)")
        return False

    # Do the same test again, but reversing the roles of fabric1 and fabric2.
    self.logger.info("Testing fabric-isolated CASE eviction (reverse)")

    sawValueChange = False
    sub = await subscribeToInt8u(self.devCtrl2)
    await churnSessions(self.devCtrl)
    await writeThenShutdown(self.devCtrl, sub)

    if not sawValueChange:
        self.logger.error("Didn't see value change in time, likely because sub got terminated due to other fabric (fabric2)")
        return False

    return True
async def TestMultiFabric(self, ip: str, setuppin: int, nodeid: int):
    """Commission the node onto a second fabric and verify both fabrics.

    A basic commissioning window is opened from the first controller, and a
    controller on a new fabric (fabric id 2) commissions the node over IP.
    The certificate authority manager is then shut down and the authority,
    both fabric admins and both controllers are created again. Each
    controller must read two NOCs (unfiltered), and the two must report
    different CurrentFabricIndex values, which are stored on ``self`` as
    currentFabric1 and currentFabric2.

    Returns:
        True when all checks pass, False (after logging) otherwise.
    """
    opCreds = Clusters.OperationalCredentials

    def valueOf(response, attribute):
        return response[0][opCreds][attribute]

    self.logger.info("Opening Commissioning Window")
    await self.devCtrl.SendCommand(
        nodeid, 0, Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow(180),
        timedRequestTimeoutMs=10000)

    self.logger.info("Creating 2nd Fabric Admin")
    self.fabricAdmin2 = self.certificateAuthority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)

    self.logger.info("Creating Device Controller on 2nd Fabric")
    self.devCtrl2 = self.fabricAdmin2.NewController(
        self.controllerNodeId, self.paaTrustStorePath)

    commissioned = self.devCtrl2.CommissionIP(ip, setuppin, nodeid)
    if not commissioned:
        self.logger.info(
            "Failed to finish key exchange with device {}".format(ip))
        return False

    # Shut-down all the controllers (which will free them up)
    self.logger.info(
        "Shutting down controllers & fabrics and re-initing stack...")
    self.certificateAuthorityManager.Shutdown()

    self.logger.info("Shutdown completed, starting new controllers...")
    self.certificateAuthorityManager = chip.CertificateAuthority.CertificateAuthorityManager(chipStack=self.chipStack)
    self.certificateAuthority = self.certificateAuthorityManager.NewCertificateAuthority()
    self.fabricAdmin = self.certificateAuthority.NewFabricAdmin(vendorId=0xFFF1, fabricId=1)
    secondFabricAdmin = self.certificateAuthority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)
    self.devCtrl = self.fabricAdmin.NewController(
        self.controllerNodeId, self.paaTrustStorePath)
    self.devCtrl2 = secondFabricAdmin.NewController(
        self.controllerNodeId, self.paaTrustStorePath)

    self.logger.info("Waiting for attribute reads...")
    data1 = await self.devCtrl.ReadAttribute(nodeid, [opCreds.Attributes.NOCs], fabricFiltered=False)
    data2 = await self.devCtrl2.ReadAttribute(nodeid, [opCreds.Attributes.NOCs], fabricFiltered=False)

    # Read out noclist from each fabric, and each should contain two NOCs.
    nocList1 = valueOf(data1, opCreds.Attributes.NOCs)
    nocList2 = valueOf(data2, opCreds.Attributes.NOCs)
    if len(nocList1) != 2 or len(nocList2) != 2:
        self.logger.error("Got back invalid nocList")
        return False

    data1 = await self.devCtrl.ReadAttribute(nodeid, [opCreds.Attributes.CurrentFabricIndex], fabricFiltered=False)
    data2 = await self.devCtrl2.ReadAttribute(nodeid, [opCreds.Attributes.CurrentFabricIndex], fabricFiltered=False)

    # Read out current fabric from each fabric, and both should be different.
    self.currentFabric1 = valueOf(data1, opCreds.Attributes.CurrentFabricIndex)
    self.currentFabric2 = valueOf(data2, opCreds.Attributes.CurrentFabricIndex)
    if self.currentFabric1 == self.currentFabric2:
        self.logger.error(
            "Got back fabric indices that match for two different fabrics!")
        return False

    self.logger.info("Attribute reads completed...")
    return True
async def TestFabricSensitive(self, nodeid: int):
expectedDataFabric1 = [
Clusters.UnitTesting.Structs.TestFabricScoped(),
Clusters.UnitTesting.Structs.TestFabricScoped()
]
expectedDataFabric1[0].fabricIndex = 100
expectedDataFabric1[0].fabricSensitiveInt8u = 33
expectedDataFabric1[0].optionalFabricSensitiveInt8u = 34
expectedDataFabric1[0].nullableFabricSensitiveInt8u = 35
expectedDataFabric1[0].nullableOptionalFabricSensitiveInt8u = Clusters.Types.NullValue
expectedDataFabric1[0].fabricSensitiveCharString = "alpha1"
expectedDataFabric1[0].fabricSensitiveStruct.a = 36
expectedDataFabric1[0].fabricSensitiveInt8uList = [1, 2, 3, 4]
expectedDataFabric1[1].fabricIndex = 100
expectedDataFabric1[1].fabricSensitiveInt8u = 43
expectedDataFabric1[1].optionalFabricSensitiveInt8u = 44
expectedDataFabric1[1].nullableFabricSensitiveInt8u = 45
expectedDataFabric1[1].nullableOptionalFabricSensitiveInt8u = Clusters.Types.NullValue
expectedDataFabric1[1].fabricSensitiveCharString = "alpha2"
expectedDataFabric1[1].fabricSensitiveStruct.a = 46
expectedDataFabric1[1].fabricSensitiveInt8uList = [2, 3, 4, 5]
self.logger.info("Writing data from fabric1...")
await self.devCtrl.WriteAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.ListFabricScoped(expectedDataFabric1))])
expectedDataFabric2 = copy.deepcopy(expectedDataFabric1)
expectedDataFabric2[0].fabricSensitiveInt8u = 133
expectedDataFabric2[0].optionalFabricSensitiveInt8u = 134
expectedDataFabric2[0].nullableFabricSensitiveInt8u = 135
expectedDataFabric2[0].fabricSensitiveCharString = "beta1"
expectedDataFabric2[0].fabricSensitiveStruct.a = 136
expectedDataFabric2[0].fabricSensitiveInt8uList = [11, 12, 13, 14]
expectedDataFabric2[1].fabricSensitiveInt8u = 143
expectedDataFabric2[1].optionalFabricSensitiveInt8u = 144
expectedDataFabric2[1].nullableFabricSensitiveInt8u = 145
expectedDataFabric2[1].fabricSensitiveCharString = "beta2"
expectedDataFabric2[1].fabricSensitiveStruct.a = 146
expectedDataFabric2[1].fabricSensitiveStruct.f = 147
expectedDataFabric2[1].fabricSensitiveInt8uList = [12, 13, 14, 15]
self.logger.info("Writing data from fabric2...")
await self.devCtrl2.WriteAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.ListFabricScoped(expectedDataFabric2))])
#
# Now read the data back filtered from fabric1 and ensure it matches.
#
self.logger.info("Reading back data from fabric1...")
data = await self.devCtrl.ReadAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.ListFabricScoped)])
readListDataFabric1 = data[1][Clusters.UnitTesting][Clusters.UnitTesting.Attributes.ListFabricScoped]
#
# Update the expected data's fabric index to that we just read back
# before we attempt to compare the data
#
expectedDataFabric1[0].fabricIndex = self.currentFabric1
expectedDataFabric1[1].fabricIndex = self.currentFabric1
self.logger.info("Comparing data on fabric1...")
if (expectedDataFabric1 != readListDataFabric1):
raise AssertionError("Got back mismatched data")
self.logger.info("Reading back data from fabric2...")
data = await self.devCtrl2.ReadAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.ListFabricScoped)])
readListDataFabric2 = data[1][Clusters.UnitTesting][Clusters.UnitTesting.Attributes.ListFabricScoped]
#
# Update the expected data's fabric index to that we just read back
# before we attempt to compare the data
#
expectedDataFabric2[0].fabricIndex = self.currentFabric2
expectedDataFabric2[1].fabricIndex = self.currentFabric2
self.logger.info("Comparing data on fabric2...")
if (expectedDataFabric2 != readListDataFabric2):
raise AssertionError("Got back mismatched data")
self.logger.info(
"Reading back unfiltered data across all fabrics from fabric1...")
def CompareUnfilteredData(accessingFabric, otherFabric, expectedData):
index = 0
self.logger.info(
f"Comparing data from accessing fabric {accessingFabric}...")
for item in readListDataFabric:
if (item.fabricIndex == accessingFabric):
if (index == 2):
raise AssertionError(
"Got back more data than expected")
if (item != expectedData[index]):
raise AssertionError("Got back mismatched data")
index = index + 1
else:
#
# We should not be able to see any fabric sensitive data from the non accessing fabric.
# Aside from the fabric index, everything else in TestFabricScoped is marked sensitive so we should
# only see defaults for that data. Instantiate an instance of that struct
# which should automatically be initialized with defaults and compare that
# against what we got back.
#
expectedDefaultData = Clusters.UnitTesting.Structs.TestFabricScoped()
expectedDefaultData.fabricIndex = otherFabric
if (item != expectedDefaultData):
raise AssertionError("Got back mismatched data")
data = await self.devCtrl.ReadAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.ListFabricScoped)], fabricFiltered=False)
readListDataFabric = data[1][Clusters.UnitTesting][Clusters.UnitTesting.Attributes.ListFabricScoped]
CompareUnfilteredData(self.currentFabric1,
self.currentFabric2, expectedDataFabric1)
data = await self.devCtrl2.ReadAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.ListFabricScoped)], fabricFiltered=False)
readListDataFabric = data[1][Clusters.UnitTesting][Clusters.UnitTesting.Attributes.ListFabricScoped]
CompareUnfilteredData(self.currentFabric2,
self.currentFabric1, expectedDataFabric2)
self.logger.info("Writing smaller list from alpha (again)")
expectedDataFabric1[0].fabricIndex = 100
expectedDataFabric1[0].fabricSensitiveInt8u = 53
expectedDataFabric1[0].optionalFabricSensitiveInt8u = 54
expectedDataFabric1[0].nullableFabricSensitiveInt8u = 55
expectedDataFabric1[0].nullableOptionalFabricSensitiveInt8u = Clusters.Types.NullValue
expectedDataFabric1[0].fabricSensitiveCharString = "alpha3"
expectedDataFabric1[0].fabricSensitiveStruct.a = 56
expectedDataFabric1[0].fabricSensitiveInt8uList = [51, 52, 53, 54]
expectedDataFabric1.pop(1)
await self.devCtrl.WriteAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.ListFabricScoped(expectedDataFabric1))])
self.logger.info(
"Reading back data (again) from fabric2 to ensure it hasn't changed")
data = await self.devCtrl2.ReadAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.ListFabricScoped)])
readListDataFabric2 = data[1][Clusters.UnitTesting][Clusters.UnitTesting.Attributes.ListFabricScoped]
if (expectedDataFabric2 != readListDataFabric2):
raise AssertionError("Got back mismatched data")
self.logger.info(
"Reading back data (again) from fabric1 to ensure it hasn't changed")
data = await self.devCtrl.ReadAttribute(nodeid, [(1, Clusters.UnitTesting.Attributes.ListFabricScoped)])
readListDataFabric1 = data[1][Clusters.UnitTesting][Clusters.UnitTesting.Attributes.ListFabricScoped]
self.logger.info("Comparing data on fabric1...")
expectedDataFabric1[0].fabricIndex = self.currentFabric1
if (expectedDataFabric1 != readListDataFabric1):
raise AssertionError("Got back mismatched data")
async def TestResubscription(self, nodeid: int):
    ''' This validates the re-subscription logic by triggering a liveness failure caused by the expiration
        of the underlying CASE session and the resultant failure to receive reports from the server. This should
        trigger CASE session establishment and subscription re-establishment. Both the attempt and successful
        re-establishment of the subscription are validated.

        Returns True when both callbacks fired within 3 seconds, False on timeout. The subscription
        is shut down on every exit path.
    '''
    cv = asyncio.Condition()
    resubAttempted = False
    resubSucceeded = False

    async def OnResubscriptionAttempted(transaction, errorEncountered: int, nextResubscribeIntervalMsec: int):
        self.logger.info("Re-subscription Attempted")
        nonlocal resubAttempted
        # Update the flag under the condition and wake the waiter so that the
        # predicate below is re-evaluated regardless of callback ordering.
        async with cv:
            resubAttempted = True
            cv.notify_all()

    async def OnResubscriptionSucceeded(transaction):
        self.logger.info("Re-subscription Succeeded")
        nonlocal resubSucceeded
        async with cv:
            resubSucceeded = True
            cv.notify_all()

    subscription = await self.devCtrl.ReadAttribute(nodeid, [(Clusters.Basic.Attributes.ClusterRevision)], reportInterval=(0, 5))

    try:
        #
        # Register async callbacks that will fire when a re-sub is attempted or succeeds.
        #
        subscription.SetResubscriptionAttemptedCallback(OnResubscriptionAttempted, True)
        subscription.SetResubscriptionSucceededCallback(OnResubscriptionSucceeded, True)

        #
        # Over-ride the default liveness timeout (which is set quite high to accommodate for
        # transport delays) to something very small. This ensures that our liveness timer will
        # fire quickly and cause a re-subscription to occur naturally.
        #
        subscription.OverrideLivenessTimeoutMs(100)

        try:
            async with cv:
                # asyncio.wait_for() raises asyncio.TimeoutError when the deadline passes
                # (it never returns a falsy value), so the timeout is handled below.
                await asyncio.wait_for(
                    cv.wait_for(lambda: resubAttempted and resubSucceeded), 3)
        except asyncio.TimeoutError:
            self.logger.error("Timed out waiting for resubscription to succeed")
            return False
    finally:
        # Always release the subscription, including on timeout or error.
        subscription.Shutdown()

    return True
def TestCloseSession(self, nodeid: int):
    ''' Ask the controller to close its sessions with `nodeid`.

        Returns True when CloseSession completes; if it raises, the exception
        is logged and False is returned.
    '''
    self.logger.info(f"Closing sessions with device {nodeid}")
    closed = False
    try:
        self.devCtrl.CloseSession(nodeid)
    except Exception as ex:
        self.logger.exception(
            f"Failed to close sessions with device {nodeid}: {ex}")
    else:
        closed = True
    return closed
def SetNetworkCommissioningParameters(self, dataset: str):
    ''' Decode the hex string `dataset` into bytes and pass it to the
        controller's SetThreadOperationalDataset. Always returns True;
        any error from decoding or from the controller propagates.
    '''
    self.logger.info("Setting network commissioning parameters")
    rawDataset = bytes.fromhex(dataset)
    self.devCtrl.SetThreadOperationalDataset(rawDataset)
    return True
def TestOnOffCluster(self, nodeid: int, endpoint: int, group: int):
    ''' Send OnOff.On followed by OnOff.Off to the given node/endpoint/group.

        Each command is sent with blocking=True. Returns False (after logging
        the error code and response) as soon as a command reports a non-zero
        error, and True when both commands report error 0.
    '''
    self.logger.info(
        "Sending On/Off commands to device {} endpoint {}".format(nodeid, endpoint))
    for command in ("On", "Off"):
        err, resp = self.devCtrl.ZCLSend("OnOff", command, nodeid,
                                         endpoint, group, {}, blocking=True)
        if err != 0:
            self.logger.error(
                "failed to send OnOff.{}: error is {} with im response {}".format(command, err, resp))
            return False
    return True
def TestLevelControlCluster(self, nodeid: int, endpoint: int, group: int):
    ''' Move the LevelControl cluster to level 1 and then to level 254,
        reading CurrentLevel back after each move and checking it matches.

        Returns True when both checks pass; any exception is logged and
        turned into a False return.
    '''
    self.logger.info(
        f"Sending MoveToLevel command to device {nodeid} endpoint {endpoint}")
    # Arguments shared by every MoveToLevel command sent below.
    sharedArgs = dict(transitionTime=0, optionsMask=1, optionsOverride=1)
    try:
        for targetLevel in (1, 254):
            moveArgs = {**sharedArgs, "level": targetLevel}
            self.devCtrl.ZCLSend("LevelControl", "MoveToLevel", nodeid,
                                 endpoint, group, moveArgs, blocking=True)
            readBack = self.devCtrl.ZCLReadAttribute(cluster="LevelControl",
                                                     attribute="CurrentLevel",
                                                     nodeid=nodeid,
                                                     endpoint=endpoint,
                                                     groupid=group)
            check = TestResult("Read attribute LevelControl.CurrentLevel", readBack)
            check.assertValueEqual(targetLevel)
    except Exception as ex:
        self.logger.exception(f"Level cluster test failed: {ex}")
        return False
    return True
def TestResolve(self, nodeid, timeoutSec: float = 10, pollIntervalSec: float = 0.2):
    ''' Ask the controller to resolve `nodeid`, then poll GetAddressAndPort
        until it returns a truthy address or the timeout elapses.

        nodeid:          node id to resolve (logged as 8-digit hex).
        timeoutSec:      how long to keep polling before giving up.
        pollIntervalSec: pause between polls.

        Returns True once an address is obtained, False on timeout or when
        any controller call raises (the exception is logged).
    '''
    self.logger.info(
        "Resolve: node id = {:08x}".format(nodeid))
    try:
        self.devCtrl.ResolveNode(nodeid=nodeid)
        addr = None
        # Monotonic clock: the timeout must not be affected by wall-clock adjustments.
        start = time.monotonic()
        while not addr:
            addr = self.devCtrl.GetAddressAndPort(nodeid)
            if addr:
                break
            if time.monotonic() - start > timeoutSec:
                # Not inside an exception handler, so log with error() rather
                # than exception() (which would append an empty traceback).
                self.logger.error("Timeout waiting for address...")
                break
            time.sleep(pollIntervalSec)
        if not addr:
            self.logger.error("Addr is missing...")
            return False
        self.logger.info(f"Resolved address: {addr[0]}:{addr[1]}")
        return True
    except Exception as ex:
        self.logger.exception("Failed to resolve. {}".format(ex))
        return False
def TestReadBasicAttributes(self, nodeid: int, endpoint: int, group: int):
    ''' Read a fixed set of Basic cluster attributes and compare each one
        against the expected value listed below.

        Every attribute is attempted even if an earlier one fails. Returns
        True when all match; otherwise logs a mapping of attribute name to
        failure text and returns False.
    '''
    basic_cluster_attrs = {
        "VendorName": "TEST_VENDOR",
        "VendorID": 0xFFF1,
        "ProductName": "TEST_PRODUCT",
        "ProductID": 0x8001,
        "NodeLabel": "Test",
        "Location": "XX",
        "HardwareVersion": 0,
        "HardwareVersionString": "TEST_VERSION",
        "SoftwareVersion": 1,
        "SoftwareVersionString": "1.0",
    }
    failed_zcl = {}
    for basic_attr, expected_value in basic_cluster_attrs.items():
        try:
            res = self.devCtrl.ZCLReadAttribute(cluster="Basic",
                                                attribute=basic_attr,
                                                nodeid=nodeid,
                                                endpoint=endpoint,
                                                groupid=group)
            TestResult(f"Read attribute {basic_attr}", res).assertValueEqual(
                expected_value)
        except Exception as ex:
            # Record the failure and keep going so all mismatches are reported together.
            failed_zcl[basic_attr] = str(ex)
    if failed_zcl:
        # No exception is active here, so use error() instead of exception(),
        # which would append a meaningless empty traceback.
        self.logger.error(f"Following attributes failed: {failed_zcl}")
        return False
    return True
def TestWriteBasicAttributes(self, nodeid: int, endpoint: int, group: int):
    ''' Write a list of Basic cluster attributes and verify the outcome.

        For a request expected to succeed, the attribute is written and then
        read back and compared with the written value. For a request expected
        to fail, an exception from the write is treated as the expected result
        and the read-back is skipped.

        Returns True when no request recorded a failure; otherwise logs the
        collected failure texts and returns False.
    '''
    @dataclass
    class AttributeWriteRequest:
        cluster: str
        attribute: str
        value: Any
        expected_status: IM.Status = IM.Status.Success

    requests = [
        AttributeWriteRequest("Basic", "NodeLabel", "Test"),
        AttributeWriteRequest("Basic", "Location",
                              "a pretty loooooooooooooog string", IM.Status.ConstraintError),
    ]
    failed_zcl = []
    for req in requests:
        try:
            try:
                self.devCtrl.ZCLWriteAttribute(cluster=req.cluster,
                                               attribute=req.attribute,
                                               nodeid=nodeid,
                                               endpoint=endpoint,
                                               groupid=group,
                                               value=req.value)
                if req.expected_status != IM.Status.Success:
                    raise AssertionError(
                        f"Write attribute {req.cluster}.{req.attribute} expects failure but got success response")
            except Exception:
                # NOTE(review): this handler also catches the AssertionError raised
                # just above, so an unexpected success on a request that should fail
                # is silently accepted. Fixing that requires knowing how
                # ZCLWriteAttribute reports a failed write (exception vs. returned
                # status) -- confirm before tightening this check.
                if req.expected_status != IM.Status.Success:
                    continue
                # Bare raise keeps the original traceback intact.
                raise
            res = self.devCtrl.ZCLReadAttribute(
                cluster=req.cluster, attribute=req.attribute, nodeid=nodeid, endpoint=endpoint, groupid=group)
            TestResult(f"Read attribute {req.cluster}.{req.attribute}", res).assertValueEqual(
                req.value)
        except Exception as ex:
            failed_zcl.append(str(ex))
    if failed_zcl:
        # No exception is active here, so use error() instead of exception(),
        # which would append a meaningless empty traceback.
        self.logger.error(f"Following attributes failed: {failed_zcl}")
        return False
    return True
def TestSubscription(self, nodeid: int, endpoint: int):
    ''' Subscribe to the OnOff attribute and check that five toggles made
        from a background thread produce five attribute reports.

        A helper thread sends OnOff.Toggle five times, sleeping 3 seconds
        before each one, while this method waits on a condition variable
        that the subscription's update callback notifies.

        Returns True only when exactly five matching updates were counted
        and the helper thread finished; False otherwise (including when
        any exception is raised, which is logged).
    '''
    desiredPath = None
    receivedUpdate = 0
    updateLock = threading.Lock()
    updateCv = threading.Condition(updateLock)

    def OnValueChange(path: Attribute.TypedAttributePath, transaction: Attribute.SubscriptionTransaction) -> None:
        nonlocal receivedUpdate
        # Ignore reports for anything other than the attribute under test.
        if path.Path != desiredPath:
            return

        data = transaction.GetAttribute(path)
        logger.info(
            f"Received report from server: path: {path.Path}, value: {data}")
        with updateLock:
            receivedUpdate += 1
            updateCv.notify_all()

    class _conductAttributeChange(threading.Thread):
        def __init__(self, devCtrl: ChipDeviceCtrl.ChipDeviceController, nodeid: int, endpoint: int):
            super().__init__()
            self.nodeid = nodeid
            self.endpoint = endpoint
            self.devCtrl = devCtrl

        def run(self):
            for i in range(5):
                time.sleep(3)
                self.devCtrl.ZCLSend(
                    "OnOff", "Toggle", self.nodeid, self.endpoint, 0, {})

    try:
        # OnOff Cluster, OnOff Attribute. The path must use the endpoint that is
        # actually subscribed to below, otherwise no report would ever match.
        desiredPath = Clusters.Attribute.AttributePath(
            EndpointId=endpoint, ClusterId=6, AttributeId=0)
        subscription = self.devCtrl.ZCLSubscribeAttribute(
            "OnOff", "OnOff", nodeid, endpoint, 1, 10)
        # The update callback is registered only after the subscribe call has
        # returned; the update counter is not reset anywhere in this method.
        subscription.SetAttributeUpdateCallback(OnValueChange)
        changeThread = _conductAttributeChange(
            self.devCtrl, nodeid, endpoint)
        changeThread.start()
        with updateCv:
            while receivedUpdate < 5:
                # We should observe 5 attribute changes
                # The changing thread will change the value after 3 seconds. If we're waiting more than 10, assume something
                # is really wrong and bail out here with some information.
                if not updateCv.wait(10.0):
                    self.logger.error(
                        "Failed to receive subscription update")
                    break

        # thread changes 5 times, and sleeps for 3 seconds in between. Add an additional 3 seconds of slack. Timeout is in seconds.
        changeThread.join(18.0)

        #
        # Clean-up by shutting down the sub. Otherwise, we're going to get callbacks through OnValueChange on what will soon become an invalid
        # execution context above.
        #
        subscription.Shutdown()

        if changeThread.is_alive():
            # Thread join timed out
            self.logger.error("Failed to join change thread")
            return False

        return receivedUpdate == 5

    except Exception as ex:
        self.logger.exception(f"Failed to finish API test: {ex}")
        return False
def TestNonControllerAPIs(self):
    '''
    Exercise chip package APIs that are not tied to controller operations.

    Currently looks up cluster id 0xFFF1FC05 through the controller's cluster
    handler and checks that its reported clusterName is "UnitTesting".
    Returns True on a match; any exception (including the mismatch) is
    logged and yields False.
    TODO: Add more tests for APIs
    '''
    try:
        handler = self.devCtrl.GetClusterHandler()
        clusterInfo = handler.GetClusterInfoById(0xFFF1FC05)  # TestCluster
        nameMatches = clusterInfo["clusterName"] == "UnitTesting"
        if not nameMatches:
            raise Exception(
                f"Wrong cluster info clusterName: {clusterInfo['clusterName']} expected 'UnitTesting'")
    except Exception as ex:
        self.logger.exception(f"Failed to finish API test: {ex}")
        return False
    else:
        return True
def TestFabricScopedCommandDuringPase(self, nodeid: int):
    '''Validates that fabric-scoped commands fail during PASE with UNSUPPORTED_ACCESS

    The nodeid is the PASE pseudo-node-ID used during PASE establishment

    Sends OperationalCredentials.UpdateFabricLabel to endpoint 0 and returns
    True only when the call raises an InteractionModelError whose status is
    UnsupportedAccess; returns False when the command does not raise.
    '''
    try:
        asyncio.run(self.devCtrl.SendCommand(
            nodeid, 0, Clusters.OperationalCredentials.Commands.UpdateFabricLabel("roboto")))
    except IM.InteractionModelError as ex:
        observedStatus = ex.status
    else:
        # The command went through, so there is no failure status to compare.
        observedStatus = None
    return observedStatus == IM.Status.UnsupportedAccess