# blob: 8fc5cd55ec66cdc3391a9e2b8fb1138893c56e2e [file] [log] [blame]
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from chip import ChipDeviceCtrl
from chip.clusters.Types import NullValue
from chip.tlv import float32
import yaml
import stringcase
import chip.interaction_model
import asyncio as asyncio
import logging
import math
from chip.yaml.errors import ParsingError, UnexpectedParsingError
from .data_model_lookup import *
import chip.yaml.format_converter as Converter
from .variable_storage import VariableStorage
# NOTE(review): _SUCCESS_STATUS_CODE, _CLUSTER_DEFAULT and _TIMEOUT_DEFAULT are referenced
# further down in this file but were not defined anywhere in it. The values below are
# assumptions -- confirm them against the YAML test format before relying on them.
_SUCCESS_STATUS_CODE = 'SUCCESS'
_NODE_ID_DEFAULT = 0x12345
# The misspelled name is kept on purpose because YamlTestParser refers to it as-is.
_ENDPOINT_DETAULT = ''  # TODO why is this an empty string
_CLUSTER_DEFAULT = ''
_TIMEOUT_DEFAULT = 90

logger = logging.getLogger('YamlParser')
@dataclass
class _ExecutionContext:
    '''Objects commonly passed around this file that are vital to test execution.

    Declared as a dataclass so that it can be built with keyword arguments, which is how
    YamlTestParser creates it.
    '''
    # Data model lookup to get python attribute, cluster, command object.
    data_model_lookup: DataModelLookup = None

    # Where various test action response are stored and loaded from.
    variable_storage: VariableStorage = None

    # Top level configuration values for a yaml test.
    config_values: dict = None
class _ConstraintValue:
    '''Constraints that are numeric primitive data types'''

    def __init__(self, value, field_type, context: _ExecutionContext):
        '''Captures a constraint operand, either as a literal or as a saved variable name.

        Args:
          'value': Raw constraint value from the YAML, or None when the constraint is absent.
          'field_type': Type passed to the converter when 'value' is a literal.
          'context': Supplies the variable storage used to resolve saved variable names.
        '''
        self._variable_storage = context.variable_storage
        # When not none _indirect_value_key is binding a name to the constraint value, and the
        # actual value can only be looked-up dynamically, which is why this is a key name.
        self._indirect_value_key = None
        self._value = None

        if value is None:
            # Default values set above is all we need here.
            return

        if isinstance(value, str) and self._variable_storage.is_key_saved(value):
            # A known variable name: defer the lookup until get_value() is called.
            self._indirect_value_key = value
        else:
            self._value = Converter.convert_yaml_type(
                value, field_type)

    def get_value(self):
        '''Gets the current value of the constraint.

        This method accounts for getting the runtime saved value from DUT previous responses.

        Returns:
          The value loaded from variable storage when the constraint names a saved variable,
          otherwise the converted literal (None when the constraint was not specified).
        '''
        if self._indirect_value_key:
            return self._variable_storage.load(self._indirect_value_key)
        return self._value
