| # |
| # Copyright (c) 2023 Project CHIP Authors |
| # All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import xml.etree.ElementTree as ElementTree |
| |
| import chip.clusters as Clusters |
| from global_attribute_ids import GlobalAttributeIds |
| from matter_testing_support import MatterBaseTest, ProblemNotice, default_matter_test_main |
| from mobly import asserts |
| from spec_parsing_support import (ClusterParser, XmlCluster, add_cluster_data_from_xml, build_xml_clusters, |
| check_clusters_for_unknown_commands, combine_derived_clusters_with_base) |
| |
| # TODO: improve the test coverage here |
| # https://github.com/project-chip/connectedhomeip/issues/30958 |
| |
| CLUSTER_ID = 0x0BEE |
| CLUSTER_NAME = "TestCluster" |
| ATTRIBUTE_NAME = "TestAttribute" |
| ATTRIBUTE_ID = 0x0000 |
| |
| |
| def single_attribute_cluster_xml(read_access: str, write_access: str, write_supported: str): |
| xml_cluster = f'<cluster xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="types types.xsd cluster cluster.xsd" id="{CLUSTER_ID}" name="{CLUSTER_NAME}" revision="3">' |
| revision_table = ('<revisionHistory>' |
| '<revision revision="1" summary="Initial Release"/>' |
| '<revision revision="2" summary="Some other revision"/>' |
| '<revision revision="3" summary="another revision"/>' |
| '</revisionHistory>') |
| id_table = ('<clusterIds>' |
| f'<clusterId id="{CLUSTER_ID}" name="{CLUSTER_NAME}"/>' |
| '</clusterIds>') |
| classification = '<classification hierarchy="base" role="utility" picsCode="TEST" scope="Node"/>' |
| read_access_str = f'read="true" readPrivilege="{read_access}"' if read_access is not None else "" |
| write_access_str = f'write="{write_supported}" writePrivilege="{write_access}"' if write_access is not None else "" |
| attribute = ('<attributes>' |
| f'<attribute id="{ATTRIBUTE_ID}" name="{ATTRIBUTE_NAME}" type="uint16" default="MS">' |
| f'<access {read_access_str} {write_access_str}/>' |
| '<quality changeOmitted="false" nullable="false" scene="false" persistence="fixed" reportable="false"/>' |
| '<mandatoryConform/>' |
| '</attribute>' |
| '</attributes>') |
| |
| return (f'{xml_cluster}' |
| f'{revision_table}' |
| f'{id_table}' |
| f'{classification}' |
| f'{attribute}' |
| '</cluster>') |
| |
| |
| def parse_cluster(xml: str) -> XmlCluster: |
| cluster = ElementTree.fromstring(xml) |
| parser = ClusterParser(cluster, CLUSTER_ID, CLUSTER_NAME) |
| return parser.create_cluster() |
| |
| |
| def get_access_enum_from_string(access_str: str) -> Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum: |
| if access_str == 'view': |
| return Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView |
| if access_str == 'operate': |
| return Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kOperate |
| if access_str == 'manage': |
| return Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kManage |
| if access_str == 'admin': |
| return Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister |
| if access_str is None: |
| return Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kUnknownEnumValue |
| asserts.fail("Unknown access string") |
| |
| |
| BASE_CLUSTER_XML_STR = ( |
| '<cluster xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="types types.xsd cluster cluster.xsd" id="" name="Test Base" revision="1">' |
| ' <revisionHistory>' |
| ' <revision revision="1" summary="Initial version"/>' |
| ' </revisionHistory>' |
| ' <clusterIds>' |
| ' <clusterId name="Test Base"/>' |
| ' </clusterIds>' |
| ' <classification hierarchy="base" role="application" picsCode="BASE" scope="Endpoint"/>' |
| ' <features>' |
| ' <feature bit="0" code="DEPONOFF" name="OnOff" summary="Dependency with the OnOff cluster">' |
| ' <optionalConform/>' |
| ' </feature>' |
| ' </features>' |
| ' <attributes>' |
| ' <attribute id="0x0000" name="SupportedModes" type="list" default="MS">' |
| ' <entry type="ModeOptionStruct"/>' |
| ' <access read="true" readPrivilege="view"/>' |
| ' <quality changeOmitted="false" nullable="false" scene="false" persistence="fixed" reportable="false"/>' |
| ' <mandatoryConform/>' |
| ' <constraint type="countBetween" from="2" to="255"/>' |
| ' </attribute>' |
| ' <attribute id="0x0001" name="CurrentMode" type="uint8" default="MS">' |
| ' <access read="true" readPrivilege="view"/>' |
| ' <quality changeOmitted="false" nullable="false" scene="true" persistence="nonVolatile" reportable="false"/>' |
| ' <mandatoryConform/>' |
| ' <constraint type="desc"/>' |
| ' </attribute>' |
| ' <attribute id="0x0002" name="StartUpMode" type="uint8" default="MS">' |
| ' <access read="true" write="true" readPrivilege="view" writePrivilege="operate"/>' |
| ' <quality changeOmitted="false" nullable="true" scene="false" persistence="nonVolatile" reportable="false"/>' |
| ' <optionalConform/>' |
| ' <constraint type="desc"/>' |
| ' </attribute>' |
| ' <attribute id="0x0003" name="OnMode" type="uint8" default="null">' |
| ' <access read="true" write="true" readPrivilege="view" writePrivilege="operate"/>' |
| ' <quality changeOmitted="false" nullable="true" scene="false" persistence="nonVolatile" reportable="false"/>' |
| ' <mandatoryConform>' |
| ' <feature name="DEPONOFF"/>' |
| ' </mandatoryConform>' |
| ' <constraint type="desc"/>' |
| ' </attribute>' |
| ' </attributes>' |
| ' <commands>' |
| ' <command id="0x00" name="ChangeToMode" response="ChangeToModeResponse" direction="commandToServer">' |
| ' <access invokePrivilege="operate"/>' |
| ' <mandatoryConform/>' |
| ' <field id="0" name="NewMode" type="uint8">' |
| ' <mandatoryConform/>' |
| ' <constraint type="desc"/>' |
| ' </field>' |
| ' </command>' |
| ' <command id="0x01" name="ChangeToModeResponse" direction="responseFromServer">' |
| ' <access invokePrivilege="operate"/>' |
| ' <mandatoryConform/>' |
| ' <field id="0" name="Status" type="enum8">' |
| ' <enum>' |
| ' <item from="0x00" to="0x3F" name="CommonCodes" summary="Common standard values defined in the generic Mode Base cluster specification.">' |
| ' <mandatoryConform/>' |
| ' </item>' |
| ' </enum>' |
| ' <mandatoryConform/>' |
| ' <constraint type="desc"/>' |
| ' </field>' |
| ' <field id="1" name="StatusText" type="string">' |
| ' <constraint type="maxLength" value="64"/>' |
| ' </field>' |
| ' </command>' |
| ' </commands>' |
| '</cluster>') |
| |
| DERIVED_CLUSTER_XML_STR = ( |
| '<cluster xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="types types.xsd cluster cluster.xsd" id="0xFFFF" name="Test Derived" revision="1">' |
| ' <revisionHistory>' |
| ' <revision revision="1" summary="Initial Release"/>' |
| ' </revisionHistory>' |
| ' <clusterIds>' |
| ' <clusterId id="0xFFFF" name="Test Derived"/>' |
| ' </clusterIds>' |
| ' <classification hierarchy="derived" baseCluster="Test Base" role="application" picsCode="MWOM" scope="Endpoint"/>' |
| ' <attributes>' |
| ' <attribute id="0x0000" name="SupportedModes">' |
| ' <mandatoryConform/>' |
| ' </attribute>' |
| ' <attribute id="0x0002" name="StartUpMode">' |
| ' <disallowConform/>' |
| ' </attribute>' |
| ' <attribute id="0x0003" name="OnMode">' |
| ' <disallowConform/>' |
| ' </attribute>' |
| ' </attributes>' |
| ' <commands>' |
| ' <command id="0x00" name="ChangeToMode">' |
| ' <access invokePrivilege="operate"/>' |
| ' <disallowConform/>' |
| ' </command>' |
| ' <command id="0x01" name="ChangeToModeResponse">' |
| ' <access invokePrivilege="operate"/>' |
| ' <disallowConform/>' |
| ' </command>' |
| ' </commands>' |
| '</cluster>' |
| ) |
| |
| CLUSTER_WITH_UNKNOWN_COMMAND = ( |
| '<cluster xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="types types.xsd cluster cluster.xsd" id="0xFFFE" name="Test Unknown Command" revision="1">' |
| ' <revisionHistory>' |
| ' <revision revision="1" summary="Initial version"/>' |
| ' </revisionHistory>' |
| ' <clusterIds>' |
| ' <clusterId id="0xFFFE" name="Test Unknown Command"/>' |
| ' </clusterIds>' |
| ' <classification hierarchy="base" role="application" picsCode="BASE" scope="Endpoint"/>' |
| ' <commands>' |
| ' <command id="0x00" name="ChangeToMode" direction="commandToClient">' |
| ' <access invokePrivilege="operate"/>' |
| ' <mandatoryConform/>' |
| ' </command>' |
| ' </commands>' |
| '</cluster>' |
| ) |
| |
| ALIASED_CLUSTERS = ( |
| '<cluster xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="types types.xsd cluster cluster.xsd" id="" name="Test Aliases" revision="1">' |
| ' <revisionHistory>' |
| ' <revision revision="1" summary="Initial version"/>' |
| ' </revisionHistory>' |
| ' <clusterIds>' |
| ' <clusterId id="0xFFFE" name="Test Alias1"/>' |
| ' <clusterId id="0xFFFD" name="Test Alias2"/>' |
| ' </clusterIds>' |
| ' <classification hierarchy="base" role="application" picsCode="BASE" scope="Endpoint"/>' |
| ' <commands>' |
| ' <command id="0x00" name="ChangeToMode" direction="commandToServer">' |
| ' <access invokePrivilege="operate"/>' |
| ' <mandatoryConform/>' |
| ' </command>' |
| ' </commands>' |
| '</cluster>' |
| ) |
| |
| |
| class TestSpecParsingSupport(MatterBaseTest): |
| def setup_class(self): |
| super().setup_class() |
| self.spec_xml_clusters, self.spec_problems = build_xml_clusters() |
| self.all_spec_clusters = set([(id, c.name, c.pics) for id, c in self.spec_xml_clusters.items()]) |
| |
| def test_spec_parsing_access(self): |
| strs = [None, 'view', 'operate', 'manage', 'admin'] |
| for read in strs: |
| for write in strs: |
| xml = single_attribute_cluster_xml(read, write, "true") |
| xml_cluster = parse_cluster(xml) |
| asserts.assert_is_not_none(xml_cluster.attributes, "No attributes found in cluster") |
| asserts.assert_is_not_none(xml_cluster.attribute_map, "No attribute map found in cluster") |
| asserts.assert_equal(len(xml_cluster.attributes), len(GlobalAttributeIds) + 1, "Unexpected number of attributes") |
| asserts.assert_true(ATTRIBUTE_ID in xml_cluster.attributes.keys(), |
| "Did not find test attribute in XmlCluster.attributes") |
| asserts.assert_equal(xml_cluster.attributes[ATTRIBUTE_ID].read_access, |
| get_access_enum_from_string(read), "Unexpected read access") |
| asserts.assert_equal(xml_cluster.attributes[ATTRIBUTE_ID].write_access, |
| get_access_enum_from_string(write), "Unexpected write access") |
| |
| def test_write_optional(self): |
| for write_support in ['true', 'optional']: |
| xml = single_attribute_cluster_xml('view', 'view', write_support) |
| xml_cluster = parse_cluster(xml) |
| asserts.assert_is_not_none(xml_cluster.attributes, "No attributes found in cluster") |
| asserts.assert_is_not_none(xml_cluster.attribute_map, "No attribute map found in cluster") |
| asserts.assert_equal(len(xml_cluster.attributes), len(GlobalAttributeIds) + 1, "Unexpected number of attributes") |
| asserts.assert_true(ATTRIBUTE_ID in xml_cluster.attributes.keys(), |
| "Did not find test attribute in XmlCluster.attributes") |
| asserts.assert_equal(xml_cluster.attributes[ATTRIBUTE_ID].write_optional, |
| write_support == 'optional', "Unexpected write_optional value") |
| |
| def test_derived_clusters(self): |
| clusters: dict[int, XmlCluster] = {} |
| pure_base_clusters: dict[str, XmlCluster] = {} |
| ids_by_name: dict[str, int] = {} |
| problems: list[ProblemNotice] = [] |
| base_cluster_xml = ElementTree.fromstring(BASE_CLUSTER_XML_STR) |
| derived_cluster_xml = ElementTree.fromstring(DERIVED_CLUSTER_XML_STR) |
| expected_global_attrs = [GlobalAttributeIds.FEATURE_MAP_ID, GlobalAttributeIds.ATTRIBUTE_LIST_ID, |
| GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID, GlobalAttributeIds.GENERATED_COMMAND_LIST_ID, GlobalAttributeIds.CLUSTER_REVISION_ID] |
| |
| add_cluster_data_from_xml(base_cluster_xml, clusters, pure_base_clusters, ids_by_name, problems) |
| add_cluster_data_from_xml(derived_cluster_xml, clusters, pure_base_clusters, ids_by_name, problems) |
| |
| asserts.assert_equal(len(clusters), 1, "Unexpected number of clusters") |
| asserts.assert_equal(len(pure_base_clusters), 1, "Unexpected number of pure base clusters") |
| asserts.assert_equal(len(ids_by_name), 1, "Unexpected number of IDs per name") |
| asserts.assert_equal(len(problems), 0, "Unexpected number of problems") |
| asserts.assert_equal(ids_by_name["Test Derived"], 0xFFFF, "Test derived name not added to IDs") |
| |
| asserts.assert_true(0xFFFF in clusters, "Derived ID not found in clusters") |
| asserts.assert_equal(set(clusters[0xFFFF].attributes.keys()), set( |
| [0, 2, 3] + expected_global_attrs), "Unexpected attribute list") |
| asserts.assert_equal(set(clusters[0xFFFF].accepted_commands.keys()), set([]), "Unexpected accepted commands") |
| asserts.assert_equal(set(clusters[0xFFFF].generated_commands.keys()), set([]), "Unexpected generated commands") |
| |
| asserts.assert_true("Test Base" in pure_base_clusters, "Base ID not found in derived clusters") |
| asserts.assert_equal(set(pure_base_clusters["Test Base"].attributes.keys()), set( |
| [0, 1, 2, 3] + expected_global_attrs), "Unexpected attribute list") |
| asserts.assert_equal(set(pure_base_clusters["Test Base"].accepted_commands.keys()), |
| set([0]), "Unexpected accepted commands") |
| asserts.assert_equal(set(pure_base_clusters["Test Base"].generated_commands.keys()), |
| set([1]), "Unexpected generated commands") |
| asserts.assert_equal(str(pure_base_clusters["Test Base"].accepted_commands[0].conformance), |
| "M", "Unexpected conformance on base accepted command") |
| asserts.assert_equal(str(pure_base_clusters["Test Base"].generated_commands[1].conformance), |
| "M", "Unexpected conformance on base generated command") |
| |
| asserts.assert_equal(len(pure_base_clusters["Test Base"].unknown_commands), |
| 0, "Unexpected number of unknown commands in base") |
| asserts.assert_equal(len(clusters[0xFFFF].unknown_commands), 2, "Unexpected number of unknown commands in derived cluster") |
| |
| combine_derived_clusters_with_base(clusters, pure_base_clusters, ids_by_name, problems) |
| # Ensure the base-only attribute (1) was added to the derived cluster |
| asserts.assert_equal(set(clusters[0xFFFF].attributes.keys()), set( |
| [0, 1, 2, 3] + expected_global_attrs), "Unexpected attribute list") |
| # Ensure the conformance overrides from the derived cluster are on the attributes |
| asserts.assert_equal(str(clusters[0xFFFF].attributes[0].conformance), "M", "Unexpected conformance on attribute 0") |
| asserts.assert_equal(str(clusters[0xFFFF].attributes[1].conformance), "M", "Unexpected conformance on attribute 1") |
| asserts.assert_equal(str(clusters[0xFFFF].attributes[2].conformance), "X", "Unexpected conformance on attribute 2") |
| asserts.assert_equal(str(clusters[0xFFFF].attributes[3].conformance), "X", "Unexpected conformance on attribute 3") |
| |
| # Ensure both the accepted and generated command overrides work |
| asserts.assert_true(set(clusters[0xFFFF].accepted_commands.keys()), |
| set([0]), "Unexpected accepted command list after merge") |
| asserts.assert_true(set(clusters[0xFFFF].generated_commands.keys()), set([1]), |
| "Unexpected generated command list after merge") |
| asserts.assert_equal(str(clusters[0xFFFF].accepted_commands[0].conformance), |
| "X", "Unexpected conformance on accepted commands") |
| asserts.assert_equal(str(clusters[0xFFFF].generated_commands[1].conformance), |
| "X", "Unexpected conformance on generated commands") |
| asserts.assert_equal(len(clusters[0xFFFF].unknown_commands), 0, "Unexpected number of unknown commands after merge") |
| |
| def test_missing_command_direction(self): |
| clusters: dict[int, XmlCluster] = {} |
| pure_base_clusters: dict[str, XmlCluster] = {} |
| ids_by_name: dict[str, int] = {} |
| problems: list[ProblemNotice] = [] |
| cluster_xml = ElementTree.fromstring(CLUSTER_WITH_UNKNOWN_COMMAND) |
| |
| add_cluster_data_from_xml(cluster_xml, clusters, pure_base_clusters, ids_by_name, problems) |
| check_clusters_for_unknown_commands(clusters, problems) |
| asserts.assert_equal(len(problems), 1, "Unexpected number of problems found") |
| asserts.assert_equal(problems[0].location.cluster_id, 0xFFFE, "Unexpected problem location (cluster id)") |
| asserts.assert_equal(problems[0].location.command_id, 0, "Unexpected problem location (command id)") |
| |
| def test_aliased_clusters(self): |
| clusters: dict[int, XmlCluster] = {} |
| pure_base_clusters: dict[str, XmlCluster] = {} |
| ids_by_name: dict[str, int] = {} |
| problems: list[ProblemNotice] = [] |
| cluster_xml = ElementTree.fromstring(ALIASED_CLUSTERS) |
| |
| add_cluster_data_from_xml(cluster_xml, clusters, pure_base_clusters, ids_by_name, problems) |
| asserts.assert_equal(len(problems), 0, "Unexpected problem parsing aliased clusters") |
| asserts.assert_equal(len(clusters), 2, "Unexpected number of clusters when parsing aliased cluster set") |
| asserts.assert_equal(len(pure_base_clusters), 0, "Unexpected number of pure base clusters") |
| asserts.assert_equal(len(ids_by_name), 2, "Unexpected number of ids by name") |
| |
| ids = [(id, c.name) for id, c in clusters.items()] |
| asserts.assert_true((0xFFFE, 'Test Alias1') in ids, "Unable to find Test Alias1 cluster in parsed clusters") |
| asserts.assert_true((0xFFFD, 'Test Alias2') in ids, "Unable to find Test Alias2 cluster in parsed clusters") |
| |
| def test_known_aliased_clusters(self): |
| known_aliased_clusters = set([(0x040C, 'Carbon Monoxide Concentration Measurement', 'CMOCONC'), |
| (0x040D, 'Carbon Dioxide Concentration Measurement', 'CDOCONC'), |
| (0x0413, 'Nitrogen Dioxide Concentration Measurement', 'NDOCONC'), |
| (0x0415, 'Ozone Concentration Measurement', 'OZCONC'), |
| # Change to "PM2.5 Concentration Measurement" once https://github.com/csa-data-model/projects/issues/453 is fixed |
| (0x042A, 'PM2', 'PMICONC'), |
| (0x042B, 'Formaldehyde Concentration Measurement', 'FLDCONC'), |
| (0x042C, 'PM1 Concentration Measurement', 'PMHCONC'), |
| (0x042D, 'PM10 Concentration Measurement', 'PMKCONC'), |
| (0x042E, 'Total Volatile Organic Compounds Concentration Measurement', 'TVOCCONC'), |
| (0x042F, 'Radon Concentration Measurement', 'RNCONC'), |
| (0x0071, 'HEPA Filter Monitoring', 'HEPAFREMON'), |
| (0x0072, 'Activated Carbon Filter Monitoring', 'ACFREMON'), |
| (0x0405, 'Relative Humidity Measurement', 'RH')]) |
| |
| missing_clusters = known_aliased_clusters - self.all_spec_clusters |
| asserts.assert_equal(len(missing_clusters), 0, f"Missing aliased clusters from DM XML - {missing_clusters}") |
| |
| def test_known_derived_clusters(self): |
| known_derived_clusters = set([(0x0048, 'Oven Cavity Operational State', 'OVENOPSTATE'), |
| (0x0049, 'Oven Mode', 'OTCCM'), |
| (0x0051, 'Laundry Washer Mode', 'LWM'), |
| (0x0052, 'Refrigerator And Temperature Controlled Cabinet Mode', 'TCCM'), |
| (0x0054, 'RVC Run Mode', 'RVCRUNM'), |
| (0x0055, 'RVC Clean Mode', 'RVCCLEANM'), |
| (0x0057, 'Refrigerator Alarm', 'REFALM'), |
| (0x0059, 'Dishwasher Mode', 'DISHM'), |
| (0x005c, 'Smoke CO Alarm', 'SMOKECO'), |
| (0x005d, 'Dishwasher Alarm', 'DISHALM'), |
| (0x005e, 'Microwave Oven Mode', 'MWOM'), |
| (0x0061, 'RVC Operational State', 'RVCOPSTATE')]) |
| |
| missing_clusters = known_derived_clusters - self.all_spec_clusters |
| asserts.assert_equal(len(missing_clusters), 0, f"Missing aliased clusters from DM XML - {missing_clusters}") |
| for d in known_derived_clusters: |
| asserts.assert_true(self.spec_xml_clusters is not None, "Derived cluster with no base cluster marker") |
| |
| |
| if __name__ == "__main__": |
| default_matter_test_main() |