blob: 9df92fb17e9d689785822e8ce4bb7ceda45c0d82 [file] [log] [blame]
# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#
# These are separated into different runs because the logs for these tests are HUGE. The attribute one individually
# reads every attribute on every cluster 4 times. If there's a failure, having these in separate runs makes it significantly
# easier to navigate the logs
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs:
# run1:
# app: ${ALL_CLUSTERS_APP}
# factory-reset: true
# quiet: true
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --commissioning-method on-network
# --discriminator 1234
# --passcode 20202021
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# --tests test_TC_ACE_2_1
# run2:
# app: ${ALL_CLUSTERS_APP}
# factory-reset: true
# quiet: true
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --commissioning-method on-network
# --discriminator 1234
# --passcode 20202021
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# --tests test_TC_ACE_2_2
# run3:
# app: ${ALL_CLUSTERS_APP}
# factory-reset: true
# quiet: true
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --storage-path admin_storage.json
# --commissioning-method on-network
# --discriminator 1234
# --passcode 20202021
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# --bool-arg ci_only_linux_skip_ota_cluster_disallowed_for_certification:True
# --tests test_TC_ACE_2_3
# === END CI TEST ARGUMENTS ===
import logging
from copy import deepcopy
from enum import Enum, auto
from typing import Optional
import chip.clusters as Clusters
from chip.clusters.Attribute import ValueDecodeFailure
from chip.interaction_model import InteractionModelError, Status
from chip.testing.basic_composition import BasicCompositionTests
from chip.testing.global_attribute_ids import (GlobalAttributeIds, is_standard_attribute_id, is_standard_cluster_id,
is_standard_command_id)
from chip.testing.matter_testing import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, TestStep,
async_test_body, default_matter_test_main)
from chip.testing.spec_parsing import XmlCluster
from chip.tlv import uint
class AccessTestType(Enum):
READ = auto()
WRITE = auto()
INVOKE = auto()
def step_number_with_privilege(step: int, substep: str, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum) -> str:
return f'{step}{substep}_{privilege.name}'
def operation_allowed(spec_requires: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum,
acl_set_to: Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum]) -> bool:
''' Determines if the action is allowed on the device based on the spec_requirements and the current ACL privilege granted.
The spec parsing uses kUnknownEnumValue to indicate that NO access is allowed for this attribute
or command (ex. command with no write access).
ACL uses None to indicate that no access has been granted to this controller.
In both of these cases, the action is disallowed. In all other cases, access is allowed if the ACL
grants a privilege at or above the privilege required in the spec.
'''
if spec_requires == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue or acl_set_to is None:
return False
return spec_requires <= acl_set_to
def checkable_attributes(cluster_id, cluster, xml_cluster) -> list[uint]:
all_attrs = cluster[GlobalAttributeIds.ATTRIBUTE_LIST_ID]
def is_known_cluster_attribute(attribute_id) -> bool:
''' Returns true if this is a non-manufacturer specific attribute that has information in the XML and has python codegen data'''
return is_standard_attribute_id(attribute_id) and attribute_id in xml_cluster.attributes and attribute_id in Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]
return [attr_id for attr_id in all_attrs if is_known_cluster_attribute(attr_id)]
def checkable_commands(cluster_id, cluster, xml_cluster) -> list[uint]:
all_cmds = cluster[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID]
def is_known_cluster_cmd(command_id) -> bool:
''' Returns true if this is a non-manufacturer specific command that has information in the XML and has python codegen data'''
return is_standard_command_id(command_id) and command_id in xml_cluster.accepted_commands and command_id in Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id]
return [cmd_id for cmd_id in all_cmds if is_known_cluster_cmd(cmd_id)]
class AccessChecker(MatterBaseTest, BasicCompositionTests):
@async_test_body
async def setup_class(self):
# TODO: Make this into a proper default in the class so we're not overriding the command lines
self.user_params["use_pase_only"] = False
super().setup_class()
await self.setup_class_helper()
self.build_spec_xmls()
acl_attr = Clusters.AccessControl.Attributes.Acl
self.default_acl = await self.read_single_attribute_check_success(endpoint=0, cluster=Clusters.AccessControl, attribute=acl_attr)
self._record_errors()
# We need to run this test from two controllers so we can test access to the ACL cluster while retaining access to the ACL cluster
fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0]
self.TH2_nodeid = self.matter_test_config.controller_node_id + 1
self.TH2 = fabric_admin.NewController(nodeId=self.TH2_nodeid)
# Both the tests in this suite are potentially long-running if there are a large number of attributes on the DUT
# and the network is slow. Set the default to 3 minutes to account for this.
@property
def default_timeout(self) -> int:
return 180
@async_test_body
async def setup_test(self):
super().setup_test()
self.success = True
@async_test_body
async def teardown_test(self):
await self.default_controller.WriteAttribute(self.dut_node_id, attributes=[
(0, Clusters.AccessControl.Attributes.Acl(self.default_acl))])
async def _setup_acl(self, privilege: Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum]):
if privilege is None:
return
new_acl = deepcopy(self.default_acl)
new_entry = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=privilege, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[self.TH2_nodeid])
new_acl.append(new_entry)
await self.default_controller.WriteAttribute(self.dut_node_id, attributes=[(0, Clusters.AccessControl.Attributes.Acl(new_acl))])
def _record_errors(self):
''' Checks through all the endpoints and records all the spec warnings in one go so we don't get repeats'''
all_clusters = set()
attrs: dict[uint, set] = {}
cmds: dict[uint, set] = {}
for endpoint_id, endpoint in self.endpoints_tlv.items():
all_clusters |= set(endpoint.keys())
for cluster_id, device_cluster_data in endpoint.items():
# Find all the attributes for this cluster across all endpoint
if cluster_id not in attrs:
attrs[cluster_id] = set()
if cluster_id not in cmds:
cmds[cluster_id] = set()
# discard MEI attributes as we do not have access information for them.
attrs[cluster_id].update(
set([id for id in device_cluster_data[GlobalAttributeIds.ATTRIBUTE_LIST_ID] if is_standard_attribute_id(id)]))
cmds[cluster_id].update(
set([id for id in device_cluster_data[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID] if is_standard_command_id(id)]))
# Remove MEI clusters - we don't have information available to check these.
all_clusters = [id for id in all_clusters if is_standard_cluster_id(id)]
for cluster_id in all_clusters:
location = ClusterPathLocation(endpoint_id=0, cluster_id=cluster_id)
if cluster_id not in self.xml_clusters:
self.record_error(test_name="Access Checker", location=location, problem="Cluster not present in spec data")
self.success = False
continue
if cluster_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES:
self.record_error(test_name="Access Checker", location=location, problem="Unknown cluster")
self.success = False
continue
# check that we have information for all the required attributes
xml_cluster = self.xml_clusters[cluster_id]
for attribute_id in attrs[cluster_id]:
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
if attribute_id not in xml_cluster.attributes.keys():
self.record_error(test_name="Access Checker", location=location,
problem="Cluster attribute not found in spec XML")
self.success = False
continue
if attribute_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]:
self.record_error(test_name="Access Checker", location=location,
problem="Unknown attribute")
self.success = False
continue
# Check that we have information for all the required commands
for command_id in cmds[cluster_id]:
location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
if command_id not in xml_cluster.accepted_commands.keys():
self.record_error(test_name="Access Checker", location=location,
problem="Cluster command not found in spec XML")
self.success = False
continue
if command_id not in Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id]:
self._record_error(test_name="Access Checker", location=location,
problem="Unknown command")
self.success = False
continue
async def _maybe_run_command_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, device_cluster_data, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum):
""" Runs a command only if the required cluster privilege is HIGHER than the specified privilege. In this way,
no commands are actually run on the device, which means there are no side effects. However, we can differentiate
ACL rejections from commands being unsupported.
"""
ota_exception = self.user_params.get('ci_only_linux_skip_ota_cluster_disallowed_for_certification', False)
if cluster_id == Clusters.OtaSoftwareUpdateRequestor.id and ota_exception:
logging.warn('WARNING: Skipping OTA cluster check for CI. THIS IS DISALLOWED FOR CERTIFICATION')
return
logging.info(f'Testing commands on {xml_cluster.name} at privilege {privilege}')
for command_id in checkable_commands(cluster_id, device_cluster_data, xml_cluster):
spec_requires = xml_cluster.accepted_commands[command_id].privilege
command = Clusters.ClusterObjects.ALL_ACCEPTED_COMMANDS[cluster_id][command_id]
location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
name = f"Command test - privilege {privilege}"
if operation_allowed(spec_requires, privilege):
# In this test, we're only checking that the disallowed commands are rejected so that there are
# no side effects. Commands are checked with admin privilege in their cluster tests. The error that
# may be let through here is if the spec requires operate and the implementation requires admin.
continue
logging.info(
f' Testing command {xml_cluster.accepted_commands[command_id].name} from cluster {xml_cluster.name} - at privilege {privilege}, requires {spec_requires}')
try:
timed = None
if command.must_use_timed_invoke:
# This command requires a timedRequest. Setting the timed value to largest value (unsigned int).
# We're sending the command right away, so this value doesn't matter, but we do need to set a value here to trigger the timed request message.
timed = 65535
await self.send_single_cmd(cmd=command(), dev_ctrl=self.TH2, endpoint=endpoint_id, timedRequestTimeoutMs=timed)
# If this was successful, that's an error
self.record_error(test_name=name, location=location,
problem=f"Unexpected success sending command {command} with privilege {privilege}")
self.success = False
logging.info(' Received unexpected SUCCESS')
except InteractionModelError as e:
if e.status != Status.UnsupportedAccess:
self.record_error(test_name=name, location=location,
problem=f'Unexpected error sending command {command} with privilege {privilege} - expected UNSUPPORTED_ACCESS, got {e.status}')
self.success = False
logging.info(f' Received unexpected error {e}')
else:
logging.info(' Received expected error')
async def _run_read_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, device_cluster_data, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum):
# TODO: This assumes all attributes are readable. Which they are currently. But we don't have a general way to mark otherwise.
for attribute_id in checkable_attributes(cluster_id, device_cluster_data, xml_cluster):
spec_requires = xml_cluster.attributes[attribute_id].read_access
attribute = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id][attribute_id]
cluster_class = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id]
if operation_allowed(spec_requires, privilege):
ret = await self.read_single_attribute_check_success(dev_ctrl=self.TH2, endpoint=endpoint_id, cluster=cluster_class, attribute=attribute, assert_on_error=False, test_name=f"Read access Checker - {privilege}")
if ret is None:
self.success = False
else:
ret = await self.read_single_attribute_expect_error(dev_ctrl=self.TH2, endpoint=endpoint_id, cluster=cluster_class, attribute=attribute, error=Status.UnsupportedAccess, assert_on_error=False, test_name=f"Read access Checker - {privilege}")
if ret is None:
self.success = False
async def _run_write_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, cluster, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum, wildcard_read):
for attribute_id in checkable_attributes(cluster_id, cluster, xml_cluster):
spec_requires = xml_cluster.attributes[attribute_id].write_access
is_optional_write = xml_cluster.attributes[attribute_id].write_optional
attribute = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id][attribute_id]
cluster_class = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id]
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
test_name = f'Write access checker - {privilege}'
logging.info(f"Testing attribute {attribute} on endpoint {endpoint_id}")
if attribute == Clusters.AccessControl.Attributes.Acl and privilege == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister:
logging.info("Skipping ACL attribute check for admin privilege as this is known to be writeable and is being used for this test")
continue
# Because we read everything with admin, we should have this in the wildcard read
# This will only not work if we end up with write-only attributes. We do not currently have any of these.
val = wildcard_read.attributes[endpoint_id][cluster_class][attribute]
if isinstance(val, ValueDecodeFailure):
self.record_error(test_name=test_name, location=location,
problem=f"Attribute {attribute} returned a read error {val} - unable to write current value")
self.success = False
continue
if isinstance(val, list):
# Use an empty list for writes in case the list is large and does not fit
val = []
resp = await self.TH2.WriteAttribute(nodeid=self.dut_node_id, attributes=[(endpoint_id, attribute(val))])
if spec_requires == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue:
# not writeable - expect an unsupported write response
if resp[0].Status != Status.UnsupportedWrite:
self.record_error(test_name=test_name, location=location,
problem=f"Unexpected error writing non-writeable attribute - expected Unsupported Write, got {resp[0].Status}")
self.success = False
elif is_optional_write and resp[0].Status == Status.UnsupportedWrite:
# unsupported optional writeable attribute - this is fine, no error
continue
elif operation_allowed(spec_requires, privilege):
# Write the default attribute. We don't care if this fails, as long as it fails with a DIFFERENT error than the access
# This is OK because access is required to be checked BEFORE any other thing to avoid leaking device information.
# For example, because we don't have any range information, we might be writing an out of range value, but this will
# get rejected by the ACL check before the range check.
# See section: 8.4.3.2
if resp[0].Status == Status.UnsupportedAccess:
self.record_error(test_name=test_name, location=location,
problem="Unexpected UnsupportedAccess writing attribute")
self.success = False
else:
if resp[0].Status != Status.UnsupportedAccess:
self.record_error(test_name=test_name, location=location,
problem=f"Unexpected error writing attribute - expected Unsupported Access, got {resp[0].Status}")
self.success = False
if resp[0].Status == Status.Success and isinstance(val, list):
# Reset the value to the original if we managed to write an empty list
val = wildcard_read.attributes[endpoint_id][cluster_class][attribute]
await self.TH2.WriteAttribute(nodeid=self.dut_node_id, attributes=[(endpoint_id, attribute(val))])
async def run_access_test(self, test_type: AccessTestType):
# Step precondition, 1 and 2 are handled in the class setup, but need to be marked for every test
self.step("precondition")
self.step(1)
self.step(2)
# Read all the attributes on TH2 using admin access
check_step = 3
if test_type == AccessTestType.WRITE:
self.step(3)
await self._setup_acl(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister)
self.step(4)
wildcard_read = await self.TH2.Read(self.dut_node_id, [()])
check_step = 5
self.step(check_step)
enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
privilege_enum = [p for p in enum if p != enum.kUnknownEnumValue]
for privilege in privilege_enum:
logging.info(f"Testing for {privilege}")
self.step(step_number_with_privilege(check_step, 'a', privilege))
await self._setup_acl(privilege=privilege)
self.step(step_number_with_privilege(check_step, 'b', privilege))
for endpoint_id, endpoint in self.endpoints_tlv.items():
for cluster_id, device_cluster_data in endpoint.items():
if not is_standard_cluster_id(cluster_id) or cluster_id not in self.xml_clusters or cluster_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES:
# These cases have already been recorded by the _record_errors function
continue
xml_cluster = self.xml_clusters[cluster_id]
if test_type == AccessTestType.READ:
await self._run_read_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege)
elif test_type == AccessTestType.WRITE:
await self._run_write_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege, wildcard_read)
elif test_type == AccessTestType.INVOKE:
await self._maybe_run_command_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege)
else:
self.fail_current_test("Unsupported test type")
if not self.success:
self.fail_current_test("One or more access violations was found")
def steps_TC_ACE_2_1(self):
steps = [TestStep("precondition", "DUT is commissioned", is_commissioning=True),
TestStep(1, "TH_commissioner performs a wildcard read (done during test setup)"),
TestStep(2, "TH_commissioner reads the ACL attribute (done during test setup)"),
TestStep(3, "Repeat steps 3a and 3b for each permission level")]
enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
privilege_enum = [p for p in enum if p != enum.kUnknownEnumValue]
for p in privilege_enum:
steps.append(TestStep(step_number_with_privilege(3, 'a', p),
"TH_commissioner gives TH_second_commissioner the specified privilege"))
steps.append(TestStep(step_number_with_privilege(3, 'b', p),
"TH_second_controller reads all the attributes and checks for appropriate permission errors"))
return steps
def desc_TC_ACE_2_1(self):
return "[TC-ACE-2.1] Attribute read privilege enforcement - [DUT as Server]"
@async_test_body
async def test_TC_ACE_2_1(self):
await self.run_access_test(AccessTestType.READ)
def steps_TC_ACE_2_2(self):
steps = [TestStep("precondition", "DUT is commissioned", is_commissioning=True),
TestStep(1, "TH_commissioner performs a wildcard read (done during test setup)"),
TestStep(2, "TH_commissioner reads the ACL attribute (done during test setup)"),
TestStep(3, "TH_commissioner grants TH_second_controller admin permission"),
TestStep(4, "TH_second_controller performs a wildcard read"),
TestStep(5, "Repeat steps 5a and 5b for each permission level")]
enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
privilege_enum = [p for p in enum if p != enum.kUnknownEnumValue]
for p in privilege_enum:
steps.append(TestStep(step_number_with_privilege(5, 'a', p),
"TH_commissioner gives TH_second_commissioner the specified privilege"))
steps.append(TestStep(step_number_with_privilege(5, 'b', p),
"TH_second_commissioner writes all the attributes and checks for appropriate permission errors"))
return steps
def desc_TC_ACE_2_2(self):
return "[TC-ACE-2.2] Attribute write privilege enforcement - [DUT as Server]"
@async_test_body
async def test_TC_ACE_2_2(self):
await self.run_access_test(AccessTestType.WRITE)
def steps_TC_ACE_2_3(self):
steps = [TestStep("precondition", "DUT is commissioned", is_commissioning=True),
TestStep(1, "TH_commissioner performs a wildcard read (done during test setup)"),
TestStep(2, "TH_commissioner reads the ACL attribute (done during test setup)"),
TestStep(3, "Repeat steps 3a and 3b for each permission level")]
enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
privilege_enum = [p for p in enum if p != enum.kUnknownEnumValue]
for p in privilege_enum:
steps.append(TestStep(step_number_with_privilege(3, 'a', p),
"TH_commissioner gives TH_second_controller the specified privilege"))
steps.append(TestStep(step_number_with_privilege(3, 'b', p),
"""For each standard command on each standard cluster on each endpoint,
TH_second_controller checks the permission requirements for that command.
If the permission required for the command is HIGHER than the permission level being tested,
TH_second_controller sends the command to the DUT using default values.
Regardless of the command contents, the DUT should return an access error since access must be checked
before the command is processed. Receipt of an UNSUPPORTED_COMMAND error is a conformance failure.""",
"DUT returns UNSUPPORTED_ACCESS error"))
return steps
def desc_TC_ACE_2_3(self):
return "[TC-ACE-2.3] Command Privilege Enforcement - [DUT as Server]"
@async_test_body
async def test_TC_ACE_2_3(self):
await self.run_access_test(AccessTestType.INVOKE)
if __name__ == "__main__":
default_matter_test_main()