blob: ddf3e421a7f75c85ee52f65120f00311beedb0fa [file] [log] [blame]
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any
import chip.clusters as Clusters
from chip.testing.basic_composition import arls_populated
from chip.testing.conformance import ConformanceDecision
from chip.testing.global_attribute_ids import GlobalAttributeIds
from chip.testing.matter_testing import MatterBaseTest, async_test_body, default_matter_test_main
from chip.testing.spec_parsing import build_xml_clusters, build_xml_device_types
from mobly import asserts
from TC_DeviceConformance import DeviceConformanceTests
def create_onoff_endpoint(endpoint: int) -> dict[int, dict[int, dict[int, Any]]]:
    """Build synthetic attribute data for a single On/Off Light style endpoint.

    Really simple device with one endpoint that includes scenes management,
    which is provisional. Only the global attributes (plus a handful of
    cluster attributes with dummy values) are populated, since the conformance
    test only uses these.

    Args:
        endpoint: Endpoint number used as the top-level key of the result.

    Returns:
        A nested mapping ``{endpoint: {cluster_id: {attribute_id: value}}}``
        holding the Descriptor, Identify, OnOff and ScenesManagement clusters.
    """
    endpoint_tlv = {endpoint: {}}

    on_off_device_type_id = 0x0100
    on_off_device_type_revision = 3
    descriptor_cluster_revision = 3
    identify_cluster_revision = 5
    on_off_cluster_revision = 6
    scenes_cluster_revision = 1

    # Descriptor
    attr = Clusters.Descriptor.Attributes
    attrs = {}
    attrs[attr.FeatureMap.attribute_id] = 0
    attrs[attr.AcceptedCommandList.attribute_id] = []
    attrs[attr.GeneratedCommandList.attribute_id] = []
    attrs[attr.ClusterRevision.attribute_id] = descriptor_cluster_revision
    attrs[attr.DeviceTypeList.attribute_id] = [Clusters.Descriptor.Structs.DeviceTypeStruct(
        deviceType=on_off_device_type_id, revision=on_off_device_type_revision)]
    # NOTE(review): Groups is advertised in ServerList but no Groups cluster data is populated below.
    attrs[attr.ServerList.attribute_id] = [Clusters.Identify.id,
                                           Clusters.Groups.id, Clusters.ScenesManagement.id, Clusters.OnOff.id]
    attrs[attr.ClientList.attribute_id] = []
    attrs[attr.PartsList.attribute_id] = []
    # Insert a placeholder first so that AttributeList's own id is part of the key snapshot.
    attrs[attr.AttributeList.attribute_id] = []
    # list(...) call, not list[...] subscript: the latter would store a typing alias, not the ids.
    attrs[attr.AttributeList.attribute_id] = list(attrs.keys())
    endpoint_tlv[endpoint][Clusters.Descriptor.id] = attrs

    # Identify
    attr = Clusters.Identify.Attributes
    attrs = {}
    attrs[attr.FeatureMap.attribute_id] = 0
    attrs[attr.AcceptedCommandList.attribute_id] = [Clusters.Identify.Commands.Identify.command_id]
    attrs[attr.GeneratedCommandList.attribute_id] = []
    attrs[attr.ClusterRevision.attribute_id] = identify_cluster_revision
    attrs[attr.IdentifyTime.attribute_id] = 0
    attrs[attr.IdentifyType.attribute_id] = Clusters.Identify.Enums.IdentifyTypeEnum.kNone
    attrs[attr.AttributeList.attribute_id] = []
    attrs[attr.AttributeList.attribute_id] = list(attrs.keys())
    endpoint_tlv[endpoint][Clusters.Identify.id] = attrs

    # OnOff
    attr = Clusters.OnOff.Attributes
    attrs = {}
    # device type requires LT feature
    attrs[attr.FeatureMap.attribute_id] = Clusters.OnOff.Bitmaps.Feature.kLighting
    cmd = Clusters.OnOff.Commands
    attrs[attr.AcceptedCommandList.attribute_id] = [cmd.Off.command_id, cmd.On.command_id, cmd.Toggle.command_id,
                                                    cmd.OffWithEffect.command_id, cmd.OnWithRecallGlobalScene.command_id,
                                                    cmd.OnWithTimedOff.command_id]
    attrs[attr.GeneratedCommandList.attribute_id] = []
    attrs[attr.ClusterRevision.attribute_id] = on_off_cluster_revision
    attrs[attr.OnOff.attribute_id] = False
    attrs[attr.GlobalSceneControl.attribute_id] = False
    attrs[attr.OnTime.attribute_id] = 0
    attrs[attr.OffWaitTime.attribute_id] = 0
    attrs[attr.StartUpOnOff.attribute_id] = Clusters.OnOff.Enums.StartUpOnOffEnum.kOff
    attrs[attr.AttributeList.attribute_id] = []
    attrs[attr.AttributeList.attribute_id] = list(attrs.keys())
    endpoint_tlv[endpoint][Clusters.OnOff.id] = attrs

    # Scenes
    attr = Clusters.ScenesManagement.Attributes
    attrs = {}
    attrs[attr.FeatureMap.attribute_id] = 0
    cmd = Clusters.ScenesManagement.Commands
    attrs[attr.AcceptedCommandList.attribute_id] = [cmd.AddScene.command_id, cmd.ViewScene.command_id,
                                                    cmd.RemoveScene.command_id, cmd.RemoveAllScenes.command_id,
                                                    cmd.StoreScene.command_id, cmd.RecallScene.command_id,
                                                    cmd.GetSceneMembership.command_id]
    attrs[attr.GeneratedCommandList.attribute_id] = [cmd.AddSceneResponse.command_id, cmd.ViewSceneResponse.command_id,
                                                     cmd.RemoveSceneResponse.command_id,
                                                     cmd.RemoveAllScenesResponse.command_id,
                                                     cmd.StoreSceneResponse.command_id,
                                                     cmd.GetSceneMembershipResponse.command_id]
    attrs[attr.ClusterRevision.attribute_id] = scenes_cluster_revision
    attrs[attr.SceneTableSize.attribute_id] = 16
    attrs[attr.FabricSceneInfo.attribute_id] = []
    attrs[attr.AttributeList.attribute_id] = []
    attrs[attr.AttributeList.attribute_id] = list(attrs.keys())
    endpoint_tlv[endpoint][Clusters.ScenesManagement.id] = attrs

    return endpoint_tlv
