# blob: 6bd345cb72b10918473c73c5ee332da47be8ab2b (repository-viewer header, kept as a comment)
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from enum import StrEnum
from typing import Any
import chip.clusters as Clusters
from chip.testing.basic_composition import arls_populated
from chip.testing.matter_testing import MatterBaseTest
from chip.testing.problem_notices import AttributePathLocation, CommandPathLocation, ProblemLocation
from chip.testing.runner import default_matter_test_main
from chip.testing.spec_parsing import PrebuiltDataModelDirectory, build_xml_clusters, build_xml_device_types
from fake_device_builder import create_minimal_cluster, create_minimal_dt
from mobly import asserts
from TC_DeviceConformance import DeviceConformanceTests
def create_onoff_endpoint(endpoint: int) -> dict[int, dict[int, dict[int, Any]]]:
    """Build fake TLV-style data for a single On/Off Light endpoint.

    The endpoint carries the Descriptor, Identify, OnOff and ScenesManagement
    clusters. It is intentionally minimal: only the attributes set below are
    present, which is what the conformance tests in this file consume.

    Args:
        endpoint: Endpoint number used as the top-level key of the result.

    Returns:
        A nested dict shaped ``{endpoint: {cluster_id: {attribute_id: value}}}``.
    """
    on_off_device_type_id = 0x0100
    on_off_device_type_revision = 3
    descriptor_cluster_revision = 3
    identify_cluster_revision = 5
    on_off_cluster_revision = 6
    scenes_cluster_revision = 1

    def finalize(cluster_attrs: dict[int, Any], attribute_list_id: int) -> dict[int, Any]:
        # Insert a placeholder first so the AttributeList also lists itself,
        # then replace it with a real list of every populated attribute ID.
        # (Must be list(...), a call - list[...] would only create a type alias.)
        cluster_attrs[attribute_list_id] = []
        cluster_attrs[attribute_list_id] = list(cluster_attrs.keys())
        return cluster_attrs

    clusters_tlv: dict[int, dict[int, Any]] = {}

    # Descriptor
    attr = Clusters.Descriptor.Attributes
    attrs = {}
    attrs[attr.FeatureMap.attribute_id] = 0
    attrs[attr.AcceptedCommandList.attribute_id] = []
    attrs[attr.GeneratedCommandList.attribute_id] = []
    attrs[attr.ClusterRevision.attribute_id] = descriptor_cluster_revision
    attrs[attr.DeviceTypeList.attribute_id] = [Clusters.Descriptor.Structs.DeviceTypeStruct(
        deviceType=on_off_device_type_id, revision=on_off_device_type_revision)]
    # NOTE: Groups is advertised in the ServerList, but no Groups cluster data is populated below.
    attrs[attr.ServerList.attribute_id] = [Clusters.Identify.id,
                                           Clusters.Groups.id, Clusters.ScenesManagement.id, Clusters.OnOff.id]
    attrs[attr.ClientList.attribute_id] = []
    attrs[attr.PartsList.attribute_id] = []
    clusters_tlv[Clusters.Descriptor.id] = finalize(attrs, attr.AttributeList.attribute_id)

    # Identify
    attr = Clusters.Identify.Attributes
    attrs = {}
    attrs[attr.FeatureMap.attribute_id] = 0
    attrs[attr.AcceptedCommandList.attribute_id] = [Clusters.Identify.Commands.Identify.command_id]
    attrs[attr.GeneratedCommandList.attribute_id] = []
    attrs[attr.ClusterRevision.attribute_id] = identify_cluster_revision
    attrs[attr.IdentifyTime.attribute_id] = 0
    attrs[attr.IdentifyType.attribute_id] = Clusters.Identify.Enums.IdentifyTypeEnum.kNone
    clusters_tlv[Clusters.Identify.id] = finalize(attrs, attr.AttributeList.attribute_id)

    # OnOff
    attr = Clusters.OnOff.Attributes
    attrs = {}
    # device type requires LT feature
    attrs[attr.FeatureMap.attribute_id] = Clusters.OnOff.Bitmaps.Feature.kLighting
    cmd = Clusters.OnOff.Commands
    attrs[attr.AcceptedCommandList.attribute_id] = [cmd.Off.command_id, cmd.On.command_id, cmd.Toggle.command_id,
                                                    cmd.OffWithEffect.command_id, cmd.OnWithRecallGlobalScene.command_id,
                                                    cmd.OnWithTimedOff.command_id]
    attrs[attr.GeneratedCommandList.attribute_id] = []
    attrs[attr.ClusterRevision.attribute_id] = on_off_cluster_revision
    attrs[attr.OnOff.attribute_id] = False
    attrs[attr.GlobalSceneControl.attribute_id] = False
    attrs[attr.OnTime.attribute_id] = 0
    attrs[attr.OffWaitTime.attribute_id] = 0
    attrs[attr.StartUpOnOff.attribute_id] = Clusters.OnOff.Enums.StartUpOnOffEnum.kOff
    clusters_tlv[Clusters.OnOff.id] = finalize(attrs, attr.AttributeList.attribute_id)

    # Scenes
    attr = Clusters.ScenesManagement.Attributes
    attrs = {}
    attrs[attr.FeatureMap.attribute_id] = 0
    cmd = Clusters.ScenesManagement.Commands
    attrs[attr.AcceptedCommandList.attribute_id] = [cmd.AddScene.command_id, cmd.ViewScene.command_id,
                                                    cmd.RemoveScene.command_id, cmd.RemoveAllScenes.command_id,
                                                    cmd.StoreScene.command_id, cmd.RecallScene.command_id,
                                                    cmd.GetSceneMembership.command_id]
    attrs[attr.GeneratedCommandList.attribute_id] = [cmd.AddSceneResponse.command_id, cmd.ViewSceneResponse.command_id,
                                                     cmd.RemoveSceneResponse.command_id,
                                                     cmd.RemoveAllScenesResponse.command_id,
                                                     cmd.StoreSceneResponse.command_id,
                                                     cmd.GetSceneMembershipResponse.command_id]
    attrs[attr.ClusterRevision.attribute_id] = scenes_cluster_revision
    attrs[attr.SceneTableSize.attribute_id] = 16
    attrs[attr.FabricSceneInfo.attribute_id] = []
    clusters_tlv[Clusters.ScenesManagement.id] = finalize(attrs, attr.AttributeList.attribute_id)

    return {endpoint: clusters_tlv}
