blob: c64a3470d19ed1350997e38e81dc36285bdfc088 [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
# for details about the block below.
#
# === BEGIN CI TEST ARGUMENTS ===
# test-runner-runs: run1
# test-runner-run/run1/app: ${CHIP_LOCK_APP}
# test-runner-run/run1/factoryreset: True
# test-runner-run/run1/quiet: True
# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# test-runner-run/run1/script-args: --storage-path admin_storage.json --manual-code 10054912339 --bool-arg ignore_in_progress:True allow_provisional:True --PICS src/app/tests/suites/certification/ci-pics-values --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --tests test_TC_IDM_10_2
# === END CI TEST ARGUMENTS ===
# TODO: Enable 10.5 in CI once the door lock OTA requestor problem is sorted.
from typing import Callable
import chip.clusters as Clusters
from basic_composition_support import BasicCompositionTests
from chip.tlv import uint
from choice_conformance_support import (evaluate_attribute_choice_conformance, evaluate_command_choice_conformance,
evaluate_feature_choice_conformance)
from conformance_support import ConformanceDecision, conformance_allowed
from global_attribute_ids import (ClusterIdType, DeviceTypeIdType, GlobalAttributeIds, cluster_id_type, device_type_id_type,
is_valid_device_type_id)
from matter_testing_support import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, DeviceTypePathLocation,
MatterBaseTest, ProblemNotice, ProblemSeverity, async_test_body, default_matter_test_main)
from spec_parsing_support import CommandType, build_xml_clusters, build_xml_device_types
class DeviceConformanceTests(BasicCompositionTests):
async def setup_class_helper(self):
await super().setup_class_helper()
self.xml_clusters, self.problems = build_xml_clusters()
self.xml_device_types, problems = build_xml_device_types()
self.problems.extend(problems)
def check_conformance(self, ignore_in_progress: bool, is_ci: bool):
problems = []
success = True
def conformance_str(conformance: Callable, feature_map: uint, feature_dict: dict[str, uint]) -> str:
codes = []
for mask, details in feature_dict.items():
if mask & feature_map:
codes.append(details.code)
return f'Conformance: {str(conformance)}, implemented features: {",".join(codes)}'
def record_problem(location, problem, severity):
problems.append(ProblemNotice("IDM-10.2", location, severity, problem, ""))
def record_error(location, problem):
nonlocal success
record_problem(location, problem, ProblemSeverity.ERROR)
success = False
def record_warning(location, problem):
record_problem(location, problem, ProblemSeverity.WARNING)
ignore_attributes: dict[int, list[int]] = {}
if ignore_in_progress:
# This is a manually curated list of attributes that are in-progress in the SDK, but have landed in the spec
in_progress_attributes = {Clusters.BasicInformation.id: [0x15, 0x016],
Clusters.PowerSource.id: [0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A]}
ignore_attributes.update(in_progress_attributes)
if is_ci:
# The network commissioning clusters on the CI select the features on the fly and end up non-conformant
# on these attributes. Production devices should not.
ci_ignore_attributes = {Clusters.NetworkCommissioning.id: [
Clusters.NetworkCommissioning.Attributes.ScanMaxTimeSeconds.attribute_id, Clusters.NetworkCommissioning.Attributes.ConnectMaxTimeSeconds.attribute_id]}
ignore_attributes.update(ci_ignore_attributes)
success = True
allow_provisional = self.user_params.get("allow_provisional", False)
# TODO: automate this once https://github.com/csa-data-model/projects/issues/454 is done.
provisional_cluster_ids = [Clusters.ContentControl.id, Clusters.ScenesManagement.id, Clusters.BallastConfiguration.id,
Clusters.EnergyPreference.id, Clusters.DeviceEnergyManagement.id, Clusters.DeviceEnergyManagementMode.id, Clusters.PulseWidthModulation.id,
Clusters.ProxyConfiguration.id, Clusters.ProxyDiscovery.id, Clusters.ProxyValid.id]
# TODO: Remove this once the latest 1.3 lands with the clusters removed from the DM XML and change the warning below about missing DM XMLs into a proper error
# These are clusters that weren't part of the 1.3 spec that landed in the SDK before the branch cut
provisional_cluster_ids.extend([Clusters.DemandResponseLoadControl.id])
# These clusters are zigbee only. I don't even know why they're part of the codegen, but we should get rid of them.
provisional_cluster_ids.extend([Clusters.BarrierControl.id, Clusters.OnOffSwitchConfiguration.id,
Clusters.BinaryInputBasic.id, Clusters.ElectricalMeasurement.id])
for endpoint_id, endpoint in self.endpoints_tlv.items():
for cluster_id, cluster in endpoint.items():
cluster_location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id)
if not allow_provisional and cluster_id in provisional_cluster_ids:
record_error(location=cluster_location, problem='Provisional cluster found on device')
continue
if cluster_id not in self.xml_clusters.keys():
if (cluster_id & 0xFFFF_0000) != 0:
# manufacturer cluster
continue
# TODO: update this from a warning once we have all the data
record_warning(location=cluster_location,
problem='Standard cluster found on device, but is not present in spec data')
continue
feature_map = cluster[GlobalAttributeIds.FEATURE_MAP_ID]
attribute_list = cluster[GlobalAttributeIds.ATTRIBUTE_LIST_ID]
all_command_list = cluster[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID] + \
cluster[GlobalAttributeIds.GENERATED_COMMAND_LIST_ID]
# Feature conformance checking
feature_masks = [1 << i for i in range(32) if feature_map & (1 << i)]
for f in feature_masks:
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id,
attribute_id=GlobalAttributeIds.FEATURE_MAP_ID)
if f not in self.xml_clusters[cluster_id].features.keys():
record_error(location=location, problem=f'Unknown feature with mask 0x{f:02x}')
continue
xml_feature = self.xml_clusters[cluster_id].features[f]
conformance_decision_with_choice = xml_feature.conformance(feature_map, attribute_list, all_command_list)
if not conformance_allowed(conformance_decision_with_choice, allow_provisional):
record_error(location=location, problem=f'Disallowed feature with mask 0x{f:02x}')
for feature_mask, xml_feature in self.xml_clusters[cluster_id].features.items():
conformance_decision_with_choice = xml_feature.conformance(feature_map, attribute_list, all_command_list)
if conformance_decision_with_choice.decision == ConformanceDecision.MANDATORY and feature_mask not in feature_masks:
record_error(
location=location, problem=f'Required feature with mask 0x{f:02x} is not present in feature map. {conformance_str(xml_feature.conformance, feature_map, self.xml_clusters[cluster_id].features)}')
# Attribute conformance checking
for attribute_id, attribute in cluster.items():
if cluster_id in ignore_attributes and attribute_id in ignore_attributes[cluster_id]:
continue
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
if attribute_id not in self.xml_clusters[cluster_id].attributes.keys():
# TODO: Consolidate the range checks with IDM-10.1 once that lands
if attribute_id <= 0x4FFF:
# manufacturer attribute
record_error(location=location, problem='Standard attribute found on device, but not in spec')
continue
xml_attribute = self.xml_clusters[cluster_id].attributes[attribute_id]
conformance_decision_with_choice = xml_attribute.conformance(feature_map, attribute_list, all_command_list)
if not conformance_allowed(conformance_decision_with_choice, allow_provisional):
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
record_error(
location=location, problem=f'Attribute 0x{attribute_id:02x} is included, but is disallowed by conformance. {conformance_str(xml_attribute.conformance, feature_map, self.xml_clusters[cluster_id].features)}')
for attribute_id, xml_attribute in self.xml_clusters[cluster_id].attributes.items():
if cluster_id in ignore_attributes and attribute_id in ignore_attributes[cluster_id]:
continue
conformance_decision_with_choice = xml_attribute.conformance(feature_map, attribute_list, all_command_list)
if conformance_decision_with_choice.decision == ConformanceDecision.MANDATORY and attribute_id not in cluster.keys():
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id)
record_error(
location=location, problem=f'Attribute 0x{attribute_id:02x} is required, but is not present on the DUT. {conformance_str(xml_attribute.conformance, feature_map, self.xml_clusters[cluster_id].features)}')
def check_spec_conformance_for_commands(command_type: CommandType):
global_attribute_id = GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID if command_type == CommandType.ACCEPTED else GlobalAttributeIds.GENERATED_COMMAND_LIST_ID
xml_commands_dict = self.xml_clusters[cluster_id].accepted_commands if command_type == CommandType.ACCEPTED else self.xml_clusters[cluster_id].generated_commands
command_list = cluster[global_attribute_id]
for command_id in command_list:
location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
if command_id not in xml_commands_dict:
# TODO: Consolidate range checks with IDM-10.1 once that lands
if command_id <= 0xFF:
# manufacturer command
continue
record_error(location=location, problem='Standard command found on device, but not in spec')
continue
xml_command = xml_commands_dict[command_id]
conformance_decision_with_choice = xml_command.conformance(feature_map, attribute_list, all_command_list)
if not conformance_allowed(conformance_decision_with_choice, allow_provisional):
record_error(
location=location, problem=f'Command 0x{command_id:02x} is included, but disallowed by conformance. {conformance_str(xml_command.conformance, feature_map, self.xml_clusters[cluster_id].features)}')
for command_id, xml_command in xml_commands_dict.items():
conformance_decision_with_choice = xml_command.conformance(feature_map, attribute_list, all_command_list)
if conformance_decision_with_choice.decision == ConformanceDecision.MANDATORY and command_id not in command_list:
location = CommandPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, command_id=command_id)
record_error(
location=location, problem=f'Command 0x{command_id:02x} is required, but is not present on the DUT. {conformance_str(xml_command.conformance, feature_map, self.xml_clusters[cluster_id].features)}')
# Command conformance checking
check_spec_conformance_for_commands(CommandType.ACCEPTED)
check_spec_conformance_for_commands(CommandType.GENERATED)
feature_choice_problems = evaluate_feature_choice_conformance(
endpoint_id, cluster_id, self.xml_clusters, feature_map, attribute_list, all_command_list)
attribute_choice_problems = evaluate_attribute_choice_conformance(
endpoint_id, cluster_id, self.xml_clusters, feature_map, attribute_list, all_command_list)
command_choice_problem = evaluate_command_choice_conformance(
endpoint_id, cluster_id, self.xml_clusters, feature_map, attribute_list, all_command_list)
if feature_choice_problems or attribute_choice_problems or command_choice_problem:
success = False
problems.extend(feature_choice_problems + attribute_choice_problems + command_choice_problem)
print(f'success = {success}')
return success, problems
def check_revisions(self, ignore_in_progress: bool):
problems = []
success = True
def record_problem(location, problem, severity):
problems.append(ProblemNotice("IDM-10.3", location, severity, problem, ""))
def record_error(location, problem):
nonlocal success
record_problem(location, problem, ProblemSeverity.ERROR)
success = False
def record_warning(location, problem):
record_problem(location, problem, ProblemSeverity.WARNING)
ignore_revisions: list[int] = []
if ignore_in_progress:
# This is a manually curated list of cluster revisions that are in-progress in the SDK, but have landed in the spec
in_progress_revisions = [Clusters.BasicInformation.id, Clusters.PowerSource.id, Clusters.NetworkCommissioning.id]
ignore_revisions.extend(in_progress_revisions)
for endpoint_id, endpoint in self.endpoints_tlv.items():
for cluster_id, cluster in endpoint.items():
if cluster_id not in self.xml_clusters.keys():
if (cluster_id & 0xFFFF_0000) != 0:
# manufacturer cluster
continue
location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id)
# TODO: update this from a warning once we have all the data
record_warning(location=location, problem='Standard cluster found on device, but is not present in spec data')
continue
if cluster_id in ignore_revisions:
continue
if int(self.xml_clusters[cluster_id].revision) != cluster[GlobalAttributeIds.CLUSTER_REVISION_ID]:
location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id,
attribute_id=GlobalAttributeIds.CLUSTER_REVISION_ID)
record_error(
location=location, problem=f'Revision found on cluster ({cluster[GlobalAttributeIds.CLUSTER_REVISION_ID]}) does not match revision listed in the spec ({self.xml_clusters[cluster_id].revision})')
return success, problems
def check_device_type(self, fail_on_extra_clusters: bool = True, allow_provisional: bool = False) -> tuple[bool, list[ProblemNotice]]:
success = True
problems = []
def record_problem(location, problem, severity):
problems.append(ProblemNotice("IDM-10.5", location, severity, problem, ""))
def record_error(location, problem):
nonlocal success
record_problem(location, problem, ProblemSeverity.ERROR)
success = False
def record_warning(location, problem):
record_problem(location, problem, ProblemSeverity.WARNING)
for endpoint_id, endpoint in self.endpoints.items():
if Clusters.Descriptor not in endpoint:
location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=Clusters.Descriptor.id)
record_error(location=location, problem='No descriptor cluster found on endpoint')
continue
device_type_list = endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList]
invalid_device_types = [x for x in device_type_list if not is_valid_device_type_id(device_type_id_type(x.deviceType))]
standard_device_types = [x for x in endpoint[Clusters.Descriptor]
[Clusters.Descriptor.Attributes.DeviceTypeList] if device_type_id_type(x.deviceType) == DeviceTypeIdType.kStandard]
endpoint_clusters = []
server_clusters = []
for device_type in invalid_device_types:
location = DeviceTypePathLocation(device_type_id=device_type.deviceType)
record_error(location=location, problem='Invalid device type ID (out of valid range)')
for device_type in standard_device_types:
device_type_id = device_type.deviceType
location = DeviceTypePathLocation(device_type_id=device_type_id)
if device_type_id not in self.xml_device_types.keys():
record_error(location=location, problem='Unknown device type ID in standard range')
continue
if device_type_id not in self.xml_device_types.keys():
location = DeviceTypePathLocation(device_type_id=device_type_id)
record_error(location=location, problem='Unknown device type')
continue
# TODO: check revision. Possibly in another test?
xml_device = self.xml_device_types[device_type_id]
# IDM 10.1 checks individual clusters for validity,
# so here we can ignore checks for invalid and manufacturer clusters.
server_clusters = [x for x in endpoint[Clusters.Descriptor]
[Clusters.Descriptor.Attributes.ServerList] if cluster_id_type(x) == ClusterIdType.kStandard]
# As a start, we are only checking server clusters
# TODO: check client clusters too?
for cluster_id, cluster_requirement in xml_device.server_clusters.items():
# Device type cluster conformances do not include any conformances based on cluster elements
conformance_decision_with_choice = cluster_requirement.conformance(0, [], [])
location = DeviceTypePathLocation(device_type_id=device_type_id, cluster_id=cluster_id)
if conformance_decision_with_choice.decision == ConformanceDecision.MANDATORY and cluster_id not in server_clusters:
record_error(location=location,
problem=f"Mandatory cluster {cluster_requirement.name} for device type {xml_device.name} is not present in the server list")
success = False
if cluster_id in server_clusters and not conformance_allowed(conformance_decision_with_choice, allow_provisional):
record_error(location=location,
problem=f"Disallowed cluster {cluster_requirement.name} found in server list for device type {xml_device.name}")
success = False
# If we want to check for extra clusters on the endpoint, we need to know the entire set of clusters in all the device type
# lists across all the device types on the endpoint.
endpoint_clusters += xml_device.server_clusters.keys()
if fail_on_extra_clusters:
fn = record_error
else:
fn = record_warning
extra_clusters = set(server_clusters) - set(endpoint_clusters)
for extra in extra_clusters:
location = ClusterPathLocation(endpoint_id=endpoint_id, cluster_id=extra)
fn(location=location, problem=f"Extra cluster found on endpoint with device types {device_type_list}")
return success, problems
class TC_DeviceConformance(MatterBaseTest, DeviceConformanceTests):
@async_test_body
async def setup_class(self):
super().setup_class()
await self.setup_class_helper()
def test_TC_IDM_10_2(self):
ignore_in_progress = self.user_params.get("ignore_in_progress", False)
is_ci = self.check_pics('PICS_SDK_CI_ONLY')
success, problems = self.check_conformance(ignore_in_progress, is_ci)
self.problems.extend(problems)
if not success:
self.fail_current_test("Problems with conformance")
def test_TC_IDM_10_3(self):
ignore_in_progress = self.user_params.get("ignore_in_progress", False)
success, problems = self.check_revisions(ignore_in_progress)
self.problems.extend(problems)
if not success:
self.fail_current_test("Problems with cluster revision on at least one cluster")
def test_TC_IDM_10_5(self):
fail_on_extra_clusters = self.user_params.get("fail_on_extra_clusters", True)
success, problems = self.check_device_type(fail_on_extra_clusters)
self.problems.extend(problems)
if not success:
self.fail_current_test("Problems with Device type conformance on one or more endpoints")
if __name__ == "__main__":
default_matter_test_main()