Convert former python yaml tests parser to be a runner/adapter (#24103)
* Convert former python yaml tests parser to be a runner/adapter
A common intermediary python YAML test parser has been created. This
removes all the YAML python parsing bits from the current chip-repl
version. It then turns that former parse into a runner that converts
TestStep actions from the common parsed structure to something that
chip-repl can execute. The responses from the accessory are then
translated back to something the common parser expectes to validate
the response.
* Address PR comments
diff --git a/scripts/tests/yamltests/fixes.py b/scripts/tests/yamltests/fixes.py
index a0126b0..676739d 100644
--- a/scripts/tests/yamltests/fixes.py
+++ b/scripts/tests/yamltests/fixes.py
@@ -54,8 +54,6 @@
return value
-# TODO(thampson) This method is a clone of the method in
-# src/controller/python/chip/yaml/format_converter.py and should eventually be removed in that file.
def convert_yaml_octet_string_to_bytes(s: str) -> bytes:
'''Convert YAML octet string body to bytes.
diff --git a/src/controller/python/BUILD.gn b/src/controller/python/BUILD.gn
index a0381f0..4f4c09c 100644
--- a/src/controller/python/BUILD.gn
+++ b/src/controller/python/BUILD.gn
@@ -230,12 +230,10 @@
"chip/utils/CommissioningBuildingBlocks.py",
"chip/utils/__init__.py",
"chip/yaml/__init__.py",
- "chip/yaml/constraints.py",
"chip/yaml/data_model_lookup.py",
"chip/yaml/errors.py",
"chip/yaml/format_converter.py",
- "chip/yaml/parser.py",
- "chip/yaml/variable_storage.py",
+ "chip/yaml/runner.py",
]
if (chip_controller) {
diff --git a/src/controller/python/chip/yaml/__init__.py b/src/controller/python/chip/yaml/__init__.py
index 055bec9..08850e2 100644
--- a/src/controller/python/chip/yaml/__init__.py
+++ b/src/controller/python/chip/yaml/__init__.py
@@ -20,4 +20,4 @@
# Provides Python APIs for Matter.
"""Provides yaml parser Python APIs for Matter."""
-from . import parser
+from . import runner
diff --git a/src/controller/python/chip/yaml/constraints.py b/src/controller/python/chip/yaml/constraints.py
deleted file mode 100644
index 49ac8c9..0000000
--- a/src/controller/python/chip/yaml/constraints.py
+++ /dev/null
@@ -1,221 +0,0 @@
-#
-# Copyright (c) 2022 Project CHIP Authors
-# All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from abc import ABC, abstractmethod
-import chip.yaml.format_converter as Converter
-from .variable_storage import VariableStorage
-
-
-class ConstraintValidationError(Exception):
- def __init__(self, message):
- super().__init__(message)
-
-
-class BaseConstraint(ABC):
- '''Constrain Interface'''
-
- @abstractmethod
- def is_met(self, response) -> bool:
- pass
-
-
-class _LoadableConstraint(BaseConstraint):
- '''Constraints where value might be stored in VariableStorage needing runtime load.'''
-
- def __init__(self, value, field_type, variable_storage: VariableStorage, config_values: dict):
- self._variable_storage = variable_storage
- # When not none _indirect_value_key is binding a name to the constraint value, and the
- # actual value can only be looked-up dynamically, which is why this is a key name.
- self._indirect_value_key = None
- self._value = None
-
- if value is None:
- # Default values set above is all we need here.
- return
-
- if isinstance(value, str) and self._variable_storage.is_key_saved(value):
- self._indirect_value_key = value
- else:
- self._value = Converter.parse_and_convert_yaml_value(
- value, field_type, config_values)
-
- def get_value(self):
- '''Gets the current value of the constraint.
-
- This method accounts for getting the runtime saved value from DUT previous responses.
- '''
- if self._indirect_value_key:
- return self._variable_storage.load(self._indirect_value_key)
- return self._value
-
-
-class _ConstraintHasValue(BaseConstraint):
- def __init__(self, has_value):
- self._has_value = has_value
-
- def is_met(self, response) -> bool:
- raise ConstraintValidationError('HasValue constraint currently not implemented')
-
-
-class _ConstraintType(BaseConstraint):
- def __init__(self, type):
- self._type = type
-
- def is_met(self, response) -> bool:
- raise ConstraintValidationError('Type constraint currently not implemented')
-
-
-class _ConstraintStartsWith(BaseConstraint):
- def __init__(self, starts_with):
- self._starts_with = starts_with
-
- def is_met(self, response) -> bool:
- return response.startswith(self._starts_with)
-
-
-class _ConstraintEndsWith(BaseConstraint):
- def __init__(self, ends_with):
- self._ends_with = ends_with
-
- def is_met(self, response) -> bool:
- return response.endswith(self._ends_with)
-
-
-class _ConstraintIsUpperCase(BaseConstraint):
- def __init__(self, is_upper_case):
- self._is_upper_case = is_upper_case
-
- def is_met(self, response) -> bool:
- return response.isupper() == self._is_upper_case
-
-
-class _ConstraintIsLowerCase(BaseConstraint):
- def __init__(self, is_lower_case):
- self._is_lower_case = is_lower_case
-
- def is_met(self, response) -> bool:
- return response.islower() == self._is_lower_case
-
-
-class _ConstraintMinValue(_LoadableConstraint):
- def __init__(self, min_value, field_type, variable_storage: VariableStorage,
- config_values: dict):
- super().__init__(min_value, field_type, variable_storage, config_values)
-
- def is_met(self, response) -> bool:
- min_value = self.get_value()
- return response >= min_value
-
-
-class _ConstraintMaxValue(_LoadableConstraint):
- def __init__(self, max_value, field_type, variable_storage: VariableStorage,
- config_values: dict):
- super().__init__(max_value, field_type, variable_storage, config_values)
-
- def is_met(self, response) -> bool:
- max_value = self.get_value()
- return response <= max_value
-
-
-class _ConstraintContains(BaseConstraint):
- def __init__(self, contains):
- self._contains = contains
-
- def is_met(self, response) -> bool:
- return set(self._contains).issubset(response)
-
-
-class _ConstraintExcludes(BaseConstraint):
- def __init__(self, excludes):
- self._excludes = excludes
-
- def is_met(self, response) -> bool:
- return set(self._excludes).isdisjoint(response)
-
-
-class _ConstraintHasMaskSet(BaseConstraint):
- def __init__(self, has_masks_set):
- self._has_masks_set = has_masks_set
-
- def is_met(self, response) -> bool:
- return all([(response & mask) == mask for mask in self._has_masks_set])
-
-
-class _ConstraintHasMaskClear(BaseConstraint):
- def __init__(self, has_masks_clear):
- self._has_masks_clear = has_masks_clear
-
- def is_met(self, response) -> bool:
- return all([(response & mask) == 0 for mask in self._has_masks_clear])
-
-
-class _ConstraintNotValue(_LoadableConstraint):
- def __init__(self, not_value, field_type, variable_storage: VariableStorage,
- config_values: dict):
- super().__init__(not_value, field_type, variable_storage, config_values)
-
- def is_met(self, response) -> bool:
- not_value = self.get_value()
- return response != not_value
-
-
-def get_constraints(constraints, field_type, variable_storage: VariableStorage,
- config_values: dict) -> list[BaseConstraint]:
- _constraints = []
- if 'hasValue' in constraints:
- _constraints.append(_ConstraintHasValue(constraints.get('hasValue')))
-
- if 'type' in constraints:
- _constraints.append(_ConstraintType(constraints.get('type')))
-
- if 'startsWith' in constraints:
- _constraints.append(_ConstraintStartsWith(constraints.get('startsWith')))
-
- if 'endsWith' in constraints:
- _constraints.append(_ConstraintEndsWith(constraints.get('endsWith')))
-
- if 'isUpperCase' in constraints:
- _constraints.append(_ConstraintIsUpperCase(constraints.get('isUpperCase')))
-
- if 'isLowerCase' in constraints:
- _constraints.append(_ConstraintIsLowerCase(constraints.get('isLowerCase')))
-
- if 'minValue' in constraints:
- _constraints.append(_ConstraintMinValue(
- constraints.get('minValue'), field_type, variable_storage, config_values))
-
- if 'maxValue' in constraints:
- _constraints.append(_ConstraintMaxValue(
- constraints.get('maxValue'), field_type, variable_storage, config_values))
-
- if 'contains' in constraints:
- _constraints.append(_ConstraintContains(constraints.get('contains')))
-
- if 'excludes' in constraints:
- _constraints.append(_ConstraintExcludes(constraints.get('excludes')))
-
- if 'hasMasksSet' in constraints:
- _constraints.append(_ConstraintHasMaskSet(constraints.get('hasMasksSet')))
-
- if 'hasMasksClear' in constraints:
- _constraints.append(_ConstraintHasMaskClear(constraints.get('hasMasksClear')))
-
- if 'notValue' in constraints:
- _constraints.append(_ConstraintNotValue(
- constraints.get('notValue'), field_type, variable_storage, config_values))
-
- return _constraints
diff --git a/src/controller/python/chip/yaml/data_model_lookup.py b/src/controller/python/chip/yaml/data_model_lookup.py
index 1e0c3ff..215ac8f 100644
--- a/src/controller/python/chip/yaml/data_model_lookup.py
+++ b/src/controller/python/chip/yaml/data_model_lookup.py
@@ -32,6 +32,10 @@
def get_attribute(self, cluster: str, attribute: str):
pass
+ @abstractmethod
+ def get_event(self, cluster: str, event: str):
+ pass
+
class PreDefinedDataModelLookup(DataModelLookup):
def get_cluster(self, cluster: str):
@@ -53,3 +57,10 @@
return getattr(attributes, attribute, None)
except AttributeError:
return None
+
+ def get_event(self, cluster: str, event: str):
+ try:
+ events = getattr(Clusters, cluster, None).Events
+ return getattr(events, event, None)
+ except AttributeError:
+ return None
diff --git a/src/controller/python/chip/yaml/errors.py b/src/controller/python/chip/yaml/errors.py
index e6c9012..092128f 100644
--- a/src/controller/python/chip/yaml/errors.py
+++ b/src/controller/python/chip/yaml/errors.py
@@ -20,7 +20,7 @@
super().__init__(message)
-class UnexpectedParsingError(ParsingError):
+class UnexpectedParsingError(ValueError):
def __init__(self, message):
super().__init__(message)
diff --git a/src/controller/python/chip/yaml/format_converter.py b/src/controller/python/chip/yaml/format_converter.py
index fc3c5a1..ce2edfd 100644
--- a/src/controller/python/chip/yaml/format_converter.py
+++ b/src/controller/python/chip/yaml/format_converter.py
@@ -20,73 +20,88 @@
from chip.tlv import uint, float32
import enum
from chip.yaml.errors import ValidationError
-import binascii
-def substitute_in_config_variables(field_value, config_values: dict):
- ''' Substitutes values that are config variables.
+def _case_insensitive_getattr(object, attr_name, default):
+ for attr in dir(object):
+ if attr.lower() == attr_name.lower():
+ return getattr(object, attr)
+ return default
- YAML values can contain a string of a configuration variable name. In these instances we
- substitute the configuration variable name with the actual value.
- For examples see unittest src/controller/python/test/unit_tests/test_yaml_format_converter.py
+def _get_target_type_fields(test_spec_definition, cluster_name, target_name):
+ element = test_spec_definition.get_type_by_name(cluster_name, target_name)
+ if hasattr(element, 'fields'):
+ return element.fields
+ return None
- # TODO This should also substitue any saveAs values as well as perform any required
- # evaluations.
+
+def from_data_model_to_test_definition(test_spec_definition, cluster_name, response_definition,
+ response_value):
+ '''Converts value from data model to definitions provided in test_spec_definition.
Args:
- 'field_value': Value as extracted from YAML.
- 'config_values': Dictionary of global configuration variables.
- Returns:
- Value with all global configuration variables substituted with the real value.
+ 'test_spec_definition': The spec cluster definition used by the test parser.
+ 'cluster_name': Used when we need to look up information in 'test_spec_definition'.
+ 'response_definition': Type we are converting 'response_value' to. This will be one of
+ two types: list[idl.matter_idl_types.Field] or idl.matter_idl_types.Field
+ 'response_value': Response value that we want to convert to
'''
- if isinstance(field_value, dict):
- return {key: substitute_in_config_variables(
- field_value[key], config_values) for key in field_value}
- if isinstance(field_value, list):
- return [substitute_in_config_variables(item, config_values) for item in field_value]
- if isinstance(field_value, str) and field_value in config_values:
- config_value = config_values[field_value]
- if isinstance(config_value, dict) and 'defaultValue' in config_value:
- # TODO currently we don't validate that if config_value['type'] is provided
- # that the type does in fact match our expectation.
- return config_value['defaultValue']
- return config_values[field_value]
+ if response_value is None:
+ return response_value
- return field_value
+ # We first check to see if response_definition is list[idl.matter_idl_types.Field]. When we
+ # have list[idl.matter_idl_types.Field] that means we have a structure with multiple fields
+ # that need to be worked through recursively to properly convert the value to the right type.
+ if isinstance(response_definition, list):
+ rv = {}
+ for item in response_definition:
+ value = _case_insensitive_getattr(response_value, item.name, None)
+ if item.is_optional and value is None:
+ continue
+ rv[item.name] = from_data_model_to_test_definition(test_spec_definition, cluster_name,
+ item, value)
+ return rv
+
+ # We convert uint to python int because constraints first check that it is an expected type.
+ response_value_type = type(response_value)
+ if response_value_type == uint:
+ return int(response_value)
+
+ if response_definition is None:
+ return response_value
+
+ if response_value is NullValue:
+ return None
+
+ # For single float values types there seems to be a floating precision issue. By using '%g'
+ # it naturally give 6 most significat digits for us which is the amount of prcision we are
+ # looking for to give parity results to what chip-tool was getting (For TestCluster.yaml it
+ # give value back of `0.100000`.
+ if response_value_type == float32 and response_definition.data_type.name.lower() == 'single':
+ return float('%g' % response_value)
+
+ response_sub_definition = _get_target_type_fields(test_spec_definition, cluster_name,
+ response_definition.data_type.name)
+
+ # Check below is to see if the field itself is an array, for example array of ints.
+ if response_definition.is_list:
+ return [
+ from_data_model_to_test_definition(test_spec_definition, cluster_name,
+ response_sub_definition, item) for item in response_value
+ ]
+
+ return from_data_model_to_test_definition(test_spec_definition, cluster_name,
+ response_sub_definition, response_value)
-def convert_yaml_octet_string_to_bytes(s: str) -> bytes:
- '''Convert YAML octet string body to bytes.
+def convert_list_of_name_value_pair_to_dict(arg_values):
+ '''Converts list of dict with items with keys 'name' and 'value' into single dict.
- Included handling any c-style hex escapes (e.g. \x5a) and 'hex:' prefix.
- '''
- # Step 1: handle explicit "hex:" prefix
- if s.startswith('hex:'):
- return binascii.unhexlify(s[4:])
-
- # Step 2: convert non-hex-prefixed to bytes
- # TODO(#23669): This does not properly support utf8 octet strings. We mimic
- # javascript codegen behavior. Behavior of javascript is:
- # * Octet string character >= u+0200 errors out.
- # * Any character greater than 0xFF has the upper bytes chopped off.
- as_bytes = [ord(c) for c in s]
-
- if any([value > 0x200 for value in as_bytes]):
- raise ValueError('Unsupported char in octet string %r' % as_bytes)
- accumulated_hex = ''.join([f"{(v & 0xFF):02x}" for v in as_bytes])
- return binascii.unhexlify(accumulated_hex)
-
-
-def convert_name_value_pair_to_dict(arg_values):
- ''' Fix yaml command arguments.
-
- For some reason, instead of treating the entire data payload of a
- command as a singular struct, the top-level args are specified as 'name'
- and 'value' pairs, while the payload of each argument is itself
- correctly encapsulated. This fixes up this oddity to create a new
- key/value pair with the key being the value of the 'name' field, and
- the value being 'value' field.
+ The test step contains a list of arguments that have multiple properties other than
+ 'name' and 'value'. For the purposes of executing a test all these other attributes are not
+ important. We only want a simple dictionary of a new key/value where with the key being the
+ value of the 'name' field, and the value being 'value' field.
'''
ret_value = {}
@@ -96,21 +111,16 @@
return ret_value
-def convert_yaml_type(field_value, field_type, inline_cast_dict_to_struct):
- ''' Converts yaml value to provided pythonic type.
+def convert_to_data_model_type(field_value, field_type):
+ '''Converts value to provided data model pythonic object type.
- The YAML representation when converted to a dictionary does not line up to
- the python type data model for the various command/attribute/event object
- types. This function converts 'field_value' to the appropriate provided
+ The values provided by parser does not line up to the python data model for the various
+ command/attribute/event object types. This function converts 'field_value' to the provided
'field_type'.
Args:
- 'field_value': Value as extracted from yaml
- 'field_type': Pythonic command/attribute/event object type that we
- are converting value to.
- 'inline_cast_dict_to_struct': If true, for any dictionary 'field_value'
- types provided we will do a convertion to the corresponding data
- model class in `field_type` by doing field_type.FromDict(...).
+ 'field_value': Value as extracted by YAML parser.
+ 'field_type': Pythonic command/attribute/event object type that we are converting value to.
'''
origin = typing.get_origin(field_type)
@@ -152,10 +162,8 @@
raise ValidationError(
f'Did not find field "{item}" in {str(field_type)}') from None
- return_field_value[field_descriptor.Label] = convert_yaml_type(
- field_value[item], field_descriptor.Type, inline_cast_dict_to_struct)
- if inline_cast_dict_to_struct:
- return field_type.FromDict(return_field_value)
+ return_field_value[field_descriptor.Label] = convert_to_data_model_type(
+ field_value[item], field_descriptor.Type)
return return_field_value
elif(type(field_value) is float):
return float32(field_value)
@@ -165,8 +173,7 @@
# The field type passed in is the type of the list element and not list[T].
for idx, item in enumerate(field_value):
- field_value[idx] = convert_yaml_type(item, list_element_type,
- inline_cast_dict_to_struct)
+ field_value[idx] = convert_to_data_model_type(item, list_element_type)
return field_value
# YAML conversion treats all numbers as ints. Convert to a uint type if the schema
# type indicates so.
@@ -177,31 +184,6 @@
# YAML treats enums as ints. Convert to the typed enum class.
elif (issubclass(field_type, enum.Enum)):
return field_type(field_value)
- # YAML treats bytes as strings. Convert to a byte string.
- elif (field_type == bytes and type(field_value) != bytes):
- return convert_yaml_octet_string_to_bytes(field_value)
# By default, just return the field_value casted to field_type.
else:
return field_type(field_value)
-
-
-def parse_and_convert_yaml_value(field_value, field_type, config_values: dict,
- inline_cast_dict_to_struct: bool = False):
- ''' Parse and converts YAML type
-
- Parsing the YAML value means performing required substitutions and evaluations. Parsing is
- then followed by converting from the YAML type done using yaml.safe_load() to the type used in
- the various command/attribute/event object data model types.
-
- Args:
- 'field_value': Value as extracted from yaml to be parsed
- 'field_type': Pythonic command/attribute/event object type that we
- are converting value to.
- 'config_values': Dictionary of global configuration variables.
- 'inline_cast_dict_to_struct': If true, for any dictionary 'field_value'
- types provided we will do an inline convertion to the corresponding
- struct in `field_type` by doing field_type.FromDict(...).
- '''
- field_value_with_config_variables = substitute_in_config_variables(field_value, config_values)
- return convert_yaml_type(field_value_with_config_variables, field_type,
- inline_cast_dict_to_struct)
diff --git a/src/controller/python/chip/yaml/parser.py b/src/controller/python/chip/yaml/parser.py
deleted file mode 100644
index b70a743..0000000
--- a/src/controller/python/chip/yaml/parser.py
+++ /dev/null
@@ -1,467 +0,0 @@
-#
-# Copyright (c) 2022 Project CHIP Authors
-# All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from abc import ABC, abstractmethod
-from dataclasses import dataclass, field
-from chip import ChipDeviceCtrl
-from chip.tlv import float32
-import yaml
-import stringcase
-import chip.interaction_model
-import asyncio as asyncio
-import logging
-import math
-from chip.yaml.errors import ParsingError, UnexpectedParsingError
-from .data_model_lookup import *
-import chip.yaml.format_converter as Converter
-from .variable_storage import VariableStorage
-from .constraints import get_constraints
-
-_SUCCESS_STATUS_CODE = "SUCCESS"
-_NODE_ID_DEFAULT = 0x12345
-_ENDPOINT_DETAULT = '' # TODO why is this an empty string
-_CLUSTER_DEFAULT = ''
-_TIMEOUT_DEFAULT = 90
-logger = logging.getLogger('YamlParser')
-
-
-@dataclass
-class _ExecutionContext:
- ''' Objects that is commonly passed around this file that are vital to test execution.'''
- # Data model lookup to get python attribute, cluster, command object.
- data_model_lookup: DataModelLookup = None
- # Where various test action response are stored and loaded from.
- variable_storage: VariableStorage = None
- # Top level configuration values for a yaml test.
- config_values: dict = None
-
-
-class _VariableToSave:
- def __init__(self, variable_name: str, variable_storage: VariableStorage):
- self._variable_name = variable_name
- self._variable_storage = variable_storage
- self._variable_storage.save(self._variable_name, None)
-
- def save_response(self, value):
- self._variable_storage.save(self._variable_name, value)
-
-
-class _ExpectedResponse:
- def __init__(self, value, response_type, context: _ExecutionContext):
- self._load_expected_response_in_verify = None
- self._expected_response_type = response_type
- self._expected_response = None
- self._variable_storage = context.variable_storage
- if isinstance(value, str) and self._variable_storage.is_key_saved(value):
- self._load_expected_response_in_verify = value
- else:
- self._expected_response = Converter.parse_and_convert_yaml_value(
- value, response_type, context.config_values, inline_cast_dict_to_struct=True)
-
- def verify(self, response):
- if (self._expected_response_type is None):
- return True
-
- if self._load_expected_response_in_verify is not None:
- self._expected_response = self._variable_storage.load(
- self._load_expected_response_in_verify)
-
- if isinstance(self._expected_response_type, float32):
- if not math.isclose(self._expected_response, response, rel_tol=1e-6):
- logger.error(f"Expected response {self._expected_response} didn't match "
- f"actual object {response}")
- return False
-
- if (self._expected_response != response):
- logger.error(f"Expected response {self._expected_response} didn't match "
- f"actual object {response}")
- return False
- return True
-
-
-class BaseAction(ABC):
- '''Interface for a single yaml action that is to be executed.'''
-
- def __init__(self, label):
- self._label = label
-
- @property
- def label(self):
- return self._label
-
- @abstractmethod
- def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int):
- pass
-
-
-class InvokeAction(BaseAction):
- '''Single invoke action to be executed including validation of response.'''
-
- def __init__(self, item: dict, cluster: str, context: _ExecutionContext):
- '''Parse cluster invoke from yaml test configuration.
-
- Args:
- 'item': Dictionary containing single invoke to be parsed.
- 'cluster': Name of cluster which to invoke action is targeting.
- 'context': Contains test-wide common objects such as DataModelLookup instance, storage
- for device responses and top level test configurations variable.
- Raises:
- ParsingError: Raised if there is a benign error, and there is currently no
- action to perform for this write attribute.
- UnexpectedParsingError: Raised if there is an unexpected parsing error.
- '''
- super().__init__(item['label'])
- self._command_name = stringcase.pascalcase(item['command'])
- self._cluster = cluster
- self._request_object = None
- self._expected_raw_response: dict = field(default_factory=dict)
- self._expected_response_object = None
-
- command = context.data_model_lookup.get_command(
- self._cluster, self._command_name)
-
- if command is None:
- raise ParsingError(
- f'Failed to find cluster:{self._cluster} Command:{self._command_name}')
-
- command_object = command()
- if (item.get('arguments')):
- args = item['arguments']['values']
-
- request_data_as_dict = Converter.convert_name_value_pair_to_dict(args)
-
- try:
- request_data = Converter.parse_and_convert_yaml_value(
- request_data_as_dict, type(command_object), context.config_values)
- except ValueError:
- raise ParsingError('Could not covert yaml type')
-
- # Create a cluster object for the request from the provided YAML data.
- self._request_object = command_object.FromDict(request_data)
- else:
- self._request_object = command_object
-
- self._expected_raw_response = item.get('response')
-
- if (self._request_object.response_type is not None and
- self._expected_raw_response is not None and
- self._expected_raw_response.get('values')):
- response_type = stringcase.pascalcase(self._request_object.response_type)
- expected_command = context.data_model_lookup.get_command(self._cluster,
- response_type)
- expected_response_args = self._expected_raw_response['values']
- expected_response_data_as_dict = Converter.convert_name_value_pair_to_dict(
- expected_response_args)
- expected_response_data = Converter.parse_and_convert_yaml_value(
- expected_response_data_as_dict, expected_command, context.config_values)
- self._expected_response_object = expected_command.FromDict(expected_response_data)
-
- def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int):
- try:
- resp = asyncio.run(dev_ctrl.SendCommand(node_id, endpoint, self._request_object))
- except chip.interaction_model.InteractionModelError:
- if self._expected_raw_response is None:
- raise
-
- expected_status_code = self._expected_raw_response.get('error')
- if expected_status_code is not None and expected_status_code != _SUCCESS_STATUS_CODE:
- logger.debug('Got error response, but was expected')
- else:
- raise
-
- if (self._expected_response_object is not None):
- if (self._expected_response_object != resp):
- logger.error(f'Expected response {self._expected_response_object} did not match '
- f'actual object {resp}')
-
-
-class ReadAttributeAction(BaseAction):
- '''Single read attribute action to be executed including validation.'''
-
- def __init__(self, item: dict, cluster: str, context: _ExecutionContext):
- '''Parse read attribute action from yaml test configuration.
-
- Args:
- 'item': Dictionary contains single read attribute action to be parsed.
- 'cluster': Name of cluster read attribute action is targeting.
- 'context': Contains test-wide common objects such as DataModelLookup instance, storage
- for device responses and top level test configurations variable.
- Raises:
- ParsingError: Raised if there is a benign error, and there is currently no
- action to perform for this read attribute.
- UnexpectedParsingError: Raised if there is an unexpected parsing error.
- '''
- super().__init__(item['label'])
- self._attribute_name = stringcase.pascalcase(item['attribute'])
- self._constraints = []
- self._cluster = cluster
- self._cluster_object = None
- self._request_object = None
- self._expected_raw_response: dict = field(default_factory=dict)
- self._expected_response: _ExpectedResponse = None
- self._possibly_unsupported = False
- self._variable_to_save = None
-
- self._cluster_object = context.data_model_lookup.get_cluster(self._cluster)
- if self._cluster_object is None:
- raise UnexpectedParsingError(
- f'ReadAttribute failed to find cluster object:{self._cluster}')
-
- self._request_object = context.data_model_lookup.get_attribute(
- self._cluster, self._attribute_name)
- if self._request_object is None:
- raise ParsingError(
- f'ReadAttribute failed to find cluster:{self._cluster} '
- f'Attribute:{self._attribute_name}')
-
- if (item.get('arguments')):
- raise UnexpectedParsingError(
- f'ReadAttribute should not contain arguments. {self.label}')
-
- if self._request_object.attribute_type is None:
- raise UnexpectedParsingError(
- f'ReadAttribute doesnt have valid attribute_type. {self.label}')
-
- if 'optional' in item:
- self._possibly_unsupported = True
-
- self._expected_raw_response = item.get('response')
- if (self._expected_raw_response is None):
- # TODO actually if response is missing it typically means that we need to confirm
- # that we got a successful response. This will be implemented later to consider all
- # possible corner cases around that (if there are corner cases).
- raise UnexpectedParsingError(f'ReadAttribute missing expected response. {self.label}')
-
- variable_name = self._expected_raw_response.get('saveAs')
- if variable_name:
- self._variable_to_save = _VariableToSave(variable_name, context.variable_storage)
-
- if 'value' in self._expected_raw_response:
- expected_response_value = self._expected_raw_response['value']
- self._expected_response = _ExpectedResponse(expected_response_value,
- self._request_object.attribute_type.Type,
- context)
-
- constraints = self._expected_raw_response.get('constraints')
- if constraints:
- self._constraints = get_constraints(constraints,
- self._request_object.attribute_type.Type,
- context.variable_storage,
- context.config_values)
-
- def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int):
- try:
- resp = asyncio.run(dev_ctrl.ReadAttribute(node_id, [(self._request_object)]))
- except chip.interaction_model.InteractionModelError:
- if self._expected_raw_response is None:
- raise
-
- expected_status_code = self._expected_raw_response.get('error')
- if expected_status_code is not None and expected_status_code != _SUCCESS_STATUS_CODE:
- logger.debug('Got error response, but was expected')
- else:
- raise
-
- if self._possibly_unsupported and not resp:
- # We have found an unsupported attribute. Parsed test did specify that it might be
- # unsupported, so nothing left to validate.
- return
-
- # TODO Currently there are no checks that this indexing won't fail. Need to add some
- # initial validity checks. Coming soon an a future PR.
- parsed_resp = resp[endpoint][self._cluster_object][self._request_object]
-
- if self._variable_to_save is not None:
- self._variable_to_save.save_response(parsed_resp)
-
- if not all([constraint.is_met(parsed_resp) for constraint in self._constraints]):
- logger.error(f'Constraints check failed')
- # TODO how should we fail the test here?
-
- if self._expected_response is not None:
- self._expected_response.verify(parsed_resp)
-
-
-class WriteAttributeAction(BaseAction):
- '''Single write attribute action to be executed including validation.'''
-
- def __init__(self, item: dict, cluster: str, context: _ExecutionContext):
- '''Parse write attribute action from yaml test configuration.
-
- Args:
- 'item': Dictionary contains single write attribute action to be parsed.
- 'cluster': Name of cluster write attribute action is targeting.
- 'context': Contains test-wide common objects such as DataModelLookup instance, storage
- for device responses and top level test configurations variable.
- Raises:
- ParsingError: Raised if there is a benign error, and there is currently no
- action to perform for this write attribute.
- UnexpectedParsingError: Raised if there is an unexpected parsing error.
- '''
- super().__init__(item['label'])
- self._attribute_name = stringcase.pascalcase(item['attribute'])
- self._cluster = cluster
- self._request_object = None
-
- attribute = context.data_model_lookup.get_attribute(
- self._cluster, self._attribute_name)
- if attribute is None:
- raise ParsingError(
- f'WriteAttribute failed to find cluster:{self._cluster} '
- f'Attribute:{self._attribute_name}')
-
- if (item.get('arguments')):
- args = item['arguments']['value']
- try:
- request_data = Converter.parse_and_convert_yaml_value(
- args, attribute.attribute_type.Type, context.config_values)
- except ValueError:
- raise ParsingError('Could not covert yaml type')
-
- # Create a cluster object for the request from the provided YAML data.
- self._request_object = attribute(request_data)
- else:
- raise UnexpectedParsingError(f'WriteAttribute action does have arguments {self.label}')
-
- def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int):
- try:
- resp = asyncio.run(
- dev_ctrl.WriteAttribute(node_id, [(endpoint, self._request_object)]))
- except chip.interaction_model.InteractionModelError:
- if (self.expected_raw_response is not None and
- self.expected_raw_response.get('error')):
- logger.debug('Got error, but was expected')
- else:
- raise
-
- # TODO: confirm resp give a Success value, although not all write action are expected
- # to succeed, hence why this is a todo and not simply just done. Below is example of
- # what success check might look like.
- # asserts.assert_equal(resp[0].Status, StatusEnum.Success, 'label write must succeed')
-
-
-class YamlTestParser:
- '''Parses the test YAMLs and converts to a more natural Pythonic representation.
-
- The parser also permits execution of those tests there-after.
- '''
-
- def __init__(self, yaml_path: str):
- '''Constructor that parser the given a path to YAML test file.'''
- with open(yaml_path, 'r') as stream:
- try:
- self._raw_data = yaml.safe_load(stream)
- except yaml.YAMLError as exc:
- raise exc
-
- if 'name' not in self._raw_data:
- raise UnexpectedParsingError("YAML expected to have 'name'")
- self._name = self._raw_data['name']
-
- if 'config' not in self._raw_data:
- raise UnexpectedParsingError("YAML expected to have 'config'")
- self._config = self._raw_data['config']
-
- self._config.setdefault('nodeId', _NODE_ID_DEFAULT)
- self._config.setdefault('endpoint', _ENDPOINT_DETAULT)
- self._config.setdefault('cluster', _CLUSTER_DEFAULT)
- # TODO timeout is currently not used
- self._config.setdefault('timeout', _TIMEOUT_DEFAULT)
-
- self._config['cluster'] = self._config['cluster'].replace(' ', '').replace('/', '')
- self._base_action_test_list = []
- self._context = _ExecutionContext(data_model_lookup=PreDefinedDataModelLookup(),
- variable_storage=VariableStorage(),
- config_values=self._config)
-
- for item in self._raw_data['tests']:
- # This currently behaves differently than the c++ version. We are evaluating if test
- # is disabled before anything else, allowing for incorrectly named commands.
- if item.get('disabled'):
- logger.info(f"Test is disabled, skipping {item['label']}")
- continue
-
- action = None
- cluster = self._config['cluster']
- # Some of the tests contain 'cluster over-rides' that refer to a different
- # cluster than that specified in 'config'.
- if (item.get('cluster')):
- cluster = item.get('cluster').replace(' ', '').replace('/', '')
- if item['command'] == 'writeAttribute':
- action = self._attribute_write_action_factory(item, cluster)
- elif item['command'] == 'readAttribute':
- action = self._attribute_read_action_factory(item, cluster)
- else:
- action = self._invoke_action_factory(item, cluster)
-
- if action is not None:
- self._base_action_test_list.append(action)
- else:
- logger.warn(f"Failed to parse {item['label']}")
-
- def _invoke_action_factory(self, item: dict, cluster: str):
- '''Parse cluster command from yaml test configuration.
-
- Args:
- 'item': Dictionary contains single cluster action test to be parsed
- 'cluster': Name of cluster action is targeting.
- Returns:
- InvokeAction if 'item' is a valid action to be executed.
- None if 'item' was not parsed for a known reason that is not fatal.
- '''
- try:
- return InvokeAction(item, cluster, self._context)
- except ParsingError:
- return None
-
- def _attribute_read_action_factory(self, item: dict, cluster: str):
- '''Parse read attribute action from yaml test configuration.
-
- Args:
- 'item': Dictionary contains single read attribute action to be parsed.
- 'cluster': Name of cluster read attribute action is targeting.
- Returns:
- ReadAttributeAction if 'item' is a valid action to be executed.
- None if 'item' was not parsed for a known reason that is not fatal.
- '''
- try:
- return ReadAttributeAction(item, cluster, self._context)
- except ParsingError:
- return None
-
- def _attribute_write_action_factory(self, item: dict, cluster: str):
- '''Parse write attribute action from yaml test configuration.
-
- Args:
- 'item': Dictionary contains single write attribute action to be parsed.
- 'cluster': Name of cluster write attribute action is targeting.
- Returns:
- WriteAttributeAction if 'item' is a valid action to be executed.
- None if 'item' was not parsed for a known reason that is not fatal.
- '''
- try:
- return WriteAttributeAction(item, cluster, self._context)
- except ParsingError:
- return None
-
- def execute_tests(self, dev_ctrl: ChipDeviceCtrl):
- '''Executes parsed YAML tests.'''
- self._context.variable_storage.clear()
- for idx, action in enumerate(self._base_action_test_list):
- logger.info(f'test: {idx} -- Executing{action.label}')
-
- action.run_action(dev_ctrl, self._config['endpoint'], self._config['nodeId'])
diff --git a/src/controller/python/chip/yaml/runner.py b/src/controller/python/chip/yaml/runner.py
new file mode 100644
index 0000000..f55d6cc
--- /dev/null
+++ b/src/controller/python/chip/yaml/runner.py
@@ -0,0 +1,386 @@
+#
+# Copyright (c) 2022 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from chip import ChipDeviceCtrl
+from enum import Enum
+import stringcase
+import chip.interaction_model
+import asyncio as asyncio
+import logging
+from chip.yaml.errors import ParsingError, UnexpectedParsingError
+from chip.clusters.Attribute import AttributeStatus, ValueDecodeFailure
+from .data_model_lookup import *
+import chip.yaml.format_converter as Converter
+
+logger = logging.getLogger('YamlParser')
+
+
+class _ActionStatus(Enum):
+ SUCCESS = 'success',
+ ERROR = 'error'
+
+
+@dataclass
+class _ActionResult:
+ status: _ActionStatus
+ response: object
+
+
+@dataclass
+class _ExecutionContext:
+ ''' Objects that is commonly passed around this file that are vital to test execution.'''
+ # Data model lookup to get python attribute, cluster, command object.
+ data_model_lookup: DataModelLookup = None
+
+
+class BaseAction(ABC):
+ '''Interface for a single YAML action that is to be executed.'''
+
+ def __init__(self, label):
+ self._label = label
+
+ @property
+ def label(self):
+ return self._label
+
+ @abstractmethod
+ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
+ pass
+
+
+class InvokeAction(BaseAction):
+ '''Single invoke action to be executed.'''
+
+ def __init__(self, test_step, cluster: str, context: _ExecutionContext):
+ '''Converts 'test_step' to invoke command action that can execute with ChipDeviceCtrl.
+
+ Args:
+ 'test_step': Step containing information required to run invoke command action.
+ 'cluster': Name of cluster which to invoke action is targeting.
+ 'context': Contains test-wide common objects such as DataModelLookup instance.
+ Raises:
+ ParsingError: Raised if there is a benign error, and there is currently no
+ action to perform for this write attribute.
+ UnexpectedParsingError: Raised if there is an unexpected parsing error.
+ '''
+ super().__init__(test_step.label)
+ self._command_name = stringcase.pascalcase(test_step.command)
+ self._cluster = cluster
+ self._request_object = None
+ self._expected_response_object = None
+ self._endpoint = test_step.endpoint
+ self._node_id = test_step.node_id
+
+ command = context.data_model_lookup.get_command(self._cluster, self._command_name)
+
+ if command is None:
+ raise ParsingError(
+ f'Failed to find cluster:{self._cluster} Command:{self._command_name}')
+
+ command_object = command()
+ if (test_step.arguments):
+ args = test_step.arguments['values']
+ request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args)
+
+ try:
+ request_data = Converter.convert_to_data_model_type(
+ request_data_as_dict, type(command_object))
+ except ValueError:
+ # TODO after allowing out of bounds enums to be written this should be changed to
+ # UnexpectedParsingError.
+ raise ParsingError('Could not covert yaml type')
+
+ self._request_object = command_object.FromDict(request_data)
+ else:
+ self._request_object = command_object
+
+ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
+ try:
+ resp = asyncio.run(dev_ctrl.SendCommand(self._node_id, self._endpoint,
+ self._request_object))
+ except chip.interaction_model.InteractionModelError as error:
+ return _ActionResult(status=_ActionStatus.ERROR, response=error)
+
+ # Commands with no response give a None response. In those cases we return a success
+ return _ActionResult(status=_ActionStatus.SUCCESS, response=resp)
+
+
+class ReadAttributeAction(BaseAction):
+ '''Single read attribute action to be executed.'''
+
+ def __init__(self, test_step, cluster: str, context: _ExecutionContext):
+ '''Converts 'test_step' to read attribute action that can execute with ChipDeviceCtrl.
+
+ Args:
+ 'test_step': Step containing information required to run read attribute action.
+ 'cluster': Name of cluster read attribute action is targeting.
+ 'context': Contains test-wide common objects such as DataModelLookup instance.
+ Raises:
+ ParsingError: Raised if there is a benign error, and there is currently no
+ action to perform for this read attribute.
+ UnexpectedParsingError: Raised if there is an unexpected parsing error.
+ '''
+ super().__init__(test_step.label)
+ self._attribute_name = stringcase.pascalcase(test_step.attribute)
+ self._cluster = cluster
+ self._endpoint = test_step.endpoint
+ self._node_id = test_step.node_id
+ self._cluster_object = None
+ self._request_object = None
+
+ self._possibly_unsupported = bool(test_step.optional)
+
+ self._cluster_object = context.data_model_lookup.get_cluster(self._cluster)
+ if self._cluster_object is None:
+ raise UnexpectedParsingError(
+ f'ReadAttribute failed to find cluster object:{self._cluster}')
+
+ self._request_object = context.data_model_lookup.get_attribute(
+ self._cluster, self._attribute_name)
+ if self._request_object is None:
+ raise ParsingError(
+ f'ReadAttribute failed to find cluster:{self._cluster} '
+ f'Attribute:{self._attribute_name}')
+
+ if test_step.arguments:
+ raise UnexpectedParsingError(
+ f'ReadAttribute should not contain arguments. {self.label}')
+
+ if self._request_object.attribute_type is None:
+ raise UnexpectedParsingError(
+ f'ReadAttribute doesnt have valid attribute_type. {self.label}')
+
+ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
+ try:
+ raw_resp = asyncio.run(dev_ctrl.ReadAttribute(self._node_id,
+ [(self._endpoint, self._request_object)]))
+ except chip.interaction_model.InteractionModelError as error:
+ return _ActionResult(status=_ActionStatus.ERROR, response=error)
+
+ if self._possibly_unsupported and not raw_resp:
+ # We have found an unsupported attribute. TestStep provided did specify that it might be
+ # unsupported, so nothing left to validate. We just return a failure here.
+ return _ActionResult(status=_ActionStatus.ERROR, response=None)
+
+ # TODO Currently there are no checks that this indexing won't fail. Need to add some
+ # initial validity checks. Coming soon in a future PR.
+ resp = raw_resp[self._endpoint][self._cluster_object][self._request_object]
+
+ if isinstance(resp, ValueDecodeFailure):
+ # response.Reason is of type chip.interaction_model.Status.
+ return _ActionResult(status=_ActionStatus.ERROR, response=resp.Reason)
+
+ # decode() is expecting to get a DataModelLookup Object type to grab certain attributes
+ # like cluster id.
+ return_val = self._request_object(resp)
+ return _ActionResult(status=_ActionStatus.SUCCESS, response=return_val)
+
+
+class WriteAttributeAction(BaseAction):
+ '''Single write attribute action to be executed.'''
+
+ def __init__(self, test_step, cluster: str, context: _ExecutionContext):
+ '''Converts 'test_step' to write attribute action that can execute with ChipDeviceCtrl.
+
+ Args:
+ 'test_step': Step containing information required to run write attribute action.
+ 'cluster': Name of cluster write attribute action is targeting.
+ 'context': Contains test-wide common objects such as DataModelLookup instance.
+ Raises:
+ ParsingError: Raised if there is a benign error, and there is currently no
+ action to perform for this write attribute.
+ UnexpectedParsingError: Raised if there is an unexpected parsing error.
+ '''
+ super().__init__(test_step.label)
+ self._attribute_name = stringcase.pascalcase(test_step.attribute)
+ self._cluster = cluster
+ self._endpoint = test_step.endpoint
+ self._node_id = test_step.node_id
+ self._request_object = None
+
+ attribute = context.data_model_lookup.get_attribute(
+ self._cluster, self._attribute_name)
+ if attribute is None:
+ raise ParsingError(
+ f'WriteAttribute failed to find cluster:{self._cluster} '
+ f'Attribute:{self._attribute_name}')
+
+ if not test_step.arguments:
+ raise UnexpectedParsingError(f'WriteAttribute action does have arguments {self.label}')
+
+ args = test_step.arguments['values']
+ if len(args) != 1:
+ raise UnexpectedParsingError(f'WriteAttribute is trying to write multiple values')
+ request_data_as_dict = args[0]
+ try:
+ # TODO this is an ugly hack
+ request_data = Converter.convert_to_data_model_type(
+ request_data_as_dict['value'], attribute.attribute_type.Type)
+ except ValueError:
+ raise ParsingError('Could not covert yaml type')
+
+ # Create a cluster object for the request from the provided YAML data.
+ self._request_object = attribute(request_data)
+
+ def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
+ try:
+ resp = asyncio.run(
+ dev_ctrl.WriteAttribute(self._node_id, [(self._endpoint, self._request_object)]))
+ except chip.interaction_model.InteractionModelError:
+ # TODO Should we be doing the same thing as InvokeAction on InteractionModelError?
+ raise
+ if len(resp) == 1 and isinstance(resp[0], AttributeStatus):
+ if resp[0].Status == chip.interaction_model.Status.Success:
+ return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
+ else:
+ return _ActionResult(status=_ActionStatus.ERROR, response=resp[0].Status)
+
+ # We always expecte the response to be a list of length 1, for that reason we return error
+ # here.
+ return _ActionResult(status=_ActionStatus.ERROR, response=None)
+
+
+class ReplTestRunner:
+ '''Test runner to encode/decode values from YAML test Parser for executing the TestStep.
+
+ Uses ChipDeviceCtrl from chip-repl to execute parsed YAML TestSteps.
+ '''
+
+ def __init__(self, test_spec_definition, dev_ctrl):
+ self._test_spec_definition = test_spec_definition
+ self._dev_ctrl = dev_ctrl
+ self._context = _ExecutionContext(data_model_lookup=PreDefinedDataModelLookup())
+
+ def _invoke_action_factory(self, test_step, cluster: str):
+ '''Creates cluster invoke action command from TestStep.
+
+ Args:
+ 'test_step': Step containing information required to run an invoke command action.
+ 'cluster': Name of cluster action is targeting.
+ Returns:
+ InvokeAction if 'test_step' is a valid action to be executed.
+ None if we were unable to use the provided 'test_step' for a known reason that is not
+ fatal to test execution.
+ '''
+ try:
+ return InvokeAction(test_step, cluster, self._context)
+ except ParsingError:
+ return None
+
+ def _attribute_read_action_factory(self, test_step, cluster: str):
+ '''Creates read attribute command TestStep.
+
+ Args:
+ 'test_step': Step containing information required to run read attribute action.
+ 'cluster': Name of cluster read attribute action is targeting.
+ Returns:
+ ReadAttributeAction if 'test_step' is a valid read attribute to be executed.
+ None if we were unable to use the provided 'test_step' for a known reason that is not
+ fatal to test execution.
+ '''
+ try:
+ return ReadAttributeAction(test_step, cluster, self._context)
+ except ParsingError:
+ return None
+
+ def _attribute_write_action_factory(self, test_step, cluster: str):
+ '''Creates write attribute command TestStep.
+
+ Args:
+ 'test_step': Step containing information required to run write attribute action.
+ 'cluster': Name of cluster write attribute action is targeting.
+ Returns:
+ WriteAttributeAction if 'test_step' is a valid write attribute to be executed.
+ None if we were unable to use the provided 'test_step' for a known reason that is not
+ fatal to test execution.
+ '''
+ try:
+ return WriteAttributeAction(test_step, cluster, self._context)
+ except ParsingError:
+ return None
+
+ def encode(self, request) -> BaseAction:
+ action = None
+ cluster = request.cluster.replace(' ', '').replace('/', '')
+ command = request.command
+ # Some of the tests contain 'cluster over-rides' that refer to a different
+ # cluster than that specified in 'config'.
+ if command == 'writeAttribute':
+ action = self._attribute_write_action_factory(request, cluster)
+ elif command == 'readAttribute':
+ action = self._attribute_read_action_factory(request, cluster)
+ elif command == 'readEvent':
+ action = self._event_read_action_factory(request, cluster)
+ else:
+ action = self._invoke_action_factory(request, cluster)
+
+ if action is None:
+ logger.warn(f"Failed to parse {request.label}")
+ return action
+
+ def decode(self, result: _ActionResult):
+ # If this is a generic response, there is nothing to do.
+ if result.response is None:
+ # TODO Once yamltest and idl python packages are properly packaged as a single module
+ # the type we are returning will be formalized. For now TestStep.post_process_response
+ # expects this particular case to be sent as a string.
+ return 'success' if result.status == _ActionStatus.SUCCESS else 'failure'
+
+ response = result.response
+
+ decoded_response = {}
+ if isinstance(response, chip.interaction_model.InteractionModelError):
+ decoded_response['error'] = stringcase.snakecase(response.status.name).upper()
+ return decoded_response
+
+ if isinstance(response, chip.interaction_model.Status):
+ decoded_response['error'] = stringcase.snakecase(response.name).upper()
+ return decoded_response
+
+ cluster_name = self._test_spec_definition.get_cluster_name(response.cluster_id)
+ decoded_response['clusterId'] = cluster_name
+
+ if hasattr(response, 'command_id'):
+ decoded_response['command'] = self._test_spec_definition.get_response_name(
+ response.cluster_id, response.command_id)
+ response_definition = self._test_spec_definition.get_response_by_name(
+ cluster_name, decoded_response['command'])
+ decoded_response['value'] = Converter.from_data_model_to_test_definition(
+ self._test_spec_definition, cluster_name, response_definition.fields, response)
+
+ if hasattr(response, 'attribute_id'):
+ decoded_response['attribute'] = self._test_spec_definition.get_attribute_name(
+ response.cluster_id, response.attribute_id)
+ attribute = self._test_spec_definition.get_attribute_by_name(
+ cluster_name, decoded_response['attribute'])
+ # TODO Once we fix the issue of not being able to find the global attribute properly
+ # we should be able to remove this if/else statement below.
+ if attribute is None:
+ # When we cannot find the attribute it is because it is a global attribute like
+ # FeatureMap. Fortunately for these types we can get away with using
+ # 'response.value' directly for the time being.
+ decoded_response['value'] = response.value
+ else:
+ decoded_response['value'] = Converter.from_data_model_to_test_definition(
+ self._test_spec_definition, cluster_name, attribute.definition, response.value)
+
+ return decoded_response
+
+ def execute(self, action: BaseAction):
+ return action.run_action(self._dev_ctrl)
diff --git a/src/controller/python/chip/yaml/variable_storage.py b/src/controller/python/chip/yaml/variable_storage.py
deleted file mode 100644
index d62a7dd..0000000
--- a/src/controller/python/chip/yaml/variable_storage.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Copyright (c) 2022 Project CHIP Authors
-# All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-class VariableStorage:
- '''Stores key value pairs.
-
- This is a code readability convience object for saving/loading values.
- '''
-
- def __init__(self):
- self._saved_list = {}
-
- def save(self, key, value):
- self._saved_list[key] = value
-
- def load(self, key):
- return self._saved_list.get(key)
-
- def is_key_saved(self, key) -> bool:
- return key in self._saved_list
-
- def clear(self):
- self._saved_list.clear()
diff --git a/src/controller/python/test/unit_tests/test_yaml_format_converter.py b/src/controller/python/test/unit_tests/test_yaml_format_converter.py
deleted file mode 100644
index 05f58d2..0000000
--- a/src/controller/python/test/unit_tests/test_yaml_format_converter.py
+++ /dev/null
@@ -1,121 +0,0 @@
-#
-# Copyright (c) 2022 Project CHIP Authors
-# All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from chip.yaml.format_converter import convert_yaml_octet_string_to_bytes, substitute_in_config_variables
-from binascii import unhexlify
-import unittest
-
-
-class TestOctetStringYamlDecode(unittest.TestCase):
- def test_common_cases(self):
- self.assertEqual(convert_yaml_octet_string_to_bytes("hex:aa55"), unhexlify("aa55"))
- self.assertEqual(convert_yaml_octet_string_to_bytes("hex:"), unhexlify(""))
- self.assertEqual(convert_yaml_octet_string_to_bytes("hex:AA55"), unhexlify("aa55"))
-
- self.assertEqual(convert_yaml_octet_string_to_bytes("0\xaa\x55"), unhexlify("30aa55"))
- self.assertEqual(convert_yaml_octet_string_to_bytes("0\xAA\x55"), unhexlify("30aa55"))
- self.assertEqual(convert_yaml_octet_string_to_bytes("0\xAa\x55"), unhexlify("30aa55"))
-
- self.assertEqual(convert_yaml_octet_string_to_bytes("0hex:"), b"0hex:")
- self.assertEqual(convert_yaml_octet_string_to_bytes("0hex:A"), b"0hex:A")
- self.assertEqual(convert_yaml_octet_string_to_bytes("0hex:AA55"), b"0hex:AA55")
-
- self.assertEqual(convert_yaml_octet_string_to_bytes("AA55"), b"AA55")
- self.assertEqual(convert_yaml_octet_string_to_bytes("AA\n\r\t55"), unhexlify("41410a0d093535"))
- # TODO(#23669): After utf8 is properly supported expected result is unhexlify("c3a9c3a90a0a")
- self.assertEqual(convert_yaml_octet_string_to_bytes("\xC3\xA9é\n\n"), unhexlify("c3a9e90a0a"))
-
- # Partial hex nibble
- with self.assertRaises(ValueError):
- convert_yaml_octet_string_to_bytes("hex:aa5")
-
-
-class TestSubstitueInConfigVariables(unittest.TestCase):
-
- def setUp(self):
- self.common_config = {
- 'arg1': {
- 'defaultValue': 1
- },
- 'arg2': {
- 'defaultValue': 2
- },
- 'no_explicit_default': 3
- }
-
- def test_basic_substitution(self):
- self.assertEqual(substitute_in_config_variables('arg1', self.common_config), 1)
- self.assertEqual(substitute_in_config_variables('arg2', self.common_config), 2)
- self.assertEqual(substitute_in_config_variables('arg3', self.common_config), 'arg3')
- self.assertEqual(substitute_in_config_variables('no_explicit_default', self.common_config), 3)
-
- def test_basis_dict_substitution(self):
- basic_dict = {
- 'arg1': 'arg1',
- 'arg2': 'arg2',
- 'arg3': 'arg3',
- 'no_explicit_default': 'no_explicit_default',
- }
- expected_dict = {
- 'arg1': 1,
- 'arg2': 2,
- 'arg3': 'arg3',
- 'no_explicit_default': 3,
- }
- self.assertEqual(substitute_in_config_variables(basic_dict, self.common_config), expected_dict)
-
- def test_basis_list_substitution(self):
- basic_list = ['arg1', 'arg2', 'arg3', 'no_explicit_default']
- expected_list = [1, 2, 'arg3', 3]
- self.assertEqual(substitute_in_config_variables(basic_list, self.common_config), expected_list)
-
- def test_complex_nested_type(self):
- complex_nested_type = {
- 'arg1': ['arg1', 'arg2', 'arg3', 'no_explicit_default'],
- 'arg2': 'arg22',
- 'arg3': {
- 'no_explicit_default': 'no_explicit_default',
- 'arg2': 'arg2',
- 'another_dict': {
- 'arg1': ['arg1', 'arg1', 'arg1', 'no_explicit_default'],
- },
- 'another_list': ['arg1', 'arg2', 'arg3', 'no_explicit_default']
- },
- 'no_explicit_default': 'no_explicit_default',
- }
- expected_result = {
- 'arg1': [1, 2, 'arg3', 3],
- 'arg2': 'arg22',
- 'arg3': {
- 'no_explicit_default': 3,
- 'arg2': 2,
- 'another_dict': {
- 'arg1': [1, 1, 1, 3],
- },
- 'another_list': [1, 2, 'arg3', 3]
- },
- 'no_explicit_default': 3,
- }
- self.assertEqual(substitute_in_config_variables(complex_nested_type, self.common_config), expected_result)
-
-
-def main():
- unittest.main()
-
-
-if __name__ == "__main__":
- main()