class _Constraints:
    '''Holds the 'constraints' section of an expected response and checks values against it.'''

    def __init__(self, constraints: dict, field_type, context: _ExecutionContext):
        '''Reads every supported constraint key out of 'constraints'.

        Args:
          'constraints': Dictionary of constraint name to constraint value.
          'field_type': Type used to convert the minValue/maxValue/notValue literals.
          'context': Supplies the variable storage used by value constraints.
        '''
        self._variable_storage = context.variable_storage
        self._has_value = constraints.get('hasValue')
        self._type = constraints.get('type')
        self._starts_with = constraints.get('startsWith')
        self._ends_with = constraints.get('endsWith')
        self._is_upper_case = constraints.get('isUpperCase')
        self._is_lower_case = constraints.get('isLowerCase')
        self._min_value = _ConstraintValue(constraints.get('minValue'), field_type,
                                           context)
        self._max_value = _ConstraintValue(constraints.get('maxValue'), field_type,
                                           context)
        self._contains = constraints.get('contains')
        self._excludes = constraints.get('excludes')
        self._has_masks_set = constraints.get('hasMasksSet')
        self._has_masks_clear = constraints.get('hasMasksClear')
        self._not_value = _ConstraintValue(constraints.get('notValue'), field_type,
                                           context)

    def are_constrains_met(self, response) -> bool:
        '''Checks 'response' against every configured constraint.

        All constraints are evaluated (no short-circuit) so that the "not implemented"
        warnings are always logged.

        Returns:
          True if every configured constraint holds, False otherwise.
        '''
        return_value = True

        if self._has_value:
            logger.warning('HasValue constraint currently not implemented, forcing failure')
            return_value = False

        if self._type:
            logger.warning('Type constraint currently not implemented, forcing failure')
            return_value = False

        if self._starts_with and not response.startswith(self._starts_with):
            return_value = False

        if self._ends_with and not response.endswith(self._ends_with):
            return_value = False

        if self._is_upper_case and not response.isupper():
            return_value = False

        if self._is_lower_case and not response.islower():
            return_value = False

        # Bounds are compared against None rather than by truthiness so that a bound of 0
        # is still enforced.
        min_value = self._min_value.get_value()
        if response is not NullValue and min_value is not None and response < min_value:
            return_value = False

        max_value = self._max_value.get_value()
        if response is not NullValue and max_value is not None and response > max_value:
            return_value = False

        if self._contains and not set(self._contains).issubset(response):
            return_value = False

        if self._excludes and not set(self._excludes).isdisjoint(response):
            return_value = False

        if self._has_masks_set:
            # Every bit of every listed mask must be set in the response.
            if any((response & mask) != mask for mask in self._has_masks_set):
                return_value = False

        if self._has_masks_clear:
            # No bit of any listed mask may be set in the response.
            if any((response & mask) != 0 for mask in self._has_masks_clear):
                return_value = False

        # Same reasoning as for the bounds: a notValue of 0 must not be skipped.
        not_value = self._not_value.get_value()
        if not_value is not None and response == not_value:
            return_value = False

        return return_value
class _VariableToSave:
    '''Binds a 'saveAs' variable name to the storage its value is written to.'''

    def __init__(self, variable_name: str, variable_storage: VariableStorage):
        '''Remembers the name/storage pair and registers the name with a None placeholder.

        NOTE(review): the statement that registers the placeholder was truncated in the
        source; VariableStorage.save(name, value) is the assumed API -- confirm.
        '''
        self._variable_name = variable_name
        self._variable_storage = variable_storage
        # Presumably done so that later test steps already see the name as a saved key while
        # the YAML is still being parsed -- verify against VariableStorage.is_key_saved.
        self._variable_storage.save(self._variable_name, None)

    def save_response(self, value):
        '''Stores 'value' under the variable name given at construction.'''
        self._variable_storage.save(self._variable_name, value)
class _ExpectedResponse:
    '''Expected value of a response, given as a literal or as a saved variable name.'''

    def __init__(self, value, response_type, context: _ExecutionContext):
        '''Prepares the expected value for later comparison.

        Args:
          'value': Expected value from the YAML, or the name of a previously saved variable.
          'response_type': Type the literal is converted to. None disables verification.
          'context': Supplies the variable storage used to resolve saved variable names.
        '''
        self._load_expected_response_in_verify = None
        self._expected_response_type = response_type
        self._expected_response = None
        self._variable_storage = context.variable_storage

        if isinstance(value, str) and self._variable_storage.is_key_saved(value):
            # The value is only known at run time, so remember the key and load it in verify.
            self._load_expected_response_in_verify = value
        else:
            self._expected_response = Converter.convert_yaml_type(
                value, response_type, use_from_dict=True)

    def _expects_float32(self) -> bool:
        '''Tells whether the expected response type is float32 (as an instance or a class).'''
        response_type = self._expected_response_type
        if isinstance(response_type, float32):
            return True
        return isinstance(response_type, type) and issubclass(response_type, float32)

    def verify(self, response):
        '''Compares 'response' with the expected value.

        float32 values are compared with a relative tolerance; everything else must compare
        equal. A mismatch is logged as an error.

        Returns:
          True when the response matches (or when there is no expected type), else False.
        '''
        if (self._expected_response_type is None):
            return True

        if self._load_expected_response_in_verify is not None:
            self._expected_response = self._variable_storage.load(
                self._load_expected_response_in_verify)

        expected = self._expected_response
        # math.isclose only accepts real numbers; anything else (for example a null value)
        # falls back to the plain comparison.
        both_numeric = (isinstance(expected, (int, float)) and
                        isinstance(response, (int, float)))
        if self._expects_float32() and both_numeric:
            mismatch = not math.isclose(expected, response, rel_tol=1e-6)
        else:
            mismatch = expected != response

        if mismatch:
            logger.error(f"Expected response {self._expected_response} didn't match "
                         f"actual object {response}")
            return False
        return True
