blob: 1db208fa39f40b874f7fcb77550040ca9d268222 [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Callable
import chip.clusters as Clusters
from basic_composition_support import BasicCompositionTests
from chip.tlv import uint
from conformance_support import ConformanceDecision, conformance_allowed
from global_attribute_ids import GlobalAttributeIds
from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, MatterBaseTest, ProblemNotice,
ProblemSeverity, async_test_body, default_matter_test_main)
from spec_parsing_support import CommandType, build_xml_clusters
class DeviceConformanceTests(BasicCompositionTests):
async def setup_class_helper(self):
await super().setup_class_helper()
self.xml_clusters, self.problems = build_xml_clusters()
def check_conformance(self, ignore_in_progress: bool, is_ci: bool):
problems = []
success = True
def conformance_str(conformance: Callable, feature_map: uint, feature_dict: dict[str, uint]) -> str:
codes = []
for mask, details in feature_dict.items():
if mask & feature_map:
codes.append(details.code)
return f'Conformance: {str(conformance)}, implemented features: {",".join(codes)}'
def record_problem(location, problem, severity):
problems.append(ProblemNotice("IDM-10.2", location, severity, problem, ""))
def record_error(location, problem):
nonlocal success
record_problem(location, problem, ProblemSeverity.ERROR)
success = False
def record_warning(location, problem):
record_problem(location, problem, ProblemSeverity.WARNING)
ignore_attributes: dict[int, list[int]] = {}
ignore_features: dict[int, list[int]] = {}
if ignore_in_progress:
# This is a manually curated list of attributes that are in-progress in the SDK, but have landed in the spec
in_progress_attributes = {Clusters.BasicInformation.id: [0x15, 0x016],
Clusters.PowerSource.id: [0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A]}
ignore_attributes.update(in_progress_attributes)
# The spec currently has an error on the power source features
# This should be removed once https://github.com/CHIP-Specifications/connectedhomeip-spec/pull/7823 lands
in_progress_features = {Clusters.PowerSource.id: [(1 << 2), (1 << 3), (1 << 4), (1 << 5)]}
ignore_features.update(in_progress_features)
if is_ci:
# The network commissioning clusters on the CI select the features on the fly and end up non-conformant
# on these attributes. Production devices should not.
ci_ignore_attributes = {Clusters.NetworkCommissioning.id: [
Clusters.NetworkCommissioning.Attributes.ScanMaxTimeSeconds.attribute_id, Clusters.NetworkCommissioning.Attributes.ConnectMaxTimeSeconds.attribute_id]}
ignore_attributes.update(ci_ignore_attributes)
success = True
allow_provisional = self.user_params.get("allow_provisional", False)
# TODO: automate this once https://github.com/csa-data-model/projects/issues/454 is done.
provisional_cluster_ids = [Clusters.ContentControl.id, Clusters.ScenesManagement.id, Clusters.BallastConfiguration.id,
Clusters.EnergyPreference.id, Clusters.DeviceEnergyManagement.id, Clusters.DeviceEnergyManagementMode.id, Clusters.PulseWidthModulation.id,
Clusters.ProxyConfiguration.id, Clusters.ProxyDiscovery.id, Clusters.ProxyValid.id]
for endpoint_id, endpoint in self.endpoints_tlv.items():
for cluster_id, cluster in endpoint.items():
cluster_location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id)
if cluster_id not in self.xml_clusters.keys():
if (cluster_id & 0xFFFF_0000) != 0:
# manufacturer cluster
continue
# TODO: update this from a warning once we have all the data
record_warning(location=cluster_location,
problem='Standard cluster found on device, but is not present in spec data')
continue
if not allow_provisional and cluster_id in provisional_cluster_ids:
record_error(location=cluster_location, problem='Provisional cluster found on device')
feature_map = cluster[GlobalAttributeIds.FEATURE_MAP_ID]
attribute_list = cluster[GlobalAttributeIds.ATTRIBUTE_LIST_ID]
all_command_list = cluster[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID] + \
cluster[GlobalAttributeIds.GENERATED_COMMAND_LIST_ID]
# Feature conformance checking
feature_masks = [1 << i for i in range(32) if feature_map & (1 << i)]
for f in feature_masks:
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id,
attribute_id=GlobalAttributeIds.FEATURE_MAP_ID)
if f not in self.xml_clusters[cluster_id].features.keys():
record_error(location=location, problem=f'Unknown feature with mask 0x{f:02x}')
continue
if cluster_id in ignore_features and f in ignore_features[cluster_id]:
continue
xml_feature = self.xml_clusters[cluster_id].features[f]
conformance_decision = xml_feature.conformance(feature_map, attribute_list, all_command_list)
if not conformance_allowed(conformance_decision, allow_provisional):
record_error(location=location, problem=f'Disallowed feature with mask 0x{f:02x}')
for feature_mask, xml_feature in self.xml_clusters[cluster_id].features.items():
if cluster_id in ignore_features and feature_mask in ignore_features[cluster_id]:
continue
conformance_decision = xml_feature.conformance(feature_map, attribute_list, all_command_list)
if conformance_decision == ConformanceDecision.MANDATORY and feature_mask not in feature_masks:
record_error(
location=location, problem=f'Required feature with mask 0x{f:02x} is not present in feature map. {conformance_str(xml_feature.conformance, feature_map, self.xml_clusters[cluster_id].features)}')
# Attribute conformance checking
for attribute_id, attribute in cluster.items():
if cluster_id in ignore_attributes and attribute_id in ignore_attributes[cluster_id]:
continue
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
if attribute_id not in self.xml_clusters[cluster_id].attributes.keys():
# TODO: Consolidate the range checks with IDM-10.1 once that lands
if attribute_id <= 0x4FFF:
# manufacturer attribute
record_error(location=location, problem='Standard attribute found on device, but not in spec')
continue
xml_attribute = self.xml_clusters[cluster_id].attributes[attribute_id]
conformance_decision = xml_attribute.conformance(feature_map, attribute_list, all_command_list)
if not conformance_allowed(conformance_decision, allow_provisional):
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
record_error(
location=location, problem=f'Attribute 0x{attribute_id:02x} is included, but is disallowed by conformance. {conformance_str(xml_attribute.conformance, feature_map, self.xml_clusters[cluster_id].features)}')
for attribute_id, xml_attribute in self.xml_clusters[cluster_id].attributes.items():
if cluster_id in ignore_attributes and attribute_id in ignore_attributes[cluster_id]:
continue
conformance_decision = xml_attribute.conformance(feature_map, attribute_list, all_command_list)
if conformance_decision == ConformanceDecision.MANDATORY and attribute_id not in cluster.keys():
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
record_error(
location=location, problem=f'Attribute 0x{attribute_id:02x} is required, but is not present on the DUT. {conformance_str(xml_attribute.conformance, feature_map, self.xml_clusters[cluster_id].features)}')
def check_spec_conformance_for_commands(command_type: CommandType):
global_attribute_id = GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID if command_type == CommandType.ACCEPTED else GlobalAttributeIds.GENERATED_COMMAND_LIST_ID
xml_commands_dict = self.xml_clusters[cluster_id].accepted_commands if command_type == CommandType.ACCEPTED else self.xml_clusters[cluster_id].generated_commands
command_list = cluster[global_attribute_id]
for command_id in command_list:
location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
if command_id not in xml_commands_dict:
# TODO: Consolidate range checks with IDM-10.1 once that lands
if command_id <= 0xFF:
# manufacturer command
continue
record_error(location=location, problem='Standard command found on device, but not in spec')
continue
xml_command = xml_commands_dict[command_id]
conformance_decision = xml_command.conformance(feature_map, attribute_list, all_command_list)
if not conformance_allowed(conformance_decision, allow_provisional):
record_error(
location=location, problem=f'Command 0x{command_id:02x} is included, but disallowed by conformance. {conformance_str(xml_command.conformance, feature_map, self.xml_clusters[cluster_id].features)}')
for command_id, xml_command in xml_commands_dict.items():
conformance_decision = xml_command.conformance(feature_map, attribute_list, all_command_list)
if conformance_decision == ConformanceDecision.MANDATORY and command_id not in command_list:
location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
record_error(
location=location, problem=f'Command 0x{command_id:02x} is required, but is not present on the DUT. {conformance_str(xml_command.conformance, feature_map, self.xml_clusters[cluster_id].features)}')
# Command conformance checking
check_spec_conformance_for_commands(CommandType.ACCEPTED)
check_spec_conformance_for_commands(CommandType.GENERATED)
# TODO: Add choice checkers
print(f'success = {success}')
return success, problems
def check_revisions(self, ignore_in_progress: bool):
problems = []
success = True
def record_problem(location, problem, severity):
problems.append(ProblemNotice("IDM-10.3", location, severity, problem, ""))
def record_error(location, problem):
nonlocal success
record_problem(location, problem, ProblemSeverity.ERROR)
success = False
def record_warning(location, problem):
record_problem(location, problem, ProblemSeverity.WARNING)
ignore_revisions: list[int] = []
if ignore_in_progress:
# This is a manually curated list of cluster revisions that are in-progress in the SDK, but have landed in the spec
in_progress_revisions = [Clusters.BasicInformation.id, Clusters.PowerSource.id, Clusters.NetworkCommissioning.id]
ignore_revisions.extend(in_progress_revisions)
for endpoint_id, endpoint in self.endpoints_tlv.items():
for cluster_id, cluster in endpoint.items():
if cluster_id not in self.xml_clusters.keys():
if (cluster_id & 0xFFFF_0000) != 0:
# manufacturer cluster
continue
location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id)
# TODO: update this from a warning once we have all the data
record_warning(location=location, problem='Standard cluster found on device, but is not present in spec data')
continue
if cluster_id in ignore_revisions:
continue
if int(self.xml_clusters[cluster_id].revision) != cluster[GlobalAttributeIds.CLUSTER_REVISION_ID]:
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id,
attribute_id=GlobalAttributeIds.CLUSTER_REVISION_ID)
record_error(
location=location, problem=f'Revision found on cluster ({cluster[GlobalAttributeIds.CLUSTER_REVISION_ID]}) does not match revision listed in the spec ({self.xml_clusters[cluster_id].revision})')
return success, problems
class TC_DeviceConformance(MatterBaseTest, DeviceConformanceTests):
@async_test_body
async def setup_class(self):
super().setup_class()
await self.setup_class_helper()
def test_TC_IDM_10_2(self):
ignore_in_progress = self.user_params.get("ignore_in_progress", False)
is_ci = self.check_pics('PICS_SDK_CI_ONLY')
success, problems = self.check_conformance(ignore_in_progress, is_ci)
self.problems.extend(problems)
if not success:
self.fail_current_test("Problems with conformance")
def test_TC_IDM_10_3(self):
ignore_in_progress = self.user_params.get("ignore_in_progress", False)
success, problems = self.check_revisions(ignore_in_progress)
self.problems.extend(problems)
if not success:
self.fail_current_test("Problems with cluster revision on at least one cluster")
if __name__ == "__main__":
default_matter_test_main()