blob: 8f329daca51101af2b3bc5dfd62db5ec6b838387 [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import inspect
import logging
import random
from dataclasses import dataclass
import chip.clusters as Clusters
import chip.discovery as Discovery
from chip import ChipUtility
from chip.exceptions import ChipStackError
from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, type_matches
from mobly import asserts
def get_all_cmds_for_cluster_id(cid: int) -> list[Clusters.ClusterObjects.ClusterCommand]:
cluster = Clusters.ClusterObjects.ALL_CLUSTERS[cid]
try:
return inspect.getmembers(cluster.Commands, inspect.isclass)
except AttributeError:
return []
def client_cmd(cmd_class):
# Inspect returns all the classes, not just the ones we want, so use a try
# here incase we're inspecting a builtin class
try:
return cmd_class if cmd_class.is_client else None
except AttributeError:
return None
# one of the steps in this test requires sending a command that requires a timed interaction
# without first sending the TimedRequest action
# OpenCommissioningWindow requires a timed invoke and is mandatory on servers, BUT, it's marked
# that way in the base class. We need a new, fake class that doesn't have that set
@dataclass
class FakeRevokeCommissioning(Clusters.AdministratorCommissioning.Commands.RevokeCommissioning):
@ChipUtility.classproperty
def must_use_timed_invoke(cls) -> bool:
return False
class TC_IDM_1_2(MatterBaseTest):
@async_test_body
async def test_TC_IDM_1_2(self):
self.print_step(0, "Commissioning - already done")
wildcard_descriptor = await self.default_controller.ReadAttribute(self.dut_node_id, [(Clusters.Descriptor)])
endpoints = list(wildcard_descriptor.keys())
endpoints.sort()
self.print_step(1, "Send Invoke to unsupported endpoint")
# First non-existent endpoint is where the index and and endpoint number don't match
non_existent_endpoint = next(i for i, e in enumerate(endpoints + [None]) if i != e)
# General Commissioning cluster should be supported on all DUTs, so it will recognize this cluster and
# command, but it is sent on an unsupported endpoint
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=1)
try:
await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=non_existent_endpoint, payload=cmd)
asserts.fail("Unexpected success return from sending command to unsupported endpoint")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.UnsupportedEndpoint, "Unexpected error returned from unsupported endpoint")
self.print_step(2, "Send Invoke to unsupported cluster")
all_cluster_ids = list(Clusters.ClusterObjects.ALL_CLUSTERS.keys())
unsupported_clusters: dict[int, list[int]] = {}
supported_clusters: dict[int, list[int]] = {}
for i in endpoints:
dut_ep_cluster_ids = wildcard_descriptor[i][Clusters.Descriptor][Clusters.Descriptor.Attributes.ServerList]
unsupported_clusters[i] = list(set(all_cluster_ids) - set(dut_ep_cluster_ids))
# We only want to consider the set of standard clusters as "supported clusters", so use the intersection
supported_clusters[i] = set(dut_ep_cluster_ids).intersection(set(all_cluster_ids))
# This is really unlikely to happen on any real product, so we're going to assert here if we can't find anything
# since it's likely a test error
asserts.assert_true(any(unsupported_clusters[i] for i in endpoints),
"Unable to find any unsupported clusters on any endpoint")
asserts.assert_true(any(supported_clusters[i] for i in endpoints), "Unable to find supported clusters on any endpoint")
sent = False
for i in endpoints:
if sent:
break
for cid in unsupported_clusters[i]:
cluster = Clusters.ClusterObjects.ALL_CLUSTERS[cid]
members = get_all_cmds_for_cluster_id(cid)
if not members:
continue
# just use the first command with default values
name, cmd = members[0]
logging.info(f'Sending {name} command to unsupported cluster {cluster} on endpoint {i}')
try:
await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=i, payload=cmd())
asserts.fail("Unexpected success return from sending command to unsupported cluster")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.UnsupportedCluster, "Unexpected error returned from unsupported cluster")
sent = True
break
asserts.assert_true(sent, "Unable to find unsupported cluster with commands on any supported endpoint")
self.print_step(3, "Send Invoke for unsupported command")
# First read all the supported commands by wildcard reading the AcceptedCommands attribute from all clusters
# We can't wildcard across clusters even if the attribute is the same, so we're going to go 1 by 1.
# Just go endpoint by endpoint so we can early exit (each supports different clusters)
# TODO: add option to make this a beefier test that does all the commands?
sent = False
for i in endpoints:
if sent:
break
for cid in supported_clusters[i]:
cluster = Clusters.ClusterObjects.ALL_CLUSTERS[cid]
logging.info(f'Checking cluster {cluster} ({cid}) on ep {i} for supported commands')
members = get_all_cmds_for_cluster_id(cid)
if not members:
continue
dut_supported_ids = await self.read_single_attribute_check_success(cluster=cluster, endpoint=i, attribute=cluster.Attributes.AcceptedCommandList)
all_supported_cmds = list(filter(None, [client_cmd(x[1]) for x in members]))
all_supported_ids = [x.command_id for x in all_supported_cmds]
unsupported_commands = list(set(all_supported_ids) - set(dut_supported_ids))
if not unsupported_commands:
continue
# Let's just use the first unsupported command
id = unsupported_commands[0]
cmd = next(filter(lambda x: x.command_id == id, all_supported_cmds))
try:
ret = await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=i, payload=cmd())
asserts.fail(f'Unexpected success sending unsupported cmd {cmd} to {cluster} cluster on ep {i}')
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.UnsupportedCommand, "Unexpected error returned from unsupported command")
sent = True
break
# It might actually be the case that all the supported clusters support all the commands. In that case, let's just put a warning.
# We could, in theory, send a command with a fully out of bounds command ID, but that's not supported by the controller
if not sent:
logging.warning("Unable to find a supported cluster with unsupported commands on any endpoint - SKIPPING")
self.print_step(4, "Setup TH to have no privileges for a cluster, send Invoke")
# Setup the ACL
acl_only = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[self.matter_test_config.controller_node_id],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0, cluster=Clusters.AccessControl.id)])
result = await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl([acl_only]))])
asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed")
# For the unsupported access test, let's use a cluster that's known to be there and supports commands - general commissioning on EP0
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=1)
try:
await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
asserts.fail("Unexpected success return when sending a command with no privileges")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.UnsupportedAccess, "Unexpected error returned")
full_access = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[self.matter_test_config.controller_node_id],
targets=[])
result = await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl([full_access]))])
asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed")
self.print_step(5, "setup TH with no accessing fabric and invoke command")
# The only way to have no accessing fabric is to have a PASE session and no added NOC
# KeySetRead - fabric scoped command, should not be accessible over PASE
# To get a PASE session, we need an open commissioning window
discriminator = random.randint(0, 4095)
params = self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=discriminator, option=1)
# TH2 = new controller that's not connected over CASE
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=self.matter_test_config.fabric_id + 1)
TH2 = new_fabric_admin.NewController(nodeId=112233)
devices = TH2.DiscoverCommissionableNodes(
filterType=Discovery.FilterType.LONG_DISCRIMINATOR, filter=discriminator, stopOnFirst=False)
# For some reason, the devices returned here aren't filtered, so filter ourselves
device = next(filter(lambda d: d.commissioningMode == 2 and d.longDiscriminator == discriminator, devices))
for a in device.addresses:
try:
TH2.EstablishPASESessionIP(ipaddr=a, setupPinCode=params.setupPinCode,
nodeid=self.dut_node_id+1, port=device.port)
break
except ChipStackError:
continue
try:
TH2.GetConnectedDeviceSync(nodeid=self.dut_node_id+1, allowPASE=True, timeoutMs=1000)
except TimeoutError:
asserts.fail("Unable to establish a PASE session to the device")
try:
# Any group ID is fine since we'll fail before this
await TH2.SendCommand(nodeid=self.dut_node_id + 1, endpoint=0, payload=Clusters.GroupKeyManagement.Commands.KeySetRead(groupKeySetID=0x0001))
asserts.fail("Incorrectly received a success response from a fabric-scoped command")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.UnsupportedAccess, "Incorrect error from fabric-sensitive read over PASE")
# Cleanup - RevokeCommissioning so we can use ArmFailSafe etc. again.
await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=Clusters.AdministratorCommissioning.Commands.RevokeCommissioning(), timedRequestTimeoutMs=6000)
self.print_step(6, "Send invoke request with requires a data response")
# ArmFailSafe sends a data response
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=900, breadcrumb=1)
ret = await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
asserts.assert_true(type_matches(ret, Clusters.GeneralCommissioning.Commands.ArmFailSafeResponse),
"Unexpected response type from ArmFailSafe")
self.print_step(7, "Send a command with suppress Response")
# NOTE: This is out of scope currently due to https://github.com/project-chip/connectedhomeip/issues/8043
# We perform this step, but the DUT will likely incorrectly send a response
# Sending this command at least ensures the DUT doesn't crash with this flag set, even if the behvaior is not correct
# Lucky candidate ArmFailSafe is at it again - command side effect is to set breadcrumb attribute
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=900, breadcrumb=2)
await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd, suppressResponse=True)
# TODO: Once the above issue is resolved, this needs a check to ensure that no response was received.
# Verify that the command had the correct side effect even if a response was sent
breadcrumb = await self.read_single_attribute_check_success(
cluster=Clusters.GeneralCommissioning, attribute=Clusters.GeneralCommissioning.Attributes.Breadcrumb, endpoint=0)
asserts.assert_equal(breadcrumb, 2, "Breadcrumb was not correctly set on ArmFailSafe with response suppressed")
# Cleanup - Unset the failsafe
cmd = Clusters.GeneralCommissioning.Commands.ArmFailSafe(expiryLengthSeconds=0, breadcrumb=0)
await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
self.print_step(8, "Send Invoke with timedRequest marked, but no timed request sent")
# We can do this with any command, but to be thorough, test first with a command that does not
# require a timed interaction (ArmFailSafe) and then one that does (RevokeCommissioning)
try:
await self.default_controller.TestOnlySendCommandTimedRequestFlagWithNoTimedInvoke(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
asserts.fail("Unexpected success response from sending an Invoke with TimedRequest flag and no timed interaction")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.TimedRequestMismatch,
"Unexpected error response from Invoke with TimedRequest flag and no TimedInvoke")
# Try with RevokeCommissioning
# First open a commissioning window for us to revoke, so we know this command is able to succeed absent this error
_ = self.default_controller.OpenCommissioningWindow(
nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=discriminator, option=1)
cmd = FakeRevokeCommissioning()
try:
await self.default_controller.TestOnlySendCommandTimedRequestFlagWithNoTimedInvoke(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
asserts.fail("Unexpected success response from sending an Invoke with TimedRequest flag and no timed interaction")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.TimedRequestMismatch,
"Unexpected error response from Invoke with TimedRequest flag and no TimedInvoke")
self.print_step(9, "Send invoke for a command that requires timedRequest, but doesn't use one")
# RevokeCommissioning requires a timed interaction. This is enforced in the python layer because
# the generated class indicates that a timed interaction is required. The fake class overrides this.
try:
await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=cmd)
asserts.fail("Incorrectly received a success response for a command that required TimedInvoke action")
except InteractionModelError as e:
asserts.assert_equal(e.status, Status.NeedsTimedInteraction)
# Cleanup - actually revoke commissioning to close the open window
await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=Clusters.AdministratorCommissioning.Commands.RevokeCommissioning(), timedRequestTimeoutMs=6000)
if __name__ == "__main__":
default_matter_test_main()