class BaseAction(ABC):
    '''Interface for a single yaml action that is to be executed.'''

    def __init__(self, label):
        '''Stores the label of the test step this action belongs to.'''
        self._label = label

    @property
    def label(self):
        '''Label of the test step; read as an attribute when building log/error messages.'''
        return self._label

    @abstractmethod
    def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int):
        '''Executes the action.

        Args:
          'dev_ctrl': Device controller used to talk to the device.
          'endpoint': Endpoint the action targets.
          'node_id': Node id of the device the action targets.
        '''
        pass
class InvokeAction(BaseAction):
    '''Single invoke action to be executed including validation of response.'''

    def __init__(self, item: dict, cluster: str, context: _ExecutionContext):
        '''Parse cluster invoke from yaml test configuration.

        Args:
          'item': Dictionary containing single invoke to be parsed.
          'cluster': Name of cluster which to invoke action is targeting.
          'context': Contains test-wide common objects such as DataModelLookup instance, storage
            for device responses and top level test configurations variable.
        Raises:
          ParsingError: Raised if there is a benign error, and there is currently no
            action to perform for this write attribute.
          UnexpectedParsingError: Raised if there is an unexpected parsing error.
        '''
        super().__init__(item['label'])
        self._command_name = stringcase.pascalcase(item['command'])
        self._cluster = cluster
        self._request_object = None
        # Placeholder only; replaced by the 'response' section of the item further down.
        self._expected_raw_response: dict = None
        self._expected_response_object = None

        command = context.data_model_lookup.get_command(
            self._cluster, self._command_name)

        if command is None:
            raise ParsingError(
                f'Failed to find cluster:{self._cluster} Command:{self._command_name}')

        command_object = command()
        if (item.get('arguments')):
            args = item['arguments']['values']
            request_data_as_dict = Converter.convert_name_value_pair_to_dict(args)

            try:
                request_data = Converter.convert_yaml_type(
                    request_data_as_dict, type(command_object))
            except ValueError:
                raise ParsingError('Could not covert yaml type')

            # Create a cluster object for the request from the provided YAML data.
            self._request_object = command_object.FromDict(request_data)
        else:
            # No arguments given: send the default constructed command.
            self._request_object = command_object

        self._expected_raw_response = item.get('response')

        # NOTE(review): the last clause of this condition was truncated in the source; it is
        # assumed to require a non-empty 'values' entry since that key is indexed below.
        if (self._request_object.response_type is not None and
                self._expected_raw_response is not None and
                self._expected_raw_response.get('values')):
            response_type = stringcase.pascalcase(self._request_object.response_type)
            expected_command = context.data_model_lookup.get_command(self._cluster,
                                                                     response_type)
            expected_response_args = self._expected_raw_response['values']
            expected_response_data_as_dict = Converter.convert_name_value_pair_to_dict(
                expected_response_args)
            expected_response_data = Converter.convert_yaml_type(
                expected_response_data_as_dict, expected_command)
            self._expected_response_object = expected_command.FromDict(expected_response_data)

    def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int):
        '''Sends the command and compares the response with the expected one, if any.

        An interaction model error is re-raised unless the test step declared a non-success
        'error' in its expected response. A response mismatch is logged as an error.
        '''
        try:
            # NOTE(review): the call was truncated in the source; SendCommand(node_id,
            # endpoint, request) run through asyncio.run is the assumed form -- confirm.
            resp = asyncio.run(dev_ctrl.SendCommand(node_id, endpoint, self._request_object))
        except chip.interaction_model.InteractionModelError:
            if self._expected_raw_response is None:
                raise

            expected_status_code = self._expected_raw_response.get('error')
            if expected_status_code is not None and expected_status_code != _SUCCESS_STATUS_CODE:
                logger.debug('Got error response, but was expected')
            else:
                raise
            # The error was expected, so there is no response left to validate.
            return

        if (self._expected_response_object is not None):
            if (self._expected_response_object != resp):
                logger.error(f'Expected response {self._expected_response_object} did not match '
                             f'actual object {resp}')
