blob: 69e2d2bb86d8ef322252d894a9f9549e21077ec7 [file] [log] [blame]
#
# Copyright (c) 2025 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Optional
import chip.clusters as Clusters
from chip.testing.conformance import ConformanceDecision
from chip.testing.global_attribute_ids import GlobalAttributeIds
from chip.testing.spec_parsing import XmlCluster, XmlDeviceType
from chip.tlv import uint
def _is_mandatory(conformance, feature_map=0):
return conformance(feature_map, [], []).decision == ConformanceDecision.MANDATORY
def _get_field_by_label(cl_object: Clusters.ClusterObjects.ClusterObject, label: str) -> Optional[Clusters.ClusterObjects.ClusterObjectFieldDescriptor]:
for field in cl_object.descriptor.Fields:
if field.Label == label:
return field
return None
def create_minimal_cluster(xml_clusters: dict[uint, XmlCluster], cluster_id: int) -> dict[int, Any]:
attrs = {}
mandatory_features = [mask for mask, f in xml_clusters[cluster_id].features.items() if _is_mandatory(f.conformance)]
feature_map = 0
for mask in mandatory_features:
feature_map |= mask
mandatory_attributes = [id for id, a in xml_clusters[cluster_id].attributes.items()
if _is_mandatory(a.conformance, feature_map)]
for m in mandatory_attributes:
# dummy versions - we're not using the values in this test
attrs[m] = 0
attrs[GlobalAttributeIds.FEATURE_MAP_ID] = feature_map
attrs[GlobalAttributeIds.ATTRIBUTE_LIST_ID] = mandatory_attributes
mandatory_accepted_commands = [id for id, a in xml_clusters[cluster_id].accepted_commands.items()
if _is_mandatory(a.conformance, feature_map)]
attrs[GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID] = mandatory_accepted_commands
mandatory_generated_commands = [id for id, a in xml_clusters[cluster_id].generated_commands.items()
if _is_mandatory(a.conformance, feature_map)]
attrs[GlobalAttributeIds.GENERATED_COMMAND_LIST_ID] = mandatory_generated_commands
attrs[GlobalAttributeIds.CLUSTER_REVISION_ID] = xml_clusters[cluster_id].revision
return attrs
def create_minimal_dt(xml_clusters: dict[uint, XmlCluster], xml_device_types: dict[uint, XmlDeviceType], device_type_id: int, is_tlv_endpoint: bool = True) -> dict[int, dict[int, Any]]:
''' Creates the internals of an endpoint with the minimal set of clusters, with the minimal set of attributes and commands. Global attributes only.
Does NOT take into account overrides yet.
'''
endpoint = {}
required_servers = [id for id, c in xml_device_types[device_type_id].server_clusters.items()
if _is_mandatory(c.conformance)]
required_clients = [id for id, c in xml_device_types[device_type_id].client_clusters.items()
if _is_mandatory(c.conformance)]
device_type_revision = xml_device_types[device_type_id].revision
for s in required_servers:
endpoint[s] = create_minimal_cluster(xml_clusters, s)
# Descriptor
attr = Clusters.Descriptor.Attributes
structs = Clusters.Descriptor.Structs
attrs = {}
attributes = [
attr.FeatureMap,
attr.AcceptedCommandList,
attr.GeneratedCommandList,
attr.ClusterRevision,
attr.DeviceTypeList,
attr.ServerList,
attr.ClientList,
attr.PartsList,
]
attribute_values = [
(0, 0), # FeatureMap
([], []), # AcceptedCommandList
([], []), # GeneratedCommandList
(xml_clusters[Clusters.Descriptor.id].revision,
xml_clusters[Clusters.Descriptor.id].revision), # ClusterRevision
([{_get_field_by_label(structs.DeviceTypeStruct, "deviceType").Tag: device_type_id,
_get_field_by_label(structs.DeviceTypeStruct, "revision").Tag: device_type_revision}],
[Clusters.Descriptor.Structs.DeviceTypeStruct(
deviceType=device_type_id, revision=device_type_revision)]), # DeviceTypeList
(required_servers, required_servers), # ServerList
(required_clients, required_clients), # ClientList
([], []), # PartsList
]
for attribute_name, attribute_value in zip(attributes, attribute_values):
key = attribute_name.attribute_id if is_tlv_endpoint else attribute_name
attrs[key] = attribute_value[0] if is_tlv_endpoint else attribute_value[1]
# Append the attribute list now that is populated.
attrs[attr.AttributeList.attribute_id if is_tlv_endpoint else attr.AttributeList] = list(attrs.keys())
endpoint[Clusters.Descriptor.id if is_tlv_endpoint else Clusters.Descriptor] = attrs
return endpoint