class TestConformanceSupport(MatterBaseTest, DeviceConformanceTests):
    """Self-tests for the inherited conformance checks, run against generated endpoint data.

    Every test builds fake endpoint data (``self.endpoints_tlv`` and, where needed,
    ``self.endpoints``) and then calls the inherited ``check_conformance`` or the
    ``arls_populated`` helper on it. No live device is involved in the code below.
    """

    def setup_class(self):
        """Load cluster and device type definitions from the prebuilt 1.4 data model."""
        # Latest fully qualified version
        # TODO: It might be good to find a way to run this against each directory.
        self.xml_clusters, self.problems = build_xml_clusters(PrebuiltDataModelDirectory.k1_4)
        self.xml_device_types, problems = build_xml_device_types(PrebuiltDataModelDirectory.k1_4)
        self.problems.extend(problems)

    def test_provisional_cluster(self):
        """Check that a provisional cluster fails conformance unless allow_provisional is set."""
        scenes = self.xml_clusters[Clusters.ScenesManagement.id]
        # self.xml_clusters is shared by every test in the class, so remember the real
        # value and put it back even if one of the assertions below fails.
        original_is_provisional = scenes.is_provisional
        try:
            # NOTE: I'm actually FORCING scenes to provisional in this test because it will not be provisional
            # forever.
            scenes.is_provisional = True

            self.endpoints_tlv = create_onoff_endpoint(1)

            # The CI flag here is to deal with example code that improperly implements the network commissioning cluster.
            # It does not apply here.
            success, _ = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=False)
            asserts.assert_false(success, "Unexpected success parsing endpoint with provisional cluster")

            success, _ = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=True)
            asserts.assert_true(success, "Unexpected failure parsing endpoint with provisional cluster and allow_provisional enabled")

            scenes.is_provisional = False
            success, _ = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=False)
            asserts.assert_true(success, "Unexpected failure parsing endpoint with no clusters marked as provisional")
        finally:
            scenes.is_provisional = original_is_provisional

    def add_macl(self, root_endpoint: dict[int, dict[int, Any]], populate_arl: bool = False, populate_commissioning_arl: bool = False):
        """Enable the Managed Device feature on the Access Control cluster of an endpoint.

        The endpoint data is modified in place: the feature map is set to
        ``kManagedDevice``, the ARL and CommissioningARL attributes are reset to empty
        lists, and the related attribute / command IDs are added to the cluster's
        AttributeList, AcceptedCommandList and GeneratedCommandList. The helper may be
        called repeatedly on the same endpoint without creating duplicate list entries.

        Args:
            root_endpoint: Endpoint data keyed by cluster ID, then attribute ID. It must
                already contain the Access Control cluster and its three list attributes.
            populate_arl: When True, the ARL attribute gets a single restriction entry.
            populate_commissioning_arl: When True, the CommissioningARL attribute gets a
                single restriction entry.
        """
        ac = Clusters.AccessControl
        attributes = ac.Attributes
        cluster = root_endpoint[ac.id]

        cluster[attributes.FeatureMap.attribute_id] = ac.Bitmaps.Feature.kManagedDevice
        cluster[attributes.Arl.attribute_id] = []
        cluster[attributes.CommissioningARL.attribute_id] = []

        # Tests call this helper several times on the same endpoint, so only add the
        # IDs that are not already listed (the lists are extended in place).
        list_additions = (
            (attributes.AttributeList.attribute_id,
             (attributes.Arl.attribute_id, attributes.CommissioningARL.attribute_id)),
            (attributes.AcceptedCommandList.attribute_id,
             (ac.Commands.ReviewFabricRestrictions.command_id,)),
            (attributes.GeneratedCommandList.attribute_id,
             (ac.Commands.ReviewFabricRestrictionsResponse.command_id,)),
        )
        for list_attribute_id, new_ids in list_additions:
            current_ids = cluster[list_attribute_id]
            for new_id in new_ids:
                if new_id not in current_ids:
                    current_ids.append(new_id)

        generic_restriction = ac.Structs.AccessRestrictionStruct(
            type=ac.Enums.AccessRestrictionTypeEnum.kAttributeAccessForbidden, id=1)
        # The restrictions field is a list of restriction structs.
        entry = ac.Structs.CommissioningAccessRestrictionEntryStruct(endpoint=1, cluster=2, restrictions=[generic_restriction])
        if populate_arl:
            root_endpoint[ac.id][attributes.Arl.attribute_id] = [entry]
        if populate_commissioning_arl:
            root_endpoint[ac.id][attributes.CommissioningARL.attribute_id] = [entry]

    def test_macl_handling(self):
        """Check that a MACL passes conformance alongside a NIM and fails without one."""
        nim_id = self._get_device_type_id('network infrastructure manager')
        root_node_id = self._get_device_type_id('root node')
        on_off_id = self._get_device_type_id('On/Off Light')

        root = create_minimal_dt(self.xml_clusters, self.xml_device_types, device_type_id=root_node_id)
        nim = create_minimal_dt(self.xml_clusters, self.xml_device_types, device_type_id=nim_id)
        self.endpoints_tlv = {0: root, 1: nim}

        root_no_tlv = create_minimal_dt(self.xml_clusters, self.xml_device_types,
                                        device_type_id=root_node_id, is_tlv_endpoint=False)
        nim_no_tlv = create_minimal_dt(self.xml_clusters, self.xml_device_types, device_type_id=nim_id, is_tlv_endpoint=False)
        self.endpoints = {0: root_no_tlv, 1: nim_no_tlv}
        asserts.assert_true(self._has_device_type_supporting_macl(), "Did not find supported device in generated device")

        success, problems = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=True)
        self.problems.extend(problems)
        asserts.assert_true(success, "Unexpected failure parsing minimal dt")

        self.add_macl(root)
        # A MACL is allowed when there is a NIM, so this should succeed as well
        success, problems = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=True)
        self.problems.extend(problems)
        asserts.assert_true(success, "Unexpected failure with NIM and MACL")

        # A MACL is not allowed when there is no NIM
        # (only the non-TLV view of endpoint 1 is swapped to an On/Off Light here)
        self.endpoints[1] = create_minimal_dt(self.xml_clusters, self.xml_device_types,
                                              device_type_id=on_off_id, is_tlv_endpoint=False)
        success, problems = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=True)
        self.problems.extend(problems)
        asserts.assert_false(success, "Unexpected success with On/Off and MACL")

        # TODO: what happens if there is a NIM and a non-NIM endpoint?

    def test_macl_restrictions(self):
        """Check that arls_populated reports populated ARL / CommissioningARL attributes correctly."""
        nim_id = self._get_device_type_id('network infrastructure manager')
        root_node_id = self._get_device_type_id('root node')

        root = create_minimal_dt(self.xml_clusters, self.xml_device_types, device_type_id=root_node_id)
        nim = create_minimal_dt(self.xml_clusters, self.xml_device_types, device_type_id=nim_id)
        self.endpoints_tlv = {0: root, 1: nim}

        # device with no macl
        arl_data = arls_populated(self.endpoints_tlv)
        asserts.assert_false(arl_data.have_arl, "Unexpected ARL found")
        asserts.assert_false(arl_data.have_carl, "Unexpected CommissioningARL found")

        # device with unpopulated macl
        self.add_macl(root)
        arl_data = arls_populated(self.endpoints_tlv)
        asserts.assert_false(arl_data.have_arl, "Unexpected ARL found")
        asserts.assert_false(arl_data.have_carl, "Unexpected CommissioningARL found")

        # device with populated ARL
        self.add_macl(root, populate_arl=True)
        arl_data = arls_populated(self.endpoints_tlv)
        asserts.assert_true(arl_data.have_arl, "Did not find expected ARL")
        asserts.assert_false(arl_data.have_carl, "Unexpected CommissioningARL found")

        # device with populated commissioning ARL
        self.add_macl(root, populate_commissioning_arl=True)
        arl_data = arls_populated(self.endpoints_tlv)
        asserts.assert_false(arl_data.have_arl, "Unexpected ARL found")
        asserts.assert_true(arl_data.have_carl, "Did not find expected Commissioning ARL")

        # device with both
        self.add_macl(root, populate_arl=True, populate_commissioning_arl=True)
        arl_data = arls_populated(self.endpoints_tlv)
        asserts.assert_true(arl_data.have_arl, "Did not find expected ARL")
        asserts.assert_true(arl_data.have_carl, "Did not find expected Commissioning ARL")

    def test_error_locations(self):
        """Check that each kind of conformance failure is reported at the expected location.

        Starts from a device that passes conformance (root node, On/Off Light and
        Energy EVSE endpoints), then introduces one fault at a time (feature,
        attribute or command) and asserts that a problem is reported at the matching
        attribute or command path with the expected wording. Most faults are undone
        before the next one is introduced.
        """
        root_node_id = self._get_device_type_id('root node')
        on_off_id = self._get_device_type_id('On/Off Light')
        eevse_id = self._get_device_type_id('Energy EVSE')

        root = create_minimal_dt(self.xml_clusters, self.xml_device_types, device_type_id=root_node_id)
        on_off = create_minimal_dt(self.xml_clusters, self.xml_device_types, device_type_id=on_off_id)
        eevse = create_minimal_dt(self.xml_clusters, self.xml_device_types, device_type_id=eevse_id)
        self.endpoints_tlv = {0: root, 1: on_off, 2: eevse}

        # Baseline: the unmodified device must pass with no problems reported
        success, problems = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=False)
        for p in problems:
            logging.info(p)
        asserts.assert_true(success, "Unexpected failure on minimal on/off device")
        asserts.assert_equal(len(problems), 0, "Unexpected problems reported for on/off device type")

        class ProblemType(StrEnum):
            # Values are the lower-case text expected inside the reported problem string
            kIncludesDisallowed = "disallowed"
            kMandatoryNotPresent = "not present"
            kUnknown = "unknown"
            kChoice = "choice"

        def run_check_with_expected_failure(msg_suffix: str, expected_location: ProblemLocation, problem_type: ProblemType):
            # Runs conformance on the current self.endpoints_tlv and requires a failure whose
            # problems include expected_location; every problem at that location must mention
            # the problem_type text.
            success, problems = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=False)
            asserts.assert_false(success, f"Unexpected success on minimal on/off device {msg_suffix}")
            logging.info("Problems reported (expect at least 1)")
            for p in problems:
                logging.info(p)
            asserts.assert_greater_equal(
                len(problems), 1, "Did not receive expected number of problem reports for on/off device type (expected at least 1)")
            locations = [p.location for p in problems]
            asserts.assert_in(expected_location, locations, "Did not get expected problem location")
            for p in problems:
                if p.location == expected_location:
                    asserts.assert_in(str(problem_type), p.problem.lower(), "Did not find expected problem notice")

        # Disallowed feature - CacheAndSync is provisional
        msg_suffix = "with disallowed feature"
        cluster_id = Clusters.GroupKeyManagement.id
        feature_map_id = Clusters.GroupKeyManagement.Attributes.FeatureMap.attribute_id
        expected_location = AttributePathLocation(endpoint_id=0, cluster_id=cluster_id, attribute_id=feature_map_id)
        self.endpoints_tlv[0][cluster_id][feature_map_id] = Clusters.GroupKeyManagement.Bitmaps.Feature.kCacheAndSync
        run_check_with_expected_failure(msg_suffix, expected_location, ProblemType.kIncludesDisallowed)

        # Includes unknown feature
        msg_suffix = "with unknown feature"
        cluster_id = Clusters.GroupKeyManagement.id
        feature_map_id = Clusters.GroupKeyManagement.Attributes.FeatureMap.attribute_id
        expected_location = AttributePathLocation(endpoint_id=0, cluster_id=cluster_id, attribute_id=feature_map_id)
        self.endpoints_tlv[0][cluster_id][feature_map_id] = 2
        run_check_with_expected_failure(msg_suffix, expected_location, ProblemType.kUnknown)
        self.endpoints_tlv[0][cluster_id][feature_map_id] = 0

        # Missing mandatory feature
        msg_suffix = "with missing mandatory feature"
        cluster_id = Clusters.EnergyEvse.id
        feature_map_id = Clusters.EnergyEvse.Attributes.FeatureMap.attribute_id
        expected_location = AttributePathLocation(endpoint_id=2, cluster_id=cluster_id, attribute_id=feature_map_id)
        old_feature = self.endpoints_tlv[2][cluster_id][feature_map_id]
        self.endpoints_tlv[2][cluster_id][feature_map_id] = 0
        run_check_with_expected_failure(msg_suffix, expected_location, ProblemType.kMandatoryNotPresent)
        self.endpoints_tlv[2][cluster_id][feature_map_id] = old_feature

        # Add a cluster with choice conformance on the features - Network commissioning
        msg_suffix = "with a network commissioning cluster"
        # Add a network commissioning cluster
        cluster_id = Clusters.NetworkCommissioning.id
        self.endpoints_tlv[0][cluster_id] = create_minimal_cluster(self.xml_clusters, cluster_id)
        # This will have no features populated by default - mark ethernet - this doesn't require any additional attributes or commands
        feature_map_id = Clusters.NetworkCommissioning.Attributes.FeatureMap.attribute_id
        self.endpoints_tlv[0][cluster_id][feature_map_id] = Clusters.NetworkCommissioning.Bitmaps.Feature.kEthernetNetworkInterface
        success, problems = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=False)
        for p in problems:
            logging.info(p)
        asserts.assert_true(success, f"Unexpected failure on device {msg_suffix}")
        asserts.assert_equal(len(problems), 0, f"Unexpected problems reported on device {msg_suffix}")

        # Improper feature choice conformance - more than one
        msg_suffix = "with too many choice conformance features"
        expected_location = AttributePathLocation(endpoint_id=0, cluster_id=cluster_id, attribute_id=feature_map_id)
        old_feature = self.endpoints_tlv[0][cluster_id][feature_map_id]
        self.endpoints_tlv[0][cluster_id][feature_map_id] |= Clusters.NetworkCommissioning.Bitmaps.Feature.kWiFiNetworkInterface
        run_check_with_expected_failure(msg_suffix, expected_location, ProblemType.kChoice)

        # Improper Feature choice conformance - not enough
        msg_suffix = "with no choice conformance features"
        self.endpoints_tlv[0][cluster_id][feature_map_id] = 0
        run_check_with_expected_failure(msg_suffix, expected_location, ProblemType.kChoice)
        # Back to the valid ethernet-only feature map saved above
        self.endpoints_tlv[0][cluster_id][feature_map_id] = old_feature

        # Includes disallowed attribute
        msg_suffix = "with disallowed attribute"
        cluster_id = Clusters.NetworkCommissioning.id
        attr_list_id = Clusters.NetworkCommissioning.Attributes.AttributeList.attribute_id
        # Take a copy: the live list is appended to below, so an alias could not be used to restore it
        old_attributes = list(self.endpoints_tlv[0][cluster_id][attr_list_id])
        scan_attr_id = Clusters.NetworkCommissioning.Attributes.ScanMaxTimeSeconds.attribute_id
        self.endpoints_tlv[0][cluster_id][attr_list_id].append(scan_attr_id)
        self.endpoints_tlv[0][cluster_id][scan_attr_id] = 0
        expected_location = AttributePathLocation(endpoint_id=0, cluster_id=cluster_id, attribute_id=scan_attr_id)
        run_check_with_expected_failure(msg_suffix, expected_location, ProblemType.kIncludesDisallowed)
        self.endpoints_tlv[0][cluster_id][attr_list_id] = old_attributes
        del self.endpoints_tlv[0][cluster_id][scan_attr_id]

        # Missing mandatory attribute
        msg_suffix = "with missing mandatory attribute"
        cluster_id = Clusters.AccessControl.id
        attr_list_id = Clusters.AccessControl.Attributes.AttributeList.attribute_id
        old_attributes = list(self.endpoints_tlv[0][cluster_id][attr_list_id])
        removed_attr_id = old_attributes[0]
        self.endpoints_tlv[0][cluster_id][attr_list_id] = old_attributes[1:]
        # Keep the removed value so the attribute can be put back unchanged afterwards
        removed_value = self.endpoints_tlv[0][cluster_id].pop(removed_attr_id)
        expected_location = AttributePathLocation(endpoint_id=0, cluster_id=cluster_id, attribute_id=removed_attr_id)
        run_check_with_expected_failure(msg_suffix, expected_location, ProblemType.kMandatoryNotPresent)
        self.endpoints_tlv[0][cluster_id][attr_list_id] = old_attributes
        self.endpoints_tlv[0][cluster_id][removed_attr_id] = removed_value

        # Includes disallowed command
        msg_suffix = "with disallowed command"
        cluster_id = Clusters.AdministratorCommissioning.id
        accepted_cmd_id = Clusters.AdministratorCommissioning.Attributes.AcceptedCommandList.attribute_id
        # Copy for the same reason as above: the live list is appended to
        old_cmds = list(self.endpoints_tlv[0][cluster_id][accepted_cmd_id])
        obcw_cmd_id = Clusters.AdministratorCommissioning.Commands.OpenBasicCommissioningWindow.command_id
        self.endpoints_tlv[0][cluster_id][accepted_cmd_id].append(obcw_cmd_id)
        expected_location = CommandPathLocation(endpoint_id=0, cluster_id=cluster_id, command_id=obcw_cmd_id)
        run_check_with_expected_failure(msg_suffix, expected_location, ProblemType.kIncludesDisallowed)
        self.endpoints_tlv[0][cluster_id][accepted_cmd_id] = old_cmds

        # Missing mandatory command
        msg_suffix = "with missing mandatory command"
        cluster_id = Clusters.AdministratorCommissioning.id
        accepted_cmd_id = Clusters.AdministratorCommissioning.Attributes.AcceptedCommandList.attribute_id
        old_cmds = self.endpoints_tlv[0][cluster_id][accepted_cmd_id]
        self.endpoints_tlv[0][cluster_id][accepted_cmd_id] = []
        ocw_cmd_id = Clusters.AdministratorCommissioning.Commands.OpenCommissioningWindow.command_id
        expected_location = CommandPathLocation(endpoint_id=0, cluster_id=cluster_id, command_id=ocw_cmd_id)
        run_check_with_expected_failure(msg_suffix, expected_location, ProblemType.kMandatoryNotPresent)
        self.endpoints_tlv[0][cluster_id][accepted_cmd_id] = old_cmds
# Script entry point: when executed directly, hand control to the Matter test runner.
if __name__ == "__main__":
    default_matter_test_main()