class ReadAttributeAction(BaseAction):
    '''Single read attribute action to be executed including validation.'''

    def __init__(self, item: dict, cluster: str, context: _ExecutionContext):
        '''Parse read attribute action from yaml test configuration.

        Args:
          'item': Dictionary contains single read attribute action to be parsed.
          'cluster': Name of cluster read attribute action is targeting.
          'context': Contains test-wide common objects such as DataModelLookup instance, storage
            for device responses and top level test configurations variable.
        Raises:
          ParsingError: Raised if there is a benign error, and there is currently no
            action to perform for this read attribute.
          UnexpectedParsingError: Raised if there is an unexpected parsing error.
        '''
        super().__init__(item['label'])
        self._attribute_name = stringcase.pascalcase(item['attribute'])
        self._constraints = None
        self._cluster = cluster
        self._cluster_object = None
        self._request_object = None
        # Placeholder only; replaced by the 'response' section of the item further down.
        self._expected_raw_response: dict = None
        self._expected_response: _ExpectedResponse = None
        self._possibly_unsupported = False
        self._variable_to_save = None

        self._cluster_object = context.data_model_lookup.get_cluster(self._cluster)
        if self._cluster_object is None:
            raise UnexpectedParsingError(
                f'ReadAttribute failed to find cluster object:{self._cluster}')

        self._request_object = context.data_model_lookup.get_attribute(
            self._cluster, self._attribute_name)
        if self._request_object is None:
            # NOTE(review): the second half of this message was truncated in the source and
            # has been reconstructed -- confirm the wording.
            raise ParsingError(
                f'ReadAttribute failed to find cluster:{self._cluster} '
                f'Attribute:{self._attribute_name}')

        if (item.get('arguments')):
            raise UnexpectedParsingError(
                f'ReadAttribute should not contain arguments. {self.label}')

        if self._request_object.attribute_type is None:
            raise UnexpectedParsingError(
                f'ReadAttribute doesnt have valid attribute_type. {self.label}')

        if 'optional' in item:
            self._possibly_unsupported = True

        self._expected_raw_response = item.get('response')
        if (self._expected_raw_response is None):
            # TODO actually if response is missing it typically means that we need to confirm
            # that we got a successful response. This will be implemented later to consider all
            # possible corner cases around that (if there are corner cases).
            raise UnexpectedParsingError(f'ReadAttribute missing expected response. {self.label}')

        variable_name = self._expected_raw_response.get('saveAs')
        if variable_name:
            self._variable_to_save = _VariableToSave(variable_name, context.variable_storage)

        # NOTE(review): the trailing arguments of the two constructor calls below were
        # truncated in the source; the attribute's value type and the context are assumed.
        if 'value' in self._expected_raw_response:
            expected_response_value = self._expected_raw_response['value']
            self._expected_response = _ExpectedResponse(expected_response_value,
                                                        self._request_object.attribute_type.Type,
                                                        context)

        constraints = self._expected_raw_response.get('constraints')
        if constraints:
            self._constraints = _Constraints(constraints,
                                             self._request_object.attribute_type.Type,
                                             context)

    def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int):
        '''Reads the attribute, then saves and validates the value as the test step requires.

        An interaction model error is re-raised unless the test step declared a non-success
        'error' in its expected response.
        '''
        try:
            # NOTE(review): the call was truncated in the source; ReadAttribute(node_id,
            # [attribute]) run through asyncio.run is the assumed form -- confirm.
            resp = asyncio.run(dev_ctrl.ReadAttribute(node_id, [(self._request_object)]))
        except chip.interaction_model.InteractionModelError:
            if self._expected_raw_response is None:
                raise

            expected_status_code = self._expected_raw_response.get('error')
            if expected_status_code is not None and expected_status_code != _SUCCESS_STATUS_CODE:
                logger.debug('Got error response, but was expected')
            else:
                raise
            # The error was expected, so there is no value left to validate.
            return

        if self._possibly_unsupported and not resp:
            # We have found an unsupported attribute. Parsed test did specify that it might be
            # unsupported, so nothing left to validate.
            return

        # TODO Currently there are no checks that this indexing won't fail. Need to add some
        # initial validity checks. Coming soon an a future PR.
        parsed_resp = resp[endpoint][self._cluster_object][self._request_object]

        if self._variable_to_save is not None:
            self._variable_to_save.save_response(parsed_resp)

        if self._constraints and not self._constraints.are_constrains_met(parsed_resp):
            logger.error('Constraints check failed')
            # TODO how should we fail the test here?

        if self._expected_response is not None:
            self._expected_response.verify(parsed_resp)
