| import logging |
| from copy import deepcopy |
| from enum import Enum, auto |
| from typing import Optional |
| |
| import chip.clusters as Clusters |
| from basic_composition_support import BasicCompositionTests |
| from chip.interaction_model import Status |
| from chip.tlv import uint |
| from global_attribute_ids import GlobalAttributeIds |
| from matter_testing_support import (AttributePathLocation, ClusterPathLocation, MatterBaseTest, TestStep, async_test_body, |
| default_matter_test_main) |
| from spec_parsing_support import XmlCluster, build_xml_clusters |
| |
| |
class AccessTestType(Enum):
    ''' The kind of attribute access a test run exercises against the DUT. '''
    # Explicit values are the same ones auto() hands out in declaration order.
    READ = 1
    WRITE = 2
| |
| |
def step_number_with_privilege(step: int, substep: str, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum) -> str:
    ''' Builds the label for a per-privilege substep: step number, substep letter, underscore, privilege name.

    For example step 5, substep 'a' and the kAdminister privilege give "5a_kAdminister".
    '''
    return '{}{}_{}'.format(step, substep, privilege.name)
| |
| |
def operation_allowed(spec_requires: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum,
                      acl_set_to: Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum]) -> bool:
    ''' Decides whether the device should permit an action, given the privilege the spec requires
        and the privilege the ACL currently grants.

        Two cases are always disallowed:
          - spec_requires is kUnknownEnumValue: the spec parsing uses this to mean that NO access is
            allowed for the attribute or command (ex. command with no write access).
          - acl_set_to is None: the ACL has not granted this controller any access.
        Otherwise the action is allowed when the granted privilege is at or above the required one.
    '''
    privilege_enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
    if spec_requires == privilege_enum.kUnknownEnumValue:
        return False
    if acl_set_to is None:
        return False
    return spec_requires <= acl_set_to
| |
| |
def checkable_attributes(cluster_id, cluster, xml_cluster) -> list[uint]:
    ''' Returns the attribute IDs from the cluster's AttributeList that can be access-checked.

    An attribute is kept only if it is a non-manufacturer specific attribute (ID <= 0xFFFF) that has
    information in the XML and has python codegen data. Order follows the device's AttributeList.
    '''
    checkable = []
    for attribute_id in cluster[GlobalAttributeIds.ATTRIBUTE_LIST_ID]:
        if attribute_id > 0xFFFF:
            # Manufacturer specific attribute
            continue
        if attribute_id not in xml_cluster.attributes:
            # No spec information for this attribute
            continue
        if attribute_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]:
            # No python codegen data for this attribute
            continue
        checkable.append(attribute_id)
    return checkable
