blob: 7d8558df89e18598e96e233ac374555b441962f0 [file] [log] [blame]
#
# Copyright (c) 2022 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import logging
import re
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
from . import fixes
from .constraints import get_constraints, is_typed_constraint
from .definitions import SpecDefinitions
from .errors import (TestStepEnumError, TestStepEnumSpecifierNotUnknownError, TestStepEnumSpecifierWrongError, TestStepError,
TestStepKeyError, TestStepValueNameError)
from .pics_checker import PICSChecker
from .yaml_loader import YamlLoader
ANY_COMMANDS_CLUSTER_NAME = 'AnyCommands'
ANY_COMMANDS_LIST = [
'CommandById',
'ReadById',
'WriteById',
'SubscribeById',
'ReadEventById',
'SubscribeEventById',
'ReadAll',
'SubscribeAll',
]
# If True, enum values should use a valid name instead of a raw value
STRICT_ENUM_VALUE_CHECK = False
class UnknownPathQualifierError(TestStepError):
"""Raise when an attribute/command/event name is not found in the definitions."""
def __init__(self, content, target_type, target_name, candidate_names=[]):
if candidate_names:
message = f'Unknown {target_type}: "{target_name}". Candidates are: "{candidate_names}"'
for candidate_name in candidate_names:
if candidate_name.lower() == target_name.lower():
message = f'Unknown {target_type}: "{target_name}". Did you mean "{candidate_name}" ?'
break
else:
message = f'The cluster does not have any {target_type}s.'
super().__init__(message)
self.tag_key_with_error(content, target_type)
class TestStepAttributeKeyError(UnknownPathQualifierError):
"""Raise when an attribute name is not found in the definitions."""
def __init__(self, content, target_name, candidate_names=[]):
super().__init__(content, 'attribute', target_name, candidate_names)
class TestStepCommandKeyError(UnknownPathQualifierError):
"""Raise when a command name is not found in the definitions."""
def __init__(self, content, target_name, candidate_names=[]):
super().__init__(content, 'command', target_name, candidate_names)
class TestStepEventKeyError(UnknownPathQualifierError):
"""Raise when an event name is not found in the definitions."""
def __init__(self, content, target_name, candidate_names=[]):
super().__init__(content, 'event', target_name, candidate_names)
class PostProcessCheckStatus(Enum):
'''Indicates the post processing check step status.'''
SUCCESS = 'success',
WARNING = 'warning',
ERROR = 'error'
class PostProcessCheckType(Enum):
'''Indicates the post processing check step type.'''
IM_STATUS = auto()
CLUSTER_STATUS = auto()
RESPONSE_VALIDATION = auto()
CONSTRAINT_VALIDATION = auto()
SAVE_AS_VARIABLE = auto()
WAIT_VALIDATION = auto()
class PostProcessCheck:
'''Information about a single post processing operation that was performed.
Each check has a helpful message, indicating what the post processing operation did and whether
it was successful or not.
'''
def __init__(self, state: PostProcessCheckStatus, category: PostProcessCheckType, message: str, exception=None):
self.state = state
self.category = category
self.message = message
self.exception = exception
def is_success(self) -> bool:
return self.state == PostProcessCheckStatus.SUCCESS
def is_warning(self) -> bool:
return self.state == PostProcessCheckStatus.WARNING
def is_error(self) -> bool:
return self.state == PostProcessCheckStatus.ERROR
class PostProcessResponseResult:
'''Post processing response result information.
There are multiple operations that occur when post processing a response. This contains all the
results for each operation performed. Note that the number and types of steps performed is
dependant on test step itself.
'''
def __init__(self):
self.entries = []
self.successes = 0
self.warnings = 0
self.errors = 0
def success(self, category: PostProcessCheckType, message: str):
'''Adds a success entry that occured when post processing response to results.'''
self._insert(PostProcessCheckStatus.SUCCESS, category, message)
self.successes += 1
def warning(self, category: PostProcessCheckType, message: str):
'''Adds a warning entry that occured when post processing response to results.'''
self._insert(PostProcessCheckStatus.WARNING, category, message)
self.warnings += 1
def error(self, category: PostProcessCheckType, message: str, exception: TestStepError = None):
'''Adds an error entry that occured when post processing response to results.'''
self._insert(PostProcessCheckStatus.ERROR,
category, message, exception)
self.errors += 1
def is_success(self):
# It is possible that post processing a response doesn't have any success entires added
# that is why we explicitly only search for if an error occurred.
return self.errors == 0
def is_failure(self):
return self.errors != 0
def _insert(self, state: PostProcessCheckStatus, category: PostProcessCheckType, message: str, exception: Exception = None):
log = PostProcessCheck(state, category, message, exception)
self.entries.append(log)
def _value_or_none(data, key):
return data[key] if key in data else None
def _value_or_config(data, key, config):
return data[key] if key in data else config.get(key)
class EnumType:
def __init__(self, enum: Enum):
self.type = enum.name
self.base_type = enum.base_type
self._codes = {}
self.entries_by_name = {}
self.entries_by_code = {}
self._compute_entries(enum)
def translate(self, key: str, value) -> int:
if self._codes.get(key) is not None and self._codes.get(key) == value:
return self._codes.get(key)
if type(value) is str:
code = self._get_code_by_name(value)
else:
code = self._get_code_by_value(value)
if code is None:
raise TestStepEnumError(value, self.entries_by_name)
self._codes[key] = code
return code
def _get_code_by_name(self, value):
# For readability the name could sometimes be written as "enum_name(enum_code)" instead of "enum_name"
# In this case the enum_code should be checked to ensure that it is correct, unless enum_name is UnknownEnumValue
# in which case only invalid enum_code are allowed.
specified_name, specified_code = self._extract_name_and_code(value)
if specified_name not in self.entries_by_name:
return None
enum_code = self.entries_by_name.get(specified_name)
if specified_code is None or specified_code == enum_code:
return enum_code
if specified_name != f'{self.type}.UnknownEnumValue':
raise TestStepEnumSpecifierWrongError(
specified_code, specified_name, enum_code)
enum_name = self.entries_by_code.get(specified_code)
if enum_name:
raise TestStepEnumSpecifierNotUnknownError(value, enum_name)
return specified_code
def _get_code_by_value(self, value):
enum_name = self.entries_by_code.get(value)
if not enum_name:
return None
if STRICT_ENUM_VALUE_CHECK:
raise TestStepEnumError(value, self.entries_by_name)
return value
def _compute_entries(self, enum: Enum):
enum_codes = []
for enum_entry in enum.entries:
name = f'{self.type}.{enum_entry.name}'
code = enum_entry.code
self.entries_by_name[name] = code
self.entries_by_code[code] = name
enum_codes.append(code)
# search for the first invalid entry if any
max_code = 0xFF + 1
if self.base_type == 'enum16':
max_code = 0xFFFF + 1
for code in range(0, max_code):
if code not in enum_codes:
name = f'{self.type}.UnknownEnumValue'
self.entries_by_name[name] = code
self.entries_by_code[code] = name
break
def _extract_name_and_code(self, enum_name: str):
match = re.match(r"([\w.]+)(?:\((\w+)\))?", enum_name)
if match:
name = match.group(1)
code = int(match.group(2)) if match.group(2) else None
return name, code
return None, None
@staticmethod
def is_valid_type(target_type: str):
return target_type == 'enum8' or target_type == 'enum16'
class _TestStepWithPlaceholders:
'''A single YAML test parsed, as is, from YAML.
Some YAML test steps contain placeholders for variable subsitution. The value of the variable
is only known after an earlier test step's has executed and the result successfully post
processed.
'''
def __init__(self, test: dict, config: dict, definitions: SpecDefinitions, pics_checker: PICSChecker):
# Disabled tests are not parsed in order to allow the test to be added to the test
# suite even if the feature is not implemented yet.
self.is_enabled = not ('disabled' in test and test['disabled'])
if not self.is_enabled:
return
self._parsing_config_variable_storage = config
self.label = _value_or_none(test, 'label')
self.node_id = _value_or_config(test, 'nodeId', config)
self.group_id = _value_or_config(test, 'groupId', config)
self.cluster = _value_or_config(test, 'cluster', config)
self.command = _value_or_config(test, 'command', config)
if not self.command:
self.command = _value_or_config(test, 'wait', config)
self.attribute = _value_or_none(test, 'attribute')
self.event = _value_or_none(test, 'event')
self.endpoint = _value_or_config(test, 'endpoint', config)
self.pics = _value_or_none(test, 'PICS')
self.is_pics_enabled = pics_checker.check(_value_or_none(test, 'PICS'))
self.identity = _value_or_none(test, 'identity')
self.fabric_filtered = _value_or_none(test, 'fabricFiltered')
self.min_interval = _value_or_none(test, 'minInterval')
self.max_interval = _value_or_none(test, 'maxInterval')
self.keep_subscriptions = _value_or_none(test, 'keepSubscriptions')
self.timed_interaction_timeout_ms = _value_or_none(
test, 'timedInteractionTimeoutMs')
self.timeout = _value_or_none(test, 'timeout')
self.data_version = _value_or_none(
test, 'dataVersion')
self.busy_wait_ms = _value_or_none(test, 'busyWaitMs')
self.wait_for = _value_or_none(test, 'wait')
self.event_number = _value_or_none(test, 'eventNumber')
self.run_if = _value_or_none(test, 'runIf')
self.save_response_as = _value_or_none(test, 'saveResponseAs')
self.is_attribute = self.__is_attribute_command()
self.is_event = self.__is_event_command()
arguments = _value_or_none(test, 'arguments')
self._convert_single_value_to_values(arguments)
self.arguments_with_placeholders = arguments
responses = _value_or_none(test, 'response')
# Test may expect multiple responses. For example reading events may
# trigger multiple event responses. Or reading multiple attributes
# at the same time, may trigger multiple responses too.
if responses is None:
# If no response is specified at all, it implies that the step expect
# a success with any associatied value(s). So the empty response is effectively
# replace by an array that contains an empty object to represent that.
responses = [{}]
elif not isinstance(responses, list):
# If a single response is specified, it is converted to a list of responses.
responses = [responses]
for response in responses:
self._convert_single_value_to_values(response)
self.responses_with_placeholders = responses
self._update_mappings(test, definitions)
self.update_arguments(self.arguments_with_placeholders)
self.update_responses(self.responses_with_placeholders)
# This performs a very basic sanity parse time check of constraints. This parsing happens
# again inside post processing response since at that time we will have required variables
# to substitute in. This parsing check here has value since some test can take a really
# long time to run so knowing earlier on that the test step would have failed at parsing
# time before the test step run occurs save developer time that building yaml tests.
for response in self.responses_with_placeholders:
for value in response:
if 'constraints' not in value:
continue
get_constraints(value['constraints'])
def _update_mappings(self, test: dict, definitions: SpecDefinitions):
cluster_name = self.cluster
if definitions is None or (not definitions.has_cluster_by_name(cluster_name) and cluster_name != ANY_COMMANDS_CLUSTER_NAME):
self.argument_mapping = None
self.response_mapping = None
self.response_mapping_name = None
return
argument_mapping = None
response_mapping = None
response_mapping_name = None
if self.is_attribute:
attribute_name = self.attribute
attribute = definitions.get_attribute_by_name(
cluster_name,
attribute_name
)
if not attribute:
targets = definitions.get_attribute_names(cluster_name)
raise TestStepAttributeKeyError(test, attribute_name, targets)
attribute_mapping = self._as_mapping(
definitions,
cluster_name,
attribute.definition.data_type.name
)
argument_mapping = attribute_mapping
response_mapping = attribute_mapping
response_mapping_name = attribute.definition.data_type.name
elif self.is_event:
event_name = self.event
event = definitions.get_event_by_name(
cluster_name,
event_name
)
if not event:
targets = definitions.get_event_names(cluster_name)
raise TestStepEventKeyError(test, event_name, targets)
event_mapping = self._as_mapping(
definitions,
cluster_name,
event_name
)
argument_mapping = event_mapping
response_mapping = event_mapping
response_mapping_name = event.name
elif cluster_name == ANY_COMMANDS_CLUSTER_NAME or self.command in ANY_COMMANDS_LIST:
# When the cluster is ANY_COMMANDS_CLUSTER_NAME the test step does not contain the direct mapping
# for the response in the cluster/command/attribute/event fields.
#
# When the command is part of ANY_COMMANDS_LIST the test step does not contain the direct mapping
# for the response in the command/attribute/event fields.
#
# NOTE: The logic for this paragraph has not yet be implemented.
# In some cases the response type can be inferred from the argument fields, if for example the command
# is a ReadById targetting a specific ClusterId/AttributeId that exists in the definitions.
#
# For the other cases, the response can NOT be inferred directly from the argumment fields, if for exammple
# the command is a ReadById using a wildcard in one of its fields. For this type of case, the test writer
# can add additional specifiers in the expected response type to help determine the response mapping.
mapping_names = []
for response in self.responses_with_placeholders:
for value in response.get('values'):
if 'constraints' not in value:
continue
mapping_name = None
cluster_name = self.cluster if self.cluster != ANY_COMMANDS_CLUSTER_NAME else response.get(
'cluster')
if cluster_name is not None:
attribute_name = response.get('attribute')
event_name = response.get('event')
command_name = response.get('command')
if attribute_name:
attribute = definitions.get_attribute_by_name(
cluster_name, attribute_name)
if not attribute:
targets = definitions.get_attribute_names(
cluster_name)
test['response'] = ['...', response, '...']
raise TestStepAttributeKeyError(
response, attribute_name, targets)
mapping_name = attribute.definition.data_type.name
elif event_name:
event = definitions.get_event_by_name(
cluster_name, event_name)
if not event:
targets = definitions.get_event_names(
cluster_name)
test['response'] = ['...', response, '...']
raise TestStepEventKeyError(
response, attribute_name, targets)
mapping_name = event.name
elif command_name:
command = definitions.get_command_by_name(
cluster_name, command_name)
if not command:
targets = definitions.get_command_names(
cluster_name)
test['response'] = ['...', response, '...']
raise TestStepCommandKeyError(
response, command_name, targets)
mapping_name = command.output_param
mapping_names.append(mapping_name)
# TODO: For now only the response_mapping_name is inferred, it allows to use the type constraint
# on the responses.
argument_mapping = None
response_mapping = None
response_mapping_name = mapping_names
else:
command_name = self.command
command = definitions.get_command_by_name(
cluster_name,
command_name
)
if not command:
targets = definitions.get_command_names(cluster_name)
raise TestStepCommandKeyError(test, command_name, targets)
if command.input_param is None:
argument_mapping = {}
else:
argument_mapping = self._as_mapping(
definitions,
cluster_name,
command.input_param
)
response_mapping = self._as_mapping(
definitions,
cluster_name,
command.output_param
)
response_mapping_name = command.output_param
self.argument_mapping = argument_mapping
self.response_mapping = response_mapping
self.response_mapping_name = response_mapping_name
def _convert_single_value_to_values(self, container):
if container is None or 'values' in container:
return
# Attribute tests pass a single value argument that does not carry a name but
# instead uses a generic 'value' keyword. Convert to keyword to be the single
# members of the 'values' array which is what is used for other tests.
value = {}
known_keys_to_copy = ['value', 'constraints',
'saveAs', 'saveDataVersionAs']
known_keys_to_allow = ['error', 'clusterError']
for key, item in list(container.items()):
if key in known_keys_to_copy:
value[key] = item
del container[key]
elif key in known_keys_to_allow:
# Nothing to do for those keys.
pass
else:
raise TestStepKeyError(item, key)
container['values'] = [value]
def _as_mapping(self, definitions, cluster_name, target_name):
element = definitions.get_type_by_name(cluster_name, target_name)
if hasattr(element, 'base_type'):
if EnumType.is_valid_type(element.base_type):
target_name = EnumType(element)
else:
target_name = element.base_type
elif hasattr(element, 'fields'):
target_name = {f.name: self._as_mapping(
definitions, cluster_name, f.data_type.name) for f in element.fields}
elif target_name:
target_name = target_name.lower()
return target_name
def update_arguments(self, arguments_with_placeholders):
self._update_with_definition(
arguments_with_placeholders, self.argument_mapping)
def update_responses(self, responses_with_placeholders):
for response in responses_with_placeholders:
self._update_with_definition(
response, self.response_mapping)
def _update_with_definition(self, container: dict, mapping_type):
if not container or mapping_type is None:
return
values = container['values']
if values is None:
return
for value in list(values):
for key, item_value in list(value.items()):
if self.is_attribute or self.is_event:
mapping = mapping_type
else:
target_key = value['name']
if mapping_type.get(target_key) is None:
raise TestStepValueNameError(
value, target_key, [key for key in mapping_type])
mapping = mapping_type[target_key]
if key == 'value':
value[key] = self._update_value_with_definition(
value,
key,
item_value,
mapping
)
elif key == 'saveAs' and type(item_value) is str and item_value not in self._parsing_config_variable_storage:
self._parsing_config_variable_storage[item_value] = None
elif key == 'saveDataVersionAs' and type(item_value) is str and item_value not in self._parsing_config_variable_storage:
self._parsing_config_variable_storage[item_value] = None
elif key == 'constraints':
for constraint, constraint_value in item_value.items():
# Only apply update_value_with_definition to constraints that have a value that depends on
# the the value type for the target field.
if is_typed_constraint(constraint):
value[key][constraint] = self._update_value_with_definition(
item_value,
constraint,
constraint_value,
mapping
)
else:
# This key, value pair does not rely on cluster specifications.
pass
def _update_value_with_definition(self, container: dict, key: str, value, mapping_type):
"""
Processes a given value based on a specified mapping type and returns the updated value.
This method does not modify the container in place; rather, it returns a new value that should be
used to update or process further as necessary.
The 'container' and 'key' parameters are primarily used for error tagging. If an error occurs
during the value processing, these parameters allow for the error to be precisely located and
reported, facilitating easier debugging and error tracking.
Parameters:
- container (dict): A dictionary that serves as a context for the operation. It is used for error
tagging if processing fails, by associating errors with specific locations within the data structure.
- key (str): The key related to the value being processed. It is used alongside 'container' to tag
errors, enabling precise identification of the error source.
- value: The value to be processed according to the mapping type.
- mapping_type: Dictates the processing or mapping logic to be applied to 'value'.
Returns:
The processed value, which is the result of applying the specified mapping type to the original 'value'.
This method does not update the 'container'; any necessary updates based on the processed value must
be handled outside this method.
Raises:
- TestStepError: If an error occurs during the processing of the value. The error includes details
from the 'container' and 'key' to facilitate error tracing and debugging.
"""
if not mapping_type:
return value
if type(value) is dict:
rv = {}
for item_key in value:
# FabricIndex is a special case where the framework requires it to be passed even
# if it is not part of the requested arguments per spec and not part of the XML
# definition.
if item_key == 'FabricIndex' or item_key == 'fabricIndex':
rv[item_key] = value[item_key] # int64u
else:
if not mapping_type.get(item_key):
raise TestStepKeyError(value, item_key)
mapping = mapping_type[item_key]
rv[item_key] = self._update_value_with_definition(
value,
item_key,
value[item_key],
mapping
)
return rv
if type(value) is list:
return [self._update_value_with_definition(container, key, entry, mapping_type) for entry in value]
# TODO currently unsure if the check of `value not in config` is sufficant. For
# example let's say value = 'foo + 1' and map type is 'int64u', we would arguably do
# the wrong thing below.
if value is not None and value not in self._parsing_config_variable_storage:
if type(mapping_type) is EnumType:
try:
value = mapping_type.translate(key, value)
except (TestStepEnumError, TestStepEnumSpecifierNotUnknownError, TestStepEnumSpecifierWrongError) as e:
e.tag_key_with_error(container, key)
raise e
elif mapping_type == 'int64u' or mapping_type == 'int64s' or mapping_type == 'bitmap64' or mapping_type == 'epoch_us':
value = fixes.try_apply_float_to_integer_fix(value)
value = fixes.try_apply_yaml_cpp_longlong_limitation_fix(value)
value = fixes.try_apply_yaml_unrepresentable_integer_for_javascript_fixes(
value)
elif mapping_type == 'single' or mapping_type == 'double':
value = fixes.try_apply_yaml_float_written_as_strings(value)
elif isinstance(value, float) and mapping_type != 'single' and mapping_type != 'double':
value = fixes.try_apply_float_to_integer_fix(value)
elif mapping_type == 'octet_string' or mapping_type == 'long_octet_string':
value = fixes.convert_yaml_octet_string_to_bytes(value)
elif mapping_type == 'boolean':
value = bool(value)
return value
def __is_attribute_command(self) -> bool:
commands = {
'readAttribute',
'writeAttribute',
'subscribeAttribute',
'waitForReport',
}
return self.attribute and (self.command in commands or self.wait_for in commands)
def __is_event_command(self) -> bool:
commands = {
'readEvent',
'subscribeEvent',
'waitForReport',
}
return self.event and (self.command in commands or self.wait_for in commands)
class TestStep:
'''A single YAML test action parsed from YAML.
This object contains all the information required for a test runner to execute the test step.
It also provide a function that is expected to be called by the test runner to post process
the recieved response from the accessory. Post processing both validates recieved response
and saves any variables that might be required but test step that have yet to be executed.
'''
def __init__(self, test: _TestStepWithPlaceholders, step_index: int, runtime_config_variable_storage: dict):
self._test = test
self._step_index = step_index
self._runtime_config_variable_storage = runtime_config_variable_storage
self.arguments = copy.deepcopy(test.arguments_with_placeholders)
self.responses = copy.deepcopy(test.responses_with_placeholders)
if test.is_pics_enabled:
self._update_placeholder_values(self.arguments)
self._update_placeholder_values(self.responses)
self._test.data_version = self._config_variable_substitution(
self._test.data_version)
self._test.node_id = self._config_variable_substitution(
self._test.node_id)
self._test.run_if = self._config_variable_substitution(
self._test.run_if)
self._test.event_number = self._config_variable_substitution(
self._test.event_number)
self._test.cluster = self._config_variable_substitution(
self._test.cluster)
self._test.command = self._config_variable_substitution(
self._test.command)
self._test.attribute = self._config_variable_substitution(
self._test.attribute)
self._test.event = self._config_variable_substitution(
self._test.event)
self._test.endpoint = self._config_variable_substitution(
self._test.endpoint)
self._test.group_id = self._config_variable_substitution(
self._test.group_id)
self._test.node_id = self._config_variable_substitution(
self._test.node_id)
test.update_arguments(self.arguments)
test.update_responses(self.responses)
@property
def step_index(self):
return self._step_index
@property
def is_enabled(self):
return self._test.is_enabled
@property
def is_pics_enabled(self):
return self._test.is_pics_enabled and (self._test.run_if is None or self._test.run_if)
@property
def is_attribute(self):
return self._test.is_attribute
@property
def is_event(self):
return self._test.is_event
@property
def label(self):
return self._test.label
@property
def node_id(self):
return self._test.node_id
@property
def group_id(self):
return self._test.group_id
@property
def cluster(self):
return self._test.cluster
@property
def command(self):
return self._test.command
@property
def attribute(self):
return self._test.attribute
@property
def event(self):
return self._test.event
@property
def endpoint(self):
return self._test.endpoint
@property
def identity(self):
return self._test.identity
@property
def fabric_filtered(self):
return self._test.fabric_filtered
@property
def min_interval(self):
return self._test.min_interval
@property
def max_interval(self):
return self._test.max_interval
@property
def keep_subscriptions(self):
return self._test.keep_subscriptions
@property
def timed_interaction_timeout_ms(self):
return self._test.timed_interaction_timeout_ms
@property
def timeout(self):
return self._test.timeout
@property
def data_version(self):
return self._test.data_version
@property
def busy_wait_ms(self):
return self._test.busy_wait_ms
@property
def wait_for(self):
return self._test.wait_for
@property
def event_number(self):
return self._test.event_number
@event_number.setter
def event_number(self, value):
self._test.event_number = value
@property
def pics(self):
return self._test.pics
def _get_last_event_number(self, responses) -> Optional[int]:
if not self.is_event:
return None
# find the largest event number in all responses
# This iterates over everything (not just last element) since some commands like
# `chip-tool any read-all` may return multiple replies
event_number = None
for response in responses:
if not isinstance(response, dict):
continue
received_event_number = response.get('eventNumber')
if not isinstance(received_event_number, int):
continue
if (event_number is None) or (event_number < received_event_number):
event_number = received_event_number
return event_number
def post_process_response(self, received_responses):
result = PostProcessResponseResult()
# A list of responses is what is expected, but for legacy, if the response
# does not comes up as a list, it is converted here.
# TODO It should be removed once all decoders returns a list.
if not isinstance(received_responses, list):
received_responses = [received_responses]
if self._test.save_response_as:
self._runtime_config_variable_storage[self._test.save_response_as] = received_responses
if self.is_event:
last_event_number = self._get_last_event_number(received_responses)
if last_event_number:
if 'LastReceivedEventNumber' in self._runtime_config_variable_storage:
if self._runtime_config_variable_storage['LastReceivedEventNumber'] > last_event_number:
logging.warning(
"Received an older event than expected: received %r < %r",
last_event_number,
self._runtime_config_variable_storage['LastReceivedEventNumber']
)
self._runtime_config_variable_storage['LastReceivedEventNumber'] = last_event_number
if self.wait_for is not None:
self._response_cluster_wait_validation(received_responses, result)
return result
check_type = PostProcessCheckType.RESPONSE_VALIDATION
error_failure_wrong_response_number = (f'The test expects {len(self.responses)} responses '
f'but got {len(received_responses)} responses.')
received_responses_copy = copy.deepcopy(received_responses)
for expected_response in self.responses:
if len(received_responses_copy) == 0:
result.error(check_type, error_failure_wrong_response_number)
return result
received_response = received_responses_copy.pop(0)
self._response_error_validation(
expected_response, received_response, result)
self._response_cluster_error_validation(
expected_response, received_response, result)
self._response_values_source_validation(
expected_response, received_response, result)
self._response_values_validation(
expected_response, received_response, result)
self._response_constraints_validation(
expected_response, received_response, result)
self._maybe_save_as('saveAs', 'value',
expected_response, received_response, result)
self._maybe_save_as('saveDataVersionAs', 'dataVersion',
expected_response, received_response, result)
# An empty response array in a test step (responses: []) implies that the test step does expect a response
# but without any associated value.
if self.responses == [] and received_responses_copy == [{}]:
# if the received responses is a simple success ([{}]), that is valid.
return result
# This is different from the case where no response is specified at all, which implies that the step expect
# a success with any associatied value(s).
elif self.responses == [{'values': [{}]}] and len(received_responses_copy):
# if there are multiple responses and the test specifies that it does not really care
# about which values are returned, that is valid too.
return result
# Anything more complex where the response field as been defined with some values and the number
# of expected responses differs from the number of received responses is an error.
elif len(received_responses_copy) != 0:
result.error(check_type, error_failure_wrong_response_number)
return result
def _response_cluster_wait_validation(self, received_responses, result):
"""Check if the response concrete path matches the configuration of the test step
and validate that the response type (e.g readAttribute/writeAttribute/...) matches
the expectation from the test step."""
check_type = PostProcessCheckType.WAIT_VALIDATION
error_success = 'The test expectation "{wait_for}" for "{cluster}.{wait_type}" on endpoint {endpoint} is true'
error_failure = 'The test expectation "{expected} == {received}" is false'
if len(received_responses) > 1:
result.error(check_type, error_failure.multiple_responses)
return
received_response = received_responses[0]
if self.is_attribute:
expected_wait_type = self.attribute
received_wait_type = received_response.get('attribute')
elif self.is_event:
expected_wait_type = self.event
received_wait_type = received_response.get('event')
else:
expected_wait_type = self.command
received_wait_type = received_response.get('command')
expected_values = [
self.wait_for,
self.endpoint,
# TODO The name in tests does not always use spaces
self.cluster.replace(' ', ''),
expected_wait_type
]
wait_for_str = received_response.get('wait_for')
if not wait_for_str:
wait_for_str = received_response.get('command')
received_values = [
wait_for_str,
received_response.get('endpoint'),
received_response.get('cluster'),
received_wait_type
]
success = True
for expected_value in expected_values:
received_value = received_values.pop(0)
if expected_value != received_value:
result.error(check_type, error_failure.format(
expected=expected_value, received=received_value))
success = False
if success:
result.success(check_type, error_success.format(
wait_for=self.wait_for, cluster=self.cluster, wait_type=expected_wait_type, endpoint=self.endpoint))
def _response_error_validation(self, expected_response, received_response, result):
check_type = PostProcessCheckType.IM_STATUS
error_success = 'The test expects the "{error}" error which occured successfully.'
error_success_no_error = 'The test expects no error and no error occurred.'
error_wrong_error = 'The test expects the "{error}" error but the "{value}" error occured.'
error_unexpected_error = 'The test expects no error but the "{error}" error occured.'
error_unexpected_success = 'The test expects the "{error}" error but no error occured.'
expected_error = expected_response.get(
'error') if expected_response else None
received_error = received_response.get('error')
if expected_error and received_error and expected_error == received_error:
result.success(check_type, error_success.format(
error=expected_error))
elif expected_error and received_error:
result.error(check_type, error_wrong_error.format(
error=expected_error, value=received_error))
elif expected_error and not received_error:
result.error(check_type, error_unexpected_success.format(
error=expected_error))
elif not expected_error and received_error:
result.error(check_type, error_unexpected_error.format(
error=received_error))
elif not expected_error and not received_error:
result.success(check_type, error_success_no_error)
else:
# This should not happens
raise AssertionError('This should not happens.')
def _response_cluster_error_validation(self, expected_response, received_response, result):
check_type = PostProcessCheckType.CLUSTER_STATUS
error_success = 'The test expects the "{error}" error which occured successfully.'
error_unexpected_success = 'The test expects the "{error}" error but no error occured.'
error_wrong_error = 'The test expects the "{error}" error but the "{value}" error occured.'
expected_error = expected_response.get('clusterError')
received_error = received_response.get('clusterError')
if expected_error:
if received_error and expected_error == received_error:
result.success(check_type, error_success.format(
error=expected_error))
elif received_error:
result.error(check_type, error_wrong_error.format(
error=expected_error, value=received_error))
else:
result.error(check_type, error_unexpected_success.format(
error=expected_error))
else:
# Nothing is logged here to not be redundant with the generic error checking code.
pass
def _response_values_source_validation(self, expected_response, received_response, result):
check_type = PostProcessCheckType.RESPONSE_VALIDATION
error_value_wrong_source = 'The test expects a value from {source_name} "{expected}" but it received a value from {source_name} "{received}".'
sources = ['endpoint', 'cluster', 'attribute']
for source_name in sources:
expected = expected_response.get(source_name)
received = received_response.get(source_name)
success = expected is None or received is None or expected == received
if not success:
result.error(check_type, error_value_wrong_source.format(
source_name=source_name, expected=expected, received=received))
def _response_values_validation(self, expected_response, received_response, result):
check_type = PostProcessCheckType.RESPONSE_VALIDATION
error_success = 'The test expectation "{name} == {value}" is true'
error_failure = 'The test expectation "{name} ({actual}) == {value}" is false'
error_name_does_not_exist = 'The test expects a value named "{name}" but it does not exists in the response."'
error_value_does_not_exist = 'The test expects a value but it does not exists in the response."'
for value in expected_response['values']:
if 'value' not in value:
continue
expected_name = 'value'
if expected_name not in received_response:
result.error(
check_type, error_value_does_not_exist)
break
received_value = received_response.get('value')
if not self.is_attribute and not self.is_event and self.command not in ANY_COMMANDS_LIST:
expected_name = value.get('name')
if expected_name not in received_value:
result.error(check_type, error_name_does_not_exist.format(
name=expected_name))
continue
received_value = received_value.get(
expected_name) if received_value else None
expected_value = value.get('value')
if self._response_value_validation(expected_value, received_value):
result.success(check_type, error_success.format(
name=expected_name, value=expected_value))
else:
result.error(check_type, error_failure.format(
name=expected_name, actual=received_value, value=expected_value))
def _response_value_validation(self, expected_value, received_value):
if isinstance(expected_value, list):
if len(expected_value) != len(received_value):
return False
for index, expected_item in enumerate(expected_value):
received_item = received_value[index]
if not self._response_value_validation(expected_item, received_item):
return False
return True
elif isinstance(expected_value, dict):
for key, expected_item in expected_value.items():
received_item = received_value.get(key)
if not self._response_value_validation(expected_item, received_item):
return False
return True
else:
return expected_value == received_value
def _response_constraints_validation(self, expected_response, received_response, result):
check_type = PostProcessCheckType.CONSTRAINT_VALIDATION
error_success = 'Constraints check passed'
error_failure = 'Constraints check failed'
response_type_name = self._test.response_mapping_name
for value in expected_response['values']:
if 'constraints' not in value:
continue
received_value = received_response.get('value')
if self.command in ANY_COMMANDS_LIST:
response_type_name = response_type_name.pop(0)
elif not self.is_attribute and not self.is_event:
expected_name = value.get('name')
if received_value is None or expected_name not in received_value:
received_value = None
else:
received_value = received_value.get(
expected_name) if received_value else None
if self._test.response_mapping:
response_type_name = self._test.response_mapping.get(
expected_name)
else:
# We don't have a mapping for this type. This happens for pseudo clusters.
# If there is a constraint check for the type it is likely an incorrect
# constraint check by the test writter.
response_type_name = None
constraints = get_constraints(value['constraints'])
for constraint in constraints:
try:
constraint.validate(received_value, response_type_name)
result.success(check_type, error_success)
except TestStepError as e:
e.update_context(expected_response, self.step_index)
result.error(check_type, error_failure, e)
def _maybe_save_as(self, key: str, default_target: str, expected_response, received_response, result):
check_type = PostProcessCheckType.SAVE_AS_VARIABLE
error_success = 'The test save the value "{value}" as {name}.'
error_name_does_not_exist = 'The test expects a value named "{name}" but it does not exists in the response."'
for value in expected_response['values']:
if key not in value:
continue
received_value = received_response.get(default_target)
if not self.is_attribute and not self.is_event and self.command not in ANY_COMMANDS_LIST:
expected_name = value.get('name')
if received_value is None or expected_name not in received_value:
result.error(check_type, error_name_does_not_exist.format(
name=expected_name))
continue
received_value = received_value.get(
expected_name) if received_value else None
save_as = value.get(key)
self._runtime_config_variable_storage[save_as] = received_value
result.success(check_type, error_success.format(
value=received_value, name=save_as))
def _update_placeholder_values(self, containers):
if not containers:
return
if not isinstance(containers, list):
containers = [containers]
for container in containers:
values = container['values']
for idx, item in enumerate(values):
if 'value' in item:
values[idx]['value'] = self._config_variable_substitution(
item['value'])
if 'constraints' in item:
for constraint, constraint_value in item['constraints'].items():
values[idx]['constraints'][constraint] = self._config_variable_substitution(
constraint_value)
container['values'] = values
def _config_variable_substitution(self, value):
if type(value) is list:
return [self._config_variable_substitution(entry) for entry in value]
elif type(value) is dict:
mapped_value = {}
for key in value:
mapped_value[key] = self._config_variable_substitution(
value[key])
return mapped_value
elif type(value) is str:
# For most tests, a single config variable is used and it can be replaced as in.
# But some other tests were relying on the fact that the expression was put 'as if' in
# the generated code and was resolved before being sent over the wire. For such
# expressions (e.g 'myVar + 1') we need to compute it before sending it over the wire.
delimiter_regex = "(\ |\(|\)|\+|\-|\*|\/|\%)"
tokens = re.split(delimiter_regex, value)
if len(tokens) == 0:
return value
substitution_occured = False
for idx, token in enumerate(tokens):
if token in self._runtime_config_variable_storage:
variable_info = self._runtime_config_variable_storage[token]
if type(variable_info) is dict and 'defaultValue' in variable_info:
variable_info = variable_info['defaultValue']
tokens[idx] = variable_info
substitution_occured = True
if len(tokens) == 1:
return tokens[0]
tokens = [str(token) for token in tokens]
value = ''.join(tokens)
# TODO we should move away from eval. That will mean that we will need to do extra
# parsing, but it would be safer then just blindly running eval.
return value if not substitution_occured else eval(value)
else:
return value
class YamlTests:
'''Parses YAML tests and becomes an iterator to provide 'TestStep's
The provided TestStep is expected to be used by a runner/adapter to run the test step and
provide the response from the device to the TestStep object.
Currently this is a one time use object. Eventually this should be refactored to take a
runner/adapter as an argument and run through all test steps and should be reusable for
multiple runs.
'''
def __init__(self, parsing_config_variable_storage: dict, definitions: SpecDefinitions, pics_checker: PICSChecker, tests: dict):
self._parsing_config_variable_storage = parsing_config_variable_storage
enabled_tests = []
try:
for step_index, step in enumerate(tests):
test_with_placeholders = _TestStepWithPlaceholders(
step, self._parsing_config_variable_storage, definitions, pics_checker)
if test_with_placeholders.is_enabled:
enabled_tests.append(test_with_placeholders)
except TestStepError as e:
e.update_context(step, step_index)
raise
fixes.try_update_yaml_node_id_test_runner_state(
enabled_tests, self._parsing_config_variable_storage)
self._runtime_config_variable_storage = copy.deepcopy(
parsing_config_variable_storage)
self._tests = enabled_tests
self._index = 0
self.count = len(self._tests)
def __iter__(self):
return self
def __next__(self) -> TestStep:
if self._index < self.count:
test = self._tests[self._index]
test_step = TestStep(test, self._index + 1,
self._runtime_config_variable_storage)
self._index += 1
return test_step
raise StopIteration
@dataclass
class TestParserConfig:
pics: str = None
definitions: SpecDefinitions = None
config_override: dict = field(default_factory=dict)
class TestParser:
def __init__(self, test_file: str, parser_config: TestParserConfig = TestParserConfig()):
yaml_loader = YamlLoader()
filename, name, pics, config, tests = yaml_loader.load(test_file)
self.__apply_legacy_config(config)
self.__apply_config_override(config, parser_config.config_override)
self.filename = filename
self.name = name
self.PICS = pics
self.tests = YamlTests(
config,
parser_config.definitions,
PICSChecker(parser_config.pics),
tests
)
self.timeout = config['timeout']
self.definitions = parser_config.definitions
def __apply_config_override(self, config, config_override):
for key, value in config_override.items():
if value is None or key not in config:
continue
is_node_id = key == 'nodeId' or (isinstance(
config[key], dict) and config[key].get('type') == 'node_id')
if type(value) is str:
if key == 'timeout' or key == 'endpoint':
value = int(value)
elif is_node_id and value.startswith('0x'):
value = int(value, 16)
elif is_node_id:
value = int(value)
if isinstance(config[key], dict) and 'defaultValue' in config[key]:
config[key]['defaultValue'] = value
else:
config[key] = value
def __apply_legacy_config(self, config):
# These are a list of "KnownVariables". These are defaults the codegen used to use. This
# is added for legacy support of tests that expect to uses these "defaults".
self.__apply_legacy_config_if_missing(config, 'nodeId', 0x12345)
self.__apply_legacy_config_if_missing(config, 'endpoint', '')
self.__apply_legacy_config_if_missing(config, 'cluster', '')
self.__apply_legacy_config_if_missing(config, 'timeout', 90)
# These values are default runtime values (non-legacy)
self.__apply_legacy_config_if_missing(
config, 'LastReceivedEventNumber', 0)
def __apply_legacy_config_if_missing(self, config, key, value):
if key not in config:
config[key] = value