class WriteAttributeAction(BaseAction):
    '''Single write attribute action to be executed including validation.'''

    def __init__(self, item: dict, cluster: str, context: _ExecutionContext):
        '''Parse write attribute action from yaml test configuration.

        Args:
          'item': Dictionary contains single write attribute action to be parsed.
          'cluster': Name of cluster write attribute action is targeting.
          'context': Contains test-wide common objects such as DataModelLookup instance, storage
            for device responses and top level test configurations variable.
        Raises:
          ParsingError: Raised if there is a benign error, and there is currently no
            action to perform for this write attribute.
          UnexpectedParsingError: Raised if there is an unexpected parsing error.
        '''
        super().__init__(item['label'])
        self._attribute_name = stringcase.pascalcase(item['attribute'])
        self._cluster = cluster
        self._request_object = None
        # Kept so run_action can tell whether an error from the device was expected.
        self._expected_raw_response = item.get('response')

        attribute = context.data_model_lookup.get_attribute(
            self._cluster, self._attribute_name)
        if attribute is None:
            # NOTE(review): the second half of this message was truncated in the source and
            # has been reconstructed -- confirm the wording.
            raise ParsingError(
                f'WriteAttribute failed to find cluster:{self._cluster} '
                f'Attribute:{self._attribute_name}')

        if (item.get('arguments')):
            args = item['arguments']['value']
            try:
                request_data = Converter.convert_yaml_type(
                    args, attribute.attribute_type.Type)
            except ValueError:
                raise ParsingError('Could not covert yaml type')

            # Create a cluster object for the request from the provided YAML data.
            self._request_object = attribute(request_data)
        else:
            raise UnexpectedParsingError(
                f'WriteAttribute action does not have arguments {self.label}')

    def run_action(self, dev_ctrl: ChipDeviceCtrl, endpoint: int, node_id: int):
        '''Writes the attribute value to the device.

        An interaction model error is re-raised unless the test step declared an 'error' in
        its expected response.
        '''
        try:
            resp = asyncio.run(
                dev_ctrl.WriteAttribute(node_id, [(endpoint, self._request_object)]))
        except chip.interaction_model.InteractionModelError:
            # NOTE(review): the second clause of this condition was truncated in the source;
            # a truthy 'error' entry in the expected response is the assumed check.
            if (self._expected_raw_response is not None and
                    self._expected_raw_response.get('error')):
                logger.debug('Got error, but was expected')
            else:
                raise

        # TODO: confirm resp give a Success value, although not all write action are expected
        # to succeed, hence why this is a todo and not simply just done. Below is example of
        # what success check might look like.
        # asserts.assert_equal(resp[0].Status, StatusEnum.Success, 'label write must succeed')