| |
| |
class AccessChecker(MatterBaseTest, BasicCompositionTests):
    ''' Checks that the DUT enforces the attribute read (TC-ACE-2.1) and write (TC-ACE-2.2) privileges from the spec XML.

    The default controller grants a second controller (TH2) each ACL privilege level in turn. TH2 then reads or
    writes every checkable attribute, and each response is compared against what the spec access data and the
    granted privilege allow. Violations are recorded as problems and the test is failed at the end of the run.
    '''

    @async_test_body
    async def setup_class(self):
        ''' Loads the spec XML data, saves the DUT's default ACL, records spec-data problems and creates TH2.'''
        # TODO: Make this into a proper default in the class so we're not overriding the command lines
        self.user_params["use_pase_only"] = False
        super().setup_class()
        await self.setup_class_helper()
        self.xml_clusters, self.problems = build_xml_clusters()
        acl_attr = Clusters.AccessControl.Attributes.Acl
        # Saved so the ACL can be extended per privilege in _setup_acl and restored in teardown_test
        self.default_acl = await self.read_single_attribute_check_success(cluster=Clusters.AccessControl, attribute=acl_attr)
        self._record_errors()
        # We need to run this test from two controllers so we can test access to the ACL cluster while retaining access to the ACL cluster
        fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0]
        self.TH2_nodeid = self.matter_test_config.controller_node_id + 1
        self.TH2 = fabric_admin.NewController(nodeId=self.TH2_nodeid)

    # Both the tests in this suite are potentially long-running if there are a large number of attributes on the DUT
    # and the network is slow. Set the default to 3 minutes to account for this.
    @property
    def default_timeout(self) -> int:
        return 180

    @async_test_body
    async def setup_test(self):
        super().setup_test()
        # Every test starts from a passing state; the access checks clear this flag when they find a violation.
        # NOTE(review): this also overwrites any failure flagged by _record_errors() during setup_class - confirm
        # whether those spec-data errors are meant to fail the tests or only to be reported.
        self.success = True

    @async_test_body
    async def teardown_test(self):
        ''' Writes the ACL saved during class setup back to the DUT, removing any entry added for TH2.'''
        await self.default_controller.WriteAttribute(self.dut_node_id, attributes=[
            (0, Clusters.AccessControl.Attributes.Acl(self.default_acl))])

    async def _setup_acl(self, privilege: Optional[Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum]):
        ''' Writes an ACL made of the default ACL plus one CASE entry granting `privilege` to TH2.

        Does nothing when privilege is None.
        '''
        if privilege is None:
            return
        # Copy so the saved default ACL is never modified
        new_acl = deepcopy(self.default_acl)
        new_entry = Clusters.AccessControl.Structs.AccessControlEntryStruct(
            privilege=privilege, authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase, subjects=[self.TH2_nodeid])
        new_acl.append(new_entry)
        await self.default_controller.WriteAttribute(self.dut_node_id, attributes=[(0, Clusters.AccessControl.Attributes.Acl(new_acl))])

    @staticmethod
    def _testable_privileges() -> list:
        ''' Returns every AccessControlEntryPrivilegeEnum member except kUnknownEnumValue, in enum order.'''
        enum = Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum
        return [p for p in enum if p != enum.kUnknownEnumValue]

    def _privilege_steps(self, check_step: int, check_description: str) -> list:
        ''' Builds the 'a' (grant privilege) and 'b' (check access) TestSteps for each testable privilege.'''
        steps = []
        for p in self._testable_privileges():
            steps.append(TestStep(step_number_with_privilege(check_step, 'a', p),
                                  "TH_commissioner gives TH_second_commissioner the specified privilege"))
            steps.append(TestStep(step_number_with_privilege(check_step, 'b', p), check_description))
        return steps

    def _record_errors(self):
        ''' Checks through all the endpoints and records all the spec warnings in one go so we don't get repeats'''
        all_clusters = set()
        # cluster ID -> {attribute ID -> first endpoint the attribute was seen on}. The endpoint is remembered so
        # that attribute problems are reported against an endpoint that actually has the attribute.
        attrs: dict[uint, dict[uint, int]] = {}

        for endpoint_id, endpoint in self.endpoints_tlv.items():
            all_clusters |= set(endpoint.keys())
            for cluster_id, device_cluster_data in endpoint.items():
                # Find all the attributes for this cluster across all endpoint
                cluster_attrs = attrs.setdefault(cluster_id, {})
                for attribute_id in device_cluster_data[GlobalAttributeIds.ATTRIBUTE_LIST_ID]:
                    # discard MEI attributes as we do not have access information for them.
                    if attribute_id <= 0xFFFF:
                        cluster_attrs.setdefault(attribute_id, endpoint_id)

        # Remove MEI clusters - we don't have information available to check these.
        standard_clusters = [c for c in all_clusters if c <= 0x7FFF]
        for cluster_id in standard_clusters:
            location = ClusterPathLocation(endpoint_id=0, cluster_id=cluster_id)
            if cluster_id not in self.xml_clusters:
                # TODO: Upgrade from warning when the spec XML stabilizes
                self.record_warning(test_name="Access Checker", location=location, problem="Cluster not present in spec data")
                continue
            if cluster_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES:
                self.record_error(test_name="Access Checker", location=location, problem="Unknown cluster")
                self.success = False
                continue

            # check that we have information for all the required attributes
            xml_cluster = self.xml_clusters[cluster_id]
            for attribute_id, first_endpoint_id in attrs[cluster_id].items():
                location = AttributePathLocation(endpoint_id=first_endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
                if attribute_id not in xml_cluster.attributes:
                    self.record_warning(test_name="Access Checker", location=location,
                                        problem="Cluster attribute not found in spec XML")
                    continue
                if attribute_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]:
                    self.record_error(test_name="Access Checker", location=location,
                                      problem="Unknown attribute")
                    self.success = False
                    continue

    async def _run_read_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, device_cluster_data, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum):
        ''' Reads every checkable attribute of one cluster instance from TH2 and checks the outcome.

        A read is expected to succeed when the privilege granted to TH2 satisfies the spec's read access
        for the attribute, and to fail with UnsupportedAccess otherwise. self.success is cleared on a mismatch.
        '''
        # TODO: This assumes all attributes are readable. Which they are currently. But we don't have a general way to mark otherwise.
        test_name = f"Read access Checker - {privilege}"
        for attribute_id in checkable_attributes(cluster_id, device_cluster_data, xml_cluster):
            spec_requires = xml_cluster.attributes[attribute_id].read_access
            attribute = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id][attribute_id]
            cluster_class = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id]

            if operation_allowed(spec_requires, privilege):
                ret = await self.read_single_attribute_check_success(
                    dev_ctrl=self.TH2, endpoint=endpoint_id, cluster=cluster_class, attribute=attribute,
                    assert_on_error=False, test_name=test_name)
            else:
                ret = await self.read_single_attribute_expect_error(
                    dev_ctrl=self.TH2, endpoint=endpoint_id, cluster=cluster_class, attribute=attribute,
                    error=Status.UnsupportedAccess, assert_on_error=False, test_name=test_name)
            # The helpers are called with assert_on_error=False so the loop keeps going; a None result is
            # treated as a failed check.
            if ret is None:
                self.success = False

    async def _run_write_access_test_for_cluster_privilege(self, endpoint_id, cluster_id, cluster, xml_cluster: XmlCluster, privilege: Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum, wildcard_read):
        ''' Writes every checkable attribute of one cluster instance from TH2 and checks the response status.

        The value written is the one found in wildcard_read (lists are replaced by an empty list). Expected status:
          - attribute not writeable per the spec: UnsupportedWrite
          - optional-write attribute answering UnsupportedWrite: accepted
          - privilege sufficient: anything except UnsupportedAccess
          - privilege insufficient: UnsupportedAccess
        Mismatches are recorded as errors and clear self.success.
        '''
        test_name = f'Write access checker - {privilege}'
        for attribute_id in checkable_attributes(cluster_id, cluster, xml_cluster):
            spec_requires = xml_cluster.attributes[attribute_id].write_access
            is_optional_write = xml_cluster.attributes[attribute_id].write_optional

            attribute = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id][attribute_id]
            cluster_class = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id]
            location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
            logging.info(f"Testing attribute {attribute} on endpoint {endpoint_id}")
            if attribute == Clusters.AccessControl.Attributes.Acl and privilege == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister:
                logging.info("Skipping ACL attribute check for admin privilege as this is known to be writeable and is being used for this test")
                continue

            # Because we read everything with admin, we should have this in the wildcard read
            # This will only not work if we end up with write-only attributes. We do not currently have any of these.
            original_val = wildcard_read.attributes[endpoint_id][cluster_class][attribute]
            # Use an empty list for writes in case the list is large and does not fit
            val = [] if isinstance(original_val, list) else original_val

            resp = await self.TH2.WriteAttribute(nodeid=self.dut_node_id, attributes=[(endpoint_id, attribute(val))])
            status = resp[0].Status
            if spec_requires == Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue:
                # not writeable - expect an unsupported write response
                if status != Status.UnsupportedWrite:
                    self.record_error(test_name=test_name, location=location,
                                      problem=f"Unexpected error writing non-writeable attribute - expected Unsupported Write, got {status}")
                    self.success = False
            elif is_optional_write and status == Status.UnsupportedWrite:
                # unsupported optional writeable attribute - this is fine, no error
                continue
            elif operation_allowed(spec_requires, privilege):
                # Write the default attribute. We don't care if this fails, as long as it fails with a DIFFERENT error than the access
                # This is OK because access is required to be checked BEFORE any other thing to avoid leaking device information.
                # For example, because we don't have any range information, we might be writing an out of range value, but this will
                # get rejected by the ACL check before the range check.
                # See section: 8.4.3.2
                if status == Status.UnsupportedAccess:
                    self.record_error(test_name=test_name, location=location,
                                      problem="Unexpected UnsupportedAccess writing attribute")
                    self.success = False
            else:
                if status != Status.UnsupportedAccess:
                    self.record_error(test_name=test_name, location=location,
                                      problem=f"Unexpected error writing attribute - expected Unsupported Access, got {status}")
                    self.success = False

            if status == Status.Success and isinstance(val, list):
                # Reset the value to the original if we managed to write an empty list
                await self.TH2.WriteAttribute(nodeid=self.dut_node_id, attributes=[(endpoint_id, attribute(original_val))])

    async def run_access_test(self, test_type: AccessTestType):
        ''' Runs the read or write access check for every privilege level, endpoint and checkable cluster.

        Fails the current test at the end if any check cleared self.success.
        '''
        # Step precondition, 1 and 2 are handled in the class setup, but need to be marked for every test
        self.step("precondition")
        self.step(1)
        self.step(2)
        check_step = 3
        # Only the write test performs the TH2 wildcard read; its values are what get written back to the DUT.
        wildcard_read = None
        if test_type == AccessTestType.WRITE:
            self.step(3)
            await self._setup_acl(privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister)
            self.step(4)
            # Read all the attributes on TH2 using admin access
            wildcard_read = await self.TH2.Read(self.dut_node_id, [()])
            check_step = 5

        self.step(check_step)
        for privilege in self._testable_privileges():
            logging.info(f"Testing for {privilege}")
            self.step(step_number_with_privilege(check_step, 'a', privilege))
            await self._setup_acl(privilege=privilege)
            self.step(step_number_with_privilege(check_step, 'b', privilege))
            for endpoint_id, endpoint in self.endpoints_tlv.items():
                for cluster_id, device_cluster_data in endpoint.items():
                    if cluster_id > 0x7FFF or cluster_id not in self.xml_clusters or cluster_id not in Clusters.ClusterObjects.ALL_ATTRIBUTES:
                        # These cases have already been recorded by the _record_errors function
                        continue
                    xml_cluster = self.xml_clusters[cluster_id]
                    if test_type == AccessTestType.READ:
                        await self._run_read_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege)
                    elif test_type == AccessTestType.WRITE:
                        await self._run_write_access_test_for_cluster_privilege(endpoint_id, cluster_id, device_cluster_data, xml_cluster, privilege, wildcard_read)
                    else:
                        self.fail_current_test("Unsupported test type")
        if not self.success:
            self.fail_current_test("One or more access violations was found")

    def steps_TC_ACE_2_1(self):
        steps = [TestStep("precondition", "DUT is commissioned", is_commissioning=True),
                 TestStep(1, "TH_commissioner performs a wildcard read"),
                 TestStep(2, "TH_commissioner reads the ACL attribute"),
                 TestStep(3, "Repeat steps 3a and 3b for each permission level")]
        steps.extend(self._privilege_steps(
            3, "TH_second_controller reads all the attributes and checks for appropriate permission errors"))
        return steps

    def desc_TC_ACE_2_1(self):
        return "[TC-ACE-2.1] Attribute read privilege enforcement - [DUT as Server]"

    @async_test_body
    async def test_TC_ACE_2_1(self):
        await self.run_access_test(AccessTestType.READ)

    def steps_TC_ACE_2_2(self):
        steps = [TestStep("precondition", "DUT is commissioned", is_commissioning=True),
                 TestStep(1, "TH_commissioner performs a wildcard read"),
                 TestStep(2, "TH_commissioner reads the ACL attribute"),
                 TestStep(3, "TH_commissioner grants TH_second_controller admin permission"),
                 TestStep(4, "TH_second_controller performs a wildcard read"),
                 TestStep(5, "Repeat steps 5a and 5b for each permission level")]
        steps.extend(self._privilege_steps(
            5, "TH_second_commissioner writes all the attributes and checks for appropriate permission errors"))
        return steps

    def desc_TC_ACE_2_2(self):
        return "[TC-ACE-2.2] Attribute write privilege enforcement - [DUT as Server]"

    @async_test_body
    async def test_TC_ACE_2_2(self):
        await self.run_access_test(AccessTestType.WRITE)
| |
| |
# When executed as a script, run the tests through default_matter_test_main() from matter_testing_support.
if __name__ == "__main__":
    default_matter_test_main()