| # Copyright (c) 2023 Project CHIP Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import base64 |
| import json |
| |
| # These constants represent the vocabulary used for the incoming JSON. |
| _CLUSTER_ID = 'clusterId' |
| _ENDPOINT_ID = 'endpointId' |
| _RESPONSE_ID = 'commandId' |
| _ATTRIBUTE_ID = 'attributeId' |
| _EVENT_ID = 'eventId' |
| |
| # These constants represent the vocabulary used for the outgoing data. |
| _CLUSTER = 'cluster' |
| _ENDPOINT = 'endpoint' |
| _RESPONSE = 'command' |
| _ATTRIBUTE = 'attribute' |
| _EVENT = 'event' |
| |
| |
| # These constants represent the common vocabulary between input and output. |
| _ERROR = 'error' |
| _CLUSTER_ERROR = 'clusterError' |
| _VALUE = 'value' |
| |
| # FabricIndex is a special case where the field is added as a struct field by the SDK |
| # if needed but is not part of the XML definition of the struct. |
| # These constants are used to map from the field code (254) to the field name if such |
| # a field is used for a fabric scoped struct. |
| _FABRIC_INDEX_FIELD_CODE = '254' |
| _FABRIC_INDEX_FIELD_NAME = 'FabricIndex' |
| _FABRIC_INDEX_FIELD_TYPE = 'int8u' |
| |
| |
| class Decoder: |
| """ |
| This class implement decoding a test step response from the adapter format to the |
| matter_yamltests format. |
| """ |
| |
| def __init__(self, specifications): |
| self.__specs = specifications |
| self.__converter = Converter(specifications) |
| |
| def decode(self, payload): |
| payload, logs = self.__get_payload_content(payload) |
| payload = self.__translate_names(payload) |
| payload = self.__converter.convert(payload) |
| |
| if len(payload) == 0: |
| payload = [{}] |
| elif len(payload) > 1 and payload[-1] == {'error': 'FAILURE'}: |
| payload = payload[:-1] |
| return payload, logs |
| |
| def __translate_names(self, payloads): |
| translated_payloads = [] |
| specs = self.__specs |
| |
| for payload in payloads: |
| translated_payload = {} |
| for key, value in payload.items(): |
| if key == _CLUSTER_ID: |
| key = _CLUSTER |
| value = specs.get_cluster_name(value) |
| elif key == _ENDPOINT_ID: |
| key = _ENDPOINT |
| elif key == _RESPONSE_ID: |
| key = _RESPONSE |
| value = specs.get_response_name( |
| payload[_CLUSTER_ID], value) |
| elif key == _ATTRIBUTE_ID: |
| key = _ATTRIBUTE |
| value = specs.get_attribute_name( |
| payload[_CLUSTER_ID], value) |
| elif key == _EVENT_ID: |
| key = _EVENT |
| value = specs.get_event_name(payload[_CLUSTER_ID], value) |
| elif key == _VALUE or key == _ERROR or key == _CLUSTER_ERROR: |
| pass |
| else: |
| # Raise an error since the other fields probably needs to be translated too. |
| raise KeyError(f'Error: field "{key}" not supported') |
| |
| if value is None and (key == _CLUSTER or key == _RESPONSE or key == _ATTRIBUTE or key == _EVENT): |
| # If the definition for this cluster/command/attribute/event is missing, there is not |
| # much we can do to convert the response to the proper format. It usually indicates that |
| # the cluster definition is missing something. So we just raise an exception to tell the |
| # user something is wrong and the cluster definition needs to be updated. |
| cluster_code = hex(payload[_CLUSTER_ID]) |
| if key == _CLUSTER: |
| raise KeyError( |
| f'Error: The cluster ({cluster_code}) definition can not be found. Please update the cluster definition.') |
| else: |
| value_code = hex(payload[key + 'Id']) |
| raise KeyError( |
| f'Error: The cluster ({cluster_code}) {key} ({value_code}) definition can not be found. Please update the cluster definition.') |
| |
| translated_payload[key] = value |
| translated_payloads.append(translated_payload) |
| |
| return translated_payloads |
| |
| def __get_payload_content(self, payload): |
| json_payload = json.loads(payload) |
| results = json_payload.get('results') |
| logs = MatterLog.decode_logs(json_payload.get('logs')) |
| return results, logs |
| |
| |
| class MatterLog: |
| def __init__(self, log): |
| self.module = log['module'] |
| self.level = log['category'] |
| |
| base64_message = log["message"].encode('utf-8') |
| decoded_message_bytes = base64.b64decode(base64_message) |
| # TODO We do assume utf-8 encoding is used, it may not be true though. |
| self.message = decoded_message_bytes.decode('utf-8') |
| |
| def decode_logs(logs): |
| return list(map(MatterLog, logs)) |
| |
| |
| class Converter(): |
| """ |
| This class converts between the JSON representation used by chip-tool to transmit |
| information and the response format expected by the test suite. |
| |
| There is not much differences and ideally we won't have to do any conversion. |
| For example chip-tool could do the field name mapping directly instead of relying on |
| the adapter, or floats can be converted to the right format directly. But in the |
| meantime the conversion is done here. |
| """ |
| |
| def __init__(self, specifications): |
| self.__specs = specifications |
| self.__converters = [ |
| StructFieldsNameConverter(), |
| FloatConverter(), |
| OctetStringConverter() |
| ] |
| |
| def convert(self, payloads): |
| return [self._convert(payload) for payload in payloads] |
| |
| def _convert(self, rv): |
| if _VALUE not in rv or _CLUSTER not in rv: |
| return rv |
| |
| if _RESPONSE in rv: |
| out_value = self.__convert_command(rv) |
| elif _ATTRIBUTE in rv: |
| out_value = self.__convert_attribute(rv) |
| elif _EVENT in rv: |
| out_value = self.__convert_event(rv) |
| else: |
| out_value = rv[_VALUE] |
| |
| rv[_VALUE] = out_value |
| return rv |
| |
| def __convert_command(self, rv): |
| specs = self.__specs |
| cluster_name = rv[_CLUSTER] |
| response_name = rv[_RESPONSE] |
| value = rv[_VALUE] |
| |
| response = specs.get_response_by_name(cluster_name, response_name) |
| if not response: |
| raise KeyError(f'Error: response "{response_name}" not found.') |
| |
| typename = response.name |
| array = False |
| return self.__run(value, cluster_name, typename, array) |
| |
| def __convert_attribute(self, rv): |
| specs = self.__specs |
| cluster_name = rv[_CLUSTER] |
| attribute_name = rv[_ATTRIBUTE] |
| value = rv[_VALUE] |
| |
| attribute = specs.get_attribute_by_name(cluster_name, attribute_name) |
| if not attribute: |
| raise KeyError(f'Error: attribute "{attribute_name}" not found.') |
| |
| typename = attribute.definition.data_type.name |
| array = attribute.definition.is_list |
| return self.__run(value, cluster_name, typename, array) |
| |
| def __convert_event(self, rv): |
| specs = self.__specs |
| cluster_name = rv[_CLUSTER] |
| event_name = rv[_EVENT] |
| value = rv[_VALUE] |
| |
| event = specs.get_event_by_name(cluster_name, event_name) |
| if not event: |
| raise KeyError(f'Error: event "{event_name}" not found.') |
| |
| typename = event.name |
| array = False |
| return self.__run(value, cluster_name, typename, array) |
| |
| def __run(self, value, cluster_name: str, typename: str, array: bool): |
| for converter in self.__converters: |
| value = converter.run(self.__specs, value, |
| cluster_name, typename, array) |
| return value |
| |
| |
| class BaseConverter: |
| def run(self, specs, value, cluster_name: str, typename: str, array: bool): |
| if isinstance(value, dict) and not array: |
| struct = specs.get_struct_by_name( |
| cluster_name, typename) or specs.get_event_by_name(cluster_name, typename) |
| for field in struct.fields: |
| field_name = field.name |
| field_type = field.data_type.name |
| field_array = field.is_list |
| if field_name in value: |
| value[field_name] = self.run( |
| specs, value[field_name], cluster_name, field_type, field_array) |
| elif isinstance(value, list) and array: |
| value = [self.run(specs, v, cluster_name, typename, False) |
| for v in value] |
| elif value is not None: |
| value = self.maybe_convert(typename.lower(), value) |
| |
| return value |
| |
| def maybe_convert(self, typename: str, value): |
| return value |
| |
| |
| class FloatConverter(BaseConverter): |
| """ |
| Jsoncpp stores floats as double. |
| For float values that are just stored as an approximation it ends up with |
| a different output than expected when reading them back |
| """ |
| |
| def maybe_convert(self, typename, value): |
| if typename == 'single': |
| value = float('%g' % value) |
| return value |
| |
| |
| class OctetStringConverter(BaseConverter): |
| def maybe_convert(self, typename, value): |
| if typename == 'octet_string' or typename == 'long_octet_string': |
| if value == '': |
| value = bytes() |
| elif value.startswith('base64:'): |
| value = base64.b64decode(value.removeprefix('base64:')) |
| |
| return value |
| |
| |
| class StructFieldsNameConverter(): |
| """ |
| Converts fields identifiers to the field names specified in the cluster definition. |
| """ |
| |
| def run(self, specs, value, cluster_name: str, typename: str, array: bool): |
| if isinstance(value, dict) and not array: |
| struct = specs.get_struct_by_name( |
| cluster_name, typename) or specs.get_event_by_name(cluster_name, typename) |
| for field in struct.fields: |
| field_code = field.code |
| field_name = field.name |
| field_type = field.data_type.name |
| field_array = field.is_list |
| # chip-tool returns the field code as an integer but the test suite expects |
| # a field name. |
| # To not confuse the test suite, the field code is replaced by its field name |
| # equivalent and then removed. |
| if str(field_code) in value: |
| value[field_name] = self.run( |
| specs, |
| value[str(field_code)], |
| cluster_name, |
| field_type, |
| field_array |
| ) |
| del value[str(field_code)] |
| |
| if specs.is_fabric_scoped(struct): |
| value[_FABRIC_INDEX_FIELD_NAME] = self.run( |
| specs, |
| value[_FABRIC_INDEX_FIELD_CODE], |
| cluster_name, |
| _FABRIC_INDEX_FIELD_TYPE, |
| False) |
| del value[_FABRIC_INDEX_FIELD_CODE] |
| |
| elif isinstance(value, list) and array: |
| value = [self.run(specs, v, cluster_name, typename, False) |
| for v in value] |
| |
| return value |