def is_mandatory(conformance):
    """Tell whether a conformance callable resolves to MANDATORY on a blank baseline.

    The callable is invoked as ``conformance(0, [], [])`` (presumably feature
    map, attribute list and command list - confirm against the conformance
    module) and the ``decision`` of the result is compared to
    ``ConformanceDecision.MANDATORY``.

    Args:
        conformance: Callable accepting three positional arguments and
            returning an object with a ``decision`` attribute.

    Returns:
        True when the decision equals ``ConformanceDecision.MANDATORY``.
    """
    baseline_result = conformance(0, [], [])
    return baseline_result.decision == ConformanceDecision.MANDATORY
class TestConformanceSupport(MatterBaseTest, DeviceConformanceTests):
    """Tests for the device conformance support code, driven by synthetic endpoint data.

    Each test assigns hand-built attribute data to ``self.endpoints_tlv`` and
    then runs the checks inherited from ``DeviceConformanceTests`` or the
    ``arls_populated`` helper against it.
    """

    def setup_class(self):
        """Load the cluster and device type definitions once and collect any parse problems."""
        self.xml_clusters, self.problems = build_xml_clusters()
        self.xml_device_types, problems = build_xml_device_types()
        self.problems.extend(problems)

    @async_test_body
    async def test_provisional_cluster(self):
        """Check that a provisional cluster fails conformance unless allow_provisional is set."""
        scenes_definition = self.xml_clusters[Clusters.ScenesManagement.id]
        # The cluster definitions are shared by every test in the class, so remember the
        # loaded value and put it back even when an assertion below fails.
        loaded_provisional_flag = scenes_definition.is_provisional
        try:
            # NOTE: I'm actually FORCING scenes to provisional in this test because it will not be provisional
            # forever.
            scenes_definition.is_provisional = True
            self.endpoints_tlv = create_onoff_endpoint(1)

            # The CI flag here is to deal with example code that improperly implements the network commissioning cluster.
            # It does not apply here.
            success, _ = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=False)
            asserts.assert_false(success, "Unexpected success parsing endpoint with provisional cluster")

            success, _ = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=True)
            asserts.assert_true(success, "Unexpected failure parsing endpoint with provisional cluster and allow_provisional enabled")

            scenes_definition.is_provisional = False
            success, _ = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=False)
            asserts.assert_true(success, "Unexpected failure parsing endpoint with no clusters marked as provisional")
        finally:
            scenes_definition.is_provisional = loaded_provisional_flag

    def _create_minimal_cluster(self, cluster_id: int) -> dict[int, Any]:
        """Build attribute data for a cluster holding only its mandatory elements.

        Args:
            cluster_id: Key into ``self.xml_clusters`` of the cluster to build.

        Returns:
            Mapping of attribute id to value. Mandatory attributes get the dummy
            value 0; the global list attributes and the cluster revision are
            filled in from the cluster definition.
        """
        xml_cluster = self.xml_clusters[cluster_id]

        attrs = {}
        attrs[GlobalAttributeIds.FEATURE_MAP_ID] = 0

        mandatory_attributes = [attribute_id for attribute_id, attribute in xml_cluster.attributes.items()
                                if is_mandatory(attribute.conformance)]
        for m in mandatory_attributes:
            # dummy versions - we're not using the values in this test
            attrs[m] = 0
        # Assigned after the dummy fill so the real global values win over any placeholder 0.
        attrs[GlobalAttributeIds.ATTRIBUTE_LIST_ID] = mandatory_attributes

        mandatory_accepted_commands = [command_id for command_id, command in xml_cluster.accepted_commands.items()
                                       if is_mandatory(command.conformance)]
        attrs[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID] = mandatory_accepted_commands

        mandatory_generated_commands = [command_id for command_id, command in xml_cluster.generated_commands.items()
                                        if is_mandatory(command.conformance)]
        attrs[GlobalAttributeIds.GENERATED_COMMAND_LIST_ID] = mandatory_generated_commands

        attrs[GlobalAttributeIds.CLUSTER_REVISION_ID] = xml_cluster.revision
        return attrs

    def _create_minimal_dt(self, device_type_id: int) -> dict[int, dict[int, Any]]:
        """Create the internals of an endpoint_tlv with the minimal set of clusters.

        Each cluster carries the minimal set of attributes and commands. Global
        attributes only. Does NOT take into account overrides yet.

        Args:
            device_type_id: Key into ``self.xml_device_types`` of the device type to build.

        Returns:
            Mapping of cluster id to attribute data for every mandatory server
            cluster of the device type, plus a Descriptor cluster describing it.
        """
        device_type = self.xml_device_types[device_type_id]

        endpoint_tlv = {}
        required_servers = [cluster_id for cluster_id, cluster in device_type.server_clusters.items()
                            if is_mandatory(cluster.conformance)]
        required_clients = [cluster_id for cluster_id, cluster in device_type.client_clusters.items()
                            if is_mandatory(cluster.conformance)]
        device_type_revision = device_type.revision

        for s in required_servers:
            endpoint_tlv[s] = self._create_minimal_cluster(s)

        # Descriptor
        attr = Clusters.Descriptor.Attributes
        attrs = {}
        attrs[attr.FeatureMap.attribute_id] = 0
        attrs[attr.AcceptedCommandList.attribute_id] = []
        attrs[attr.GeneratedCommandList.attribute_id] = []
        attrs[attr.ClusterRevision.attribute_id] = self.xml_clusters[Clusters.Descriptor.id].revision
        attrs[attr.DeviceTypeList.attribute_id] = [
            Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=device_type_id, revision=device_type_revision)]
        attrs[attr.ServerList.attribute_id] = required_servers
        attrs[attr.ClientList.attribute_id] = required_clients
        attrs[attr.PartsList.attribute_id] = []
        # Placeholder first so AttributeList's own id is included in the key snapshot.
        attrs[attr.AttributeList.attribute_id] = []
        attrs[attr.AttributeList.attribute_id] = list(attrs.keys())
        endpoint_tlv[Clusters.Descriptor.id] = attrs

        return endpoint_tlv

    def add_macl(self, root_endpoint: dict[int, dict[int, Any]], populate_arl: bool = False,
                 populate_commissioning_arl: bool = False) -> None:
        """Turn on the managed-device feature of the Access Control cluster in place.

        The ARL and CommissioningARL attributes are reset to empty on every call
        and then optionally populated with a single entry. The method may be
        called repeatedly on the same endpoint: ids are only added to the
        AttributeList and command lists when they are not already present.

        Args:
            root_endpoint: Cluster data of the endpoint; must already contain the
                Access Control cluster with its global list attributes.
            populate_arl: When True, the ARL attribute gets one entry.
            populate_commissioning_arl: When True, the CommissioningARL attribute
                gets one entry.
        """
        ac = Clusters.AccessControl
        access_control = root_endpoint[ac.id]

        def add_missing(list_attribute_id, new_ids):
            # Repeated add_macl calls must not leave duplicate ids in the global lists.
            current = access_control[list_attribute_id]
            for new_id in new_ids:
                if new_id not in current:
                    current.append(new_id)

        access_control[ac.Attributes.FeatureMap.attribute_id] = ac.Bitmaps.Feature.kManagedDevice
        access_control[ac.Attributes.Arl.attribute_id] = []
        access_control[ac.Attributes.CommissioningARL.attribute_id] = []
        add_missing(ac.Attributes.AttributeList.attribute_id,
                    [ac.Attributes.Arl.attribute_id, ac.Attributes.CommissioningARL.attribute_id])
        add_missing(ac.Attributes.AcceptedCommandList.attribute_id, [ac.Commands.ReviewFabricRestrictions.command_id])
        add_missing(ac.Attributes.GeneratedCommandList.attribute_id,
                    [ac.Commands.ReviewFabricRestrictionsResponse.command_id])

        generic_restriction = ac.Structs.AccessRestrictionStruct(
            type=ac.Enums.AccessRestrictionTypeEnum.kAttributeAccessForbidden, id=1)
        # The restrictions field is a list of restriction structs, so wrap the single entry.
        # NOTE(review): the same commissioning entry struct is reused for the ARL attribute;
        # confirm whether the fabric-scoped entry struct should be used there instead.
        entry = ac.Structs.CommissioningAccessRestrictionEntryStruct(
            endpoint=1, cluster=2, restrictions=[generic_restriction])
        if populate_arl:
            access_control[ac.Attributes.Arl.attribute_id] = [entry]
        if populate_commissioning_arl:
            access_control[ac.Attributes.CommissioningARL.attribute_id] = [entry]

    @async_test_body
    async def test_macl_handling(self):
        """Check that the managed-device feature passes conformance only alongside a NIM endpoint."""
        nim_id = self._get_device_type_id('network infrastructure manager')
        root_node_id = self._get_device_type_id('root node')
        on_off_id = self._get_device_type_id('On/Off Light')

        root = self._create_minimal_dt(device_type_id=root_node_id)
        nim = self._create_minimal_dt(device_type_id=nim_id)
        self.endpoints_tlv = {0: root, 1: nim}
        asserts.assert_true(self._has_device_type_supporting_macl(), "Did not find supported device in generated device")

        success, problems = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=True)
        self.problems.extend(problems)
        asserts.assert_true(success, "Unexpected failure parsing minimal dt")

        self.add_macl(root)
        # A MACL is allowed when there is a NIM, so this should succeed as well
        success, problems = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=True)
        self.problems.extend(problems)
        asserts.assert_true(success, "Unexpected failure with NIM and MACL")

        # A MACL is not allowed when there is no NIM
        self.endpoints_tlv[1] = self._create_minimal_dt(device_type_id=on_off_id)
        success, problems = self.check_conformance(ignore_in_progress=False, is_ci=False, allow_provisional=True)
        self.problems.extend(problems)
        asserts.assert_false(success, "Unexpected success with On/Off and MACL")

        # TODO: what happens if there is a NIM and a non-NIM endpoint?

    @async_test_body
    async def test_macl_restrictions(self):
        """Check that arls_populated reports the ARL and CommissioningARL attributes independently."""
        nim_id = self._get_device_type_id('network infrastructure manager')
        root_node_id = self._get_device_type_id('root node')

        root = self._create_minimal_dt(device_type_id=root_node_id)
        nim = self._create_minimal_dt(device_type_id=nim_id)
        self.endpoints_tlv = {0: root, 1: nim}

        # device with no macl
        arl_data = arls_populated(self.endpoints_tlv)
        asserts.assert_false(arl_data.have_arl, "Unexpected ARL found")
        asserts.assert_false(arl_data.have_carl, "Unexpected CommissioningARL found")

        # device with unpopulated macl
        self.add_macl(root)
        arl_data = arls_populated(self.endpoints_tlv)
        asserts.assert_false(arl_data.have_arl, "Unexpected ARL found")
        asserts.assert_false(arl_data.have_carl, "Unexpected CommissioningARL found")

        # device with populated ARL
        self.add_macl(root, populate_arl=True)
        arl_data = arls_populated(self.endpoints_tlv)
        asserts.assert_true(arl_data.have_arl, "Did not find expected ARL")
        asserts.assert_false(arl_data.have_carl, "Unexpected CommissioningARL found")

        # device with populated commissioning ARL
        self.add_macl(root, populate_commissioning_arl=True)
        arl_data = arls_populated(self.endpoints_tlv)
        asserts.assert_false(arl_data.have_arl, "Unexpected ARL found")
        asserts.assert_true(arl_data.have_carl, "Did not find expected Commissioning ARL")

        # device with both
        self.add_macl(root, populate_arl=True, populate_commissioning_arl=True)
        arl_data = arls_populated(self.endpoints_tlv)
        asserts.assert_true(arl_data.have_arl, "Did not find expected ARL")
        asserts.assert_true(arl_data.have_carl, "Did not find expected Commissioning ARL")
# When executed as a script (not imported), hand control to the Matter test
# framework entry point imported from chip.testing.matter_testing.
if __name__ == "__main__":
    default_matter_test_main()