class YamlTestParser:
    '''Parses the test YAMLs and converts to a more natural Pythonic representation.

    The parser also permits execution of those tests there-after.
    '''

    def __init__(self, yaml_path: str):
        '''Constructor that parses the YAML test file at the given path.

        Args:
          'yaml_path': Path to the YAML test file.
        Raises:
          yaml.YAMLError: If the file is not valid YAML.
          UnexpectedParsingError: If the document is not a mapping or lacks 'name'/'config'.
        '''
        with open(yaml_path, 'r') as stream:
            self._raw_data = yaml.safe_load(stream)

        # safe_load returns None for an empty document; the membership tests below would
        # then fail with an unrelated TypeError.
        if not isinstance(self._raw_data, dict):
            raise UnexpectedParsingError('YAML expected to be a mapping at the top level')

        if 'name' not in self._raw_data:
            raise UnexpectedParsingError("YAML expected to have 'name'")
        self._name = self._raw_data['name']

        if 'config' not in self._raw_data:
            raise UnexpectedParsingError("YAML expected to have 'config'")
        self._config = self._raw_data['config']

        self._config.setdefault('nodeId', _NODE_ID_DEFAULT)
        self._config.setdefault('endpoint', _ENDPOINT_DETAULT)
        self._config.setdefault('cluster', _CLUSTER_DEFAULT)
        # TODO timeout is currently not used
        self._config.setdefault('timeout', _TIMEOUT_DEFAULT)

        self._config['cluster'] = self._config['cluster'].replace(' ', '').replace('/', '')
        self._base_action_test_list = []
        # NOTE(review): everything after the first argument was truncated in the source; a
        # fresh VariableStorage() and the parsed config are assumed -- confirm the
        # VariableStorage constructor signature.
        self._context = _ExecutionContext(data_model_lookup=PreDefinedDataModelLookup(),
                                          variable_storage=VariableStorage(),
                                          config_values=self._config)

        for item in self._raw_data['tests']:
            # This currently behaves differently than the c++ version. We are evaluating if test
            # is disabled before anything else, allowing for incorrectly named commands.
            if item.get('disabled'):
                logger.info(f"Test is disabled, skipping {item['label']}")
                continue

            action = None
            cluster = self._config['cluster']
            # Some of the tests contain 'cluster over-rides' that refer to a different
            # cluster than that specified in 'config'.
            if (item.get('cluster')):
                cluster = item.get('cluster').replace(' ', '').replace('/', '')
            if item['command'] == 'writeAttribute':
                action = self._attribute_write_action_factory(item, cluster)
            elif item['command'] == 'readAttribute':
                action = self._attribute_read_action_factory(item, cluster)
            else:
                action = self._invoke_action_factory(item, cluster)

            if action is not None:
                self._base_action_test_list.append(action)
            else:
                logger.warning(f"Failed to parse {item['label']}")

    def _invoke_action_factory(self, item: dict, cluster: str):
        '''Parse cluster command from yaml test configuration.

        Args:
          'item': Dictionary contains single cluster action test to be parsed
          'cluster': Name of cluster action is targeting.
        Returns:
          InvokeAction if 'item' is a valid action to be executed.
          None if 'item' was not parsed for a known reason that is not fatal.
        '''
        try:
            return InvokeAction(item, cluster, self._context)
        except ParsingError:
            return None

    def _attribute_read_action_factory(self, item: dict, cluster: str):
        '''Parse read attribute action from yaml test configuration.

        Args:
          'item': Dictionary contains single read attribute action to be parsed.
          'cluster': Name of cluster read attribute action is targeting.
        Returns:
          ReadAttributeAction if 'item' is a valid action to be executed.
          None if 'item' was not parsed for a known reason that is not fatal.
        '''
        try:
            return ReadAttributeAction(item, cluster, self._context)
        except ParsingError:
            return None

    def _attribute_write_action_factory(self, item: dict, cluster: str):
        '''Parse write attribute action from yaml test configuration.

        Args:
          'item': Dictionary contains single write attribute action to be parsed.
          'cluster': Name of cluster write attribute action is targeting.
        Returns:
          WriteAttributeAction if 'item' is a valid action to be executed.
          None if 'item' was not parsed for a known reason that is not fatal.
        '''
        try:
            return WriteAttributeAction(item, cluster, self._context)
        except ParsingError:
            return None

    def execute_tests(self, dev_ctrl: ChipDeviceCtrl):
        '''Executes parsed YAML tests.'''
        for idx, action in enumerate(self._base_action_test_list):
            logger.info(f'test: {idx} -- Executing{action.label}')

            action.run_action(dev_ctrl, self._config['endpoint'], self._config['nodeId'])