blob: b011b8817d692c842d82f9d7150c8be3ea21200c [file] [log] [blame]
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio as asyncio
import logging
import queue
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum, IntEnum
import chip.interaction_model
import chip.yaml.format_converter as Converter
import stringcase
from chip import ChipDeviceCtrl
from chip.clusters.Attribute import AttributeStatus, SubscriptionTransaction, TypedAttributePath, ValueDecodeFailure
from chip.yaml.errors import ParsingError, UnexpectedParsingError
from .data_model_lookup import *
logger = logging.getLogger('YamlParser')
class _ActionStatus(Enum):
SUCCESS = 'success',
ERROR = 'error'
class _TestFabricId(IntEnum):
ALPHA = 1,
BETA = 2,
GAMMA = 3
@dataclass
class _ActionResult:
status: _ActionStatus
response: object
@dataclass
class _AttributeSubscriptionCallbackResult:
name: str
attribute_path: TypedAttributePath
result: _ActionResult
@dataclass
class _ExecutionContext:
''' Objects that is commonly passed around this file that are vital to test execution.'''
# Data model lookup to get python attribute, cluster, command object.
data_model_lookup: DataModelLookup = None
# List of subscriptions.
subscriptions: list = field(default_factory=list)
# The key is the attribute/event name, and the value is a queue of subscription callback results
# that been sent by device under test. For attribute subscription the queue is of type
# _AttributeSubscriptionCallbackResult.
subscription_callback_result_queue: dict = field(default_factory=dict)
class BaseAction(ABC):
'''Interface for a single YAML action that is to be executed.'''
def __init__(self, label, identity):
self._label = label
self._identity = identity
@property
def label(self):
return self._label
@property
def identity(self):
return self._identity
@abstractmethod
def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
pass
class InvokeAction(BaseAction):
'''Single invoke action to be executed.'''
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to invoke command action that can execute with ChipDeviceCtrl.
Args:
'test_step': Step containing information required to run invoke command action.
'cluster': Name of cluster which to invoke action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
ParsingError: Raised if there is a benign error, and there is currently no
action to perform for this write attribute.
UnexpectedParsingError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step.label, test_step.identity)
self._busy_wait_ms = test_step.busy_wait_ms
self._command_name = stringcase.pascalcase(test_step.command)
self._cluster = cluster
self._interation_timeout_ms = test_step.timed_interaction_timeout_ms
self._request_object = None
self._expected_response_object = None
self._endpoint = test_step.endpoint
self._node_id = test_step.node_id
command = context.data_model_lookup.get_command(self._cluster, self._command_name)
if command is None:
raise ParsingError(
f'Failed to find cluster:{self._cluster} Command:{self._command_name}')
command_object = command()
if (test_step.arguments):
args = test_step.arguments['values']
request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args)
try:
request_data = Converter.convert_to_data_model_type(
request_data_as_dict, type(command_object))
except ValueError:
# TODO after allowing out of bounds enums to be written this should be changed to
# UnexpectedParsingError.
raise ParsingError('Could not covert yaml type')
self._request_object = command_object.FromDict(request_data)
else:
self._request_object = command_object
def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
try:
resp = asyncio.run(dev_ctrl.SendCommand(
self._node_id, self._endpoint, self._request_object,
timedRequestTimeoutMs=self._interation_timeout_ms,
busyWaitMs=self._busy_wait_ms))
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
# Commands with no response give a None response. In those cases we return a success
return _ActionResult(status=_ActionStatus.SUCCESS, response=resp)
class ReadAttributeAction(BaseAction):
'''Single read attribute action to be executed.'''
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to read attribute action that can execute with ChipDeviceCtrl.
Args:
'test_step': Step containing information required to run read attribute action.
'cluster': Name of cluster read attribute action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
ParsingError: Raised if there is a benign error, and there is currently no
action to perform for this read attribute.
UnexpectedParsingError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step.label, test_step.identity)
self._attribute_name = stringcase.pascalcase(test_step.attribute)
self._cluster = cluster
self._endpoint = test_step.endpoint
self._node_id = test_step.node_id
self._cluster_object = None
self._request_object = None
self._fabric_filtered = True
if test_step.fabric_filtered is not None:
self._fabric_filtered = test_step.fabric_filtered
self._possibly_unsupported = bool(test_step.optional)
self._cluster_object = context.data_model_lookup.get_cluster(self._cluster)
if self._cluster_object is None:
raise UnexpectedParsingError(
f'ReadAttribute failed to find cluster object:{self._cluster}')
self._request_object = context.data_model_lookup.get_attribute(
self._cluster, self._attribute_name)
if self._request_object is None:
raise ParsingError(
f'ReadAttribute failed to find cluster:{self._cluster} '
f'Attribute:{self._attribute_name}')
if test_step.arguments:
raise UnexpectedParsingError(
f'ReadAttribute should not contain arguments. {self.label}')
if self._request_object.attribute_type is None:
raise UnexpectedParsingError(
f'ReadAttribute doesnt have valid attribute_type. {self.label}')
def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
try:
raw_resp = asyncio.run(dev_ctrl.ReadAttribute(self._node_id,
[(self._endpoint, self._request_object)],
fabricFiltered=self._fabric_filtered))
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
return self.parse_raw_response(raw_resp)
def parse_raw_response(self, raw_resp) -> _ActionResult:
if self._possibly_unsupported and not raw_resp:
# We have found an unsupported attribute. TestStep provided did specify that it might be
# unsupported, so nothing left to validate. We just return a failure here.
return _ActionResult(status=_ActionStatus.ERROR, response=None)
# TODO Currently there are no checks that this indexing won't fail. Need to add some
# initial validity checks. Coming soon in a future PR.
resp = raw_resp[self._endpoint][self._cluster_object][self._request_object]
if isinstance(resp, ValueDecodeFailure):
# response.Reason is of type chip.interaction_model.Status.
return _ActionResult(status=_ActionStatus.ERROR, response=resp.Reason)
# decode() is expecting to get a DataModelLookup Object type to grab certain attributes
# like cluster id.
return_val = self._request_object(resp)
return _ActionResult(status=_ActionStatus.SUCCESS, response=return_val)
class WaitForCommissioneeAction(BaseAction):
''' Wait for commissionee action to be executed.'''
def __init__(self, test_step):
super().__init__(test_step.label, test_step.identity)
self._node_id = test_step.node_id
self._expire_existing_session = False
# This is the default when no timeout is provided.
_DEFAULT_TIMEOUT_MS = 10 * 1000
self._timeout_ms = _DEFAULT_TIMEOUT_MS
if test_step.arguments is None:
# Nothing left for us to do the default values are what we want
return
args = test_step.arguments['values']
request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args)
self._expire_existing_session = request_data_as_dict.get('expireExistingSession', False)
if 'timeout' in request_data_as_dict:
# Timeout is provided in seconds we need to conver to milliseconds.
self._timeout_ms = request_data_as_dict['timeout'] * 1000
def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
try:
if self._expire_existing_session:
dev_ctrl.ExpireSessions(self._node_id)
dev_ctrl.GetConnectedDeviceSync(self._node_id, timeoutMs=self._timeout_ms)
except TimeoutError:
return _ActionResult(status=_ActionStatus.ERROR, response=None)
return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
class AttributeChangeAccumulator:
def __init__(self, name: str, expected_attribute: Clusters.ClusterAttributeDescriptor,
output_queue: queue.SimpleQueue):
self._name = name
self._expected_attribute = expected_attribute
self._output_queue = output_queue
def __call__(self, path: TypedAttributePath, transaction: SubscriptionTransaction):
if path.AttributeType == self._expected_attribute:
data = transaction.GetAttribute(path)
result = _ActionResult(status=_ActionStatus.SUCCESS, response=path.AttributeType(data))
item = _AttributeSubscriptionCallbackResult(self._name, path, result)
logging.debug(
f'Got subscription report on client {self.name} for {path.AttributeType}: {data}')
self._output_queue.put(item)
@property
def name(self) -> str:
return self._name
class SubscribeAttributeAction(ReadAttributeAction):
'''Single subscribe attribute action to be executed.'''
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to subscribe attribute action that can execute with ChipDeviceCtrl.
Args:
'test_step': Step containing information required to run write attribute action.
'cluster': Name of cluster write attribute action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
ParsingError: Raised if there is a benign error, and there is currently no
action to perform for this write attribute.
UnexpectedParsingError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step, cluster, context)
self._context = context
if test_step.min_interval is None:
raise UnexpectedParsingError(
f'SubscribeAttribute action does not have min_interval {self.label}')
self._min_interval = test_step.min_interval
if test_step.max_interval is None:
raise UnexpectedParsingError(
f'SubscribeAttribute action does not have max_interval {self.label}')
self._max_interval = test_step.max_interval
def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
try:
subscription = asyncio.run(
dev_ctrl.ReadAttribute(self._node_id, [(self._endpoint, self._request_object)],
reportInterval=(self._min_interval, self._max_interval),
keepSubscriptions=False))
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
self._context.subscriptions.append(subscription)
output_queue = self._context.subscription_callback_result_queue.get(self._attribute_name,
None)
if output_queue is None:
output_queue = queue.SimpleQueue()
self._context.subscription_callback_result_queue[self._attribute_name] = output_queue
while not output_queue.empty():
output_queue.get(block=False)
subscription_handler = AttributeChangeAccumulator(self.label, self._request_object,
output_queue)
subscription.SetAttributeUpdateCallback(subscription_handler)
raw_resp = subscription.GetAttributes()
return self.parse_raw_response(raw_resp)
class WriteAttributeAction(BaseAction):
'''Single write attribute action to be executed.'''
def __init__(self, test_step, cluster: str, context: _ExecutionContext):
'''Converts 'test_step' to write attribute action that can execute with ChipDeviceCtrl.
Args:
'test_step': Step containing information required to run write attribute action.
'cluster': Name of cluster write attribute action is targeting.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
ParsingError: Raised if there is a benign error, and there is currently no
action to perform for this write attribute.
UnexpectedParsingError: Raised if there is an unexpected parsing error.
'''
super().__init__(test_step.label, test_step.identity)
self._attribute_name = stringcase.pascalcase(test_step.attribute)
self._busy_wait_ms = test_step.busy_wait_ms
self._cluster = cluster
self._endpoint = test_step.endpoint
self._interation_timeout_ms = test_step.timed_interaction_timeout_ms
self._node_id = test_step.node_id
self._request_object = None
attribute = context.data_model_lookup.get_attribute(
self._cluster, self._attribute_name)
if attribute is None:
raise ParsingError(
f'WriteAttribute failed to find cluster:{self._cluster} '
f'Attribute:{self._attribute_name}')
if not test_step.arguments:
raise UnexpectedParsingError(f'WriteAttribute action does have arguments {self.label}')
args = test_step.arguments['values']
if len(args) != 1:
raise UnexpectedParsingError(f'WriteAttribute is trying to write multiple values')
request_data_as_dict = args[0]
try:
# TODO this is an ugly hack
request_data = Converter.convert_to_data_model_type(
request_data_as_dict['value'], attribute.attribute_type.Type)
except ValueError:
raise ParsingError('Could not covert yaml type')
# Create a cluster object for the request from the provided YAML data.
self._request_object = attribute(request_data)
def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
try:
resp = asyncio.run(
dev_ctrl.WriteAttribute(self._node_id, [(self._endpoint, self._request_object)],
timedRequestTimeoutMs=self._interation_timeout_ms,
busyWaitMs=self._busy_wait_ms))
except chip.interaction_model.InteractionModelError as error:
return _ActionResult(status=_ActionStatus.ERROR, response=error)
if len(resp) == 1 and isinstance(resp[0], AttributeStatus):
if resp[0].Status == chip.interaction_model.Status.Success:
return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
else:
return _ActionResult(status=_ActionStatus.ERROR, response=resp[0].Status)
# We always expecte the response to be a list of length 1, for that reason we return error
# here.
return _ActionResult(status=_ActionStatus.ERROR, response=None)
class WaitForReportAction(BaseAction):
'''Single WaitForReport action to be executed.'''
def __init__(self, test_step, context: _ExecutionContext):
'''Converts 'test_step' to wait for report action.
Args:
'test_step': Step containing information required to run wait for report action.
'context': Contains test-wide common objects such as DataModelLookup instance.
Raises:
UnexpectedParsingError: Raised if the expected queue does not exist.
'''
super().__init__(test_step.label, test_step.identity)
self._attribute_name = stringcase.pascalcase(test_step.attribute)
self._output_queue = context.subscription_callback_result_queue.get(self._attribute_name,
None)
if self._output_queue is None:
raise UnexpectedParsingError(f'Could not find output queue')
def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
try:
# While there should be a timeout here provided by the test, the current codegen version
# of YAML tests doesn't have a per test step timeout, only a global timeout for the
# entire test. For that reason we default to a 30 second timeout.
item = self._output_queue.get(block=True, timeout=30)
except queue.Empty:
return _ActionResult(status=_ActionStatus.ERROR, response=None)
return item.result
class CommissionerCommandAction(BaseAction):
'''Single Commissioner Command action to be executed.'''
def __init__(self, test_step):
'''Converts 'test_step' to commissioner command action.
Args:
'test_step': Step containing information required to run wait for report action.
Raises:
UnexpectedParsingError: Raised if the expected queue does not exist.
'''
super().__init__(test_step.label, test_step.identity)
if test_step.command != 'PairWithCode':
raise UnexpectedParsingError(f'Unexpected CommisionerCommand {test_step.command}')
args = test_step.arguments['values']
request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args)
self._setup_payload = request_data_as_dict['payload']
self._node_id = request_data_as_dict['nodeId']
def run_action(self, dev_ctrl: ChipDeviceCtrl) -> _ActionResult:
resp = dev_ctrl.CommissionWithCode(self._setup_payload, self._node_id)
if resp:
return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
else:
return _ActionResult(status=_ActionStatus.ERROR, response=None)
class ReplTestRunner:
'''Test runner to encode/decode values from YAML test Parser for executing the TestStep.
Uses ChipDeviceCtrl from chip-repl to execute parsed YAML TestSteps.
'''
def __init__(self, test_spec_definition, certificate_authority_manager, alpha_dev_ctrl):
self._test_spec_definition = test_spec_definition
self._context = _ExecutionContext(data_model_lookup=PreDefinedDataModelLookup())
self._certificate_authority_manager = certificate_authority_manager
self._dev_ctrls = {}
self._dev_ctrls['alpha'] = alpha_dev_ctrl
def _invoke_action_factory(self, test_step, cluster: str):
'''Creates cluster invoke action command from TestStep.
Args:
'test_step': Step containing information required to run an invoke command action.
'cluster': Name of cluster action is targeting.
Returns:
InvokeAction if 'test_step' is a valid action to be executed.
None if we were unable to use the provided 'test_step' for a known reason that is not
fatal to test execution.
'''
try:
return InvokeAction(test_step, cluster, self._context)
except ParsingError:
return None
def _attribute_read_action_factory(self, test_step, cluster: str):
'''Creates read attribute command TestStep.
Args:
'test_step': Step containing information required to run read attribute action.
'cluster': Name of cluster read attribute action is targeting.
Returns:
ReadAttributeAction if 'test_step' is a valid read attribute to be executed.
None if we were unable to use the provided 'test_step' for a known reason that is not
fatal to test execution.
'''
try:
return ReadAttributeAction(test_step, cluster, self._context)
except ParsingError:
return None
def _attribute_subscribe_action_factory(self, test_step, cluster: str):
'''Creates subscribe attribute command from TestStep provided.
Args:
'test_step': Step containing information required to run subscribe attribute action.
'cluster': Name of cluster write attribute action is targeting.
Returns:
SubscribeAttributeAction if 'test_step' is a valid subscribe attribute to be executed.
None if we were unable to use the provided 'test_step' for a known reason that is not
fatal to test execution.
'''
try:
return SubscribeAttributeAction(test_step, cluster, self._context)
except ParsingError:
# TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this
# runner has matched parity of the codegen YAML test, this exception should be
# propogated.
return None
def _attribute_write_action_factory(self, test_step, cluster: str):
'''Creates write attribute command TestStep.
Args:
'test_step': Step containing information required to run write attribute action.
'cluster': Name of cluster write attribute action is targeting.
Returns:
WriteAttributeAction if 'test_step' is a valid write attribute to be executed.
None if we were unable to use the provided 'test_step' for a known reason that is not
fatal to test execution.
'''
try:
return WriteAttributeAction(test_step, cluster, self._context)
except ParsingError:
return None
def _wait_for_commissionee_action_factory(self, test_step):
try:
return WaitForCommissioneeAction(test_step)
except ParsingError:
# TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this
# runner has matched parity of the codegen YAML test, this exception should be
# propogated.
return None
def _wait_for_report_action_factory(self, test_step):
try:
return WaitForReportAction(test_step, self._context)
except ParsingError:
# TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this
# runner has matched parity of the codegen YAML test, this exception should be
# propogated.
return None
def _commissioner_command_action_factory(self, test_step):
try:
return CommissionerCommandAction(test_step)
except ParsingError:
return None
def encode(self, request) -> BaseAction:
action = None
cluster = request.cluster.replace(' ', '').replace('/', '')
command = request.command
if cluster == 'CommissionerCommands':
return self._commissioner_command_action_factory(request)
# Some of the tests contain 'cluster over-rides' that refer to a different
# cluster than that specified in 'config'.
if cluster == 'DelayCommands' and command == 'WaitForCommissionee':
action = self._wait_for_commissionee_action_factory(request)
elif command == 'writeAttribute':
action = self._attribute_write_action_factory(request, cluster)
elif command == 'readAttribute':
action = self._attribute_read_action_factory(request, cluster)
elif command == 'readEvent':
# TODO need to implement _event_read_action_factory
# action = self._event_read_action_factory(request, cluster)
pass
elif command == 'subscribeAttribute':
action = self._attribute_subscribe_action_factory(request, cluster)
elif command == 'waitForReport':
action = self._wait_for_report_action_factory(request)
else:
action = self._invoke_action_factory(request, cluster)
if action is None:
logger.warn(f"Failed to parse {request.label}")
return action
def decode(self, result: _ActionResult):
# If this is a generic response, there is nothing to do.
if result.response is None:
# TODO Once yamltest and idl python packages are properly packaged as a single module
# the type we are returning will be formalized. For now TestStep.post_process_response
# expects this particular case to be sent as a string.
return 'success' if result.status == _ActionStatus.SUCCESS else 'failure'
response = result.response
decoded_response = {}
if isinstance(response, chip.interaction_model.InteractionModelError):
decoded_response['error'] = stringcase.snakecase(response.status.name).upper()
return decoded_response
if isinstance(response, chip.interaction_model.Status):
decoded_response['error'] = stringcase.snakecase(response.name).upper()
return decoded_response
cluster_name = self._test_spec_definition.get_cluster_name(response.cluster_id)
decoded_response['clusterId'] = cluster_name
if hasattr(response, 'command_id'):
decoded_response['command'] = self._test_spec_definition.get_response_name(
response.cluster_id, response.command_id)
response_definition = self._test_spec_definition.get_response_by_name(
cluster_name, decoded_response['command'])
decoded_response['value'] = Converter.from_data_model_to_test_definition(
self._test_spec_definition, cluster_name, response_definition.fields, response)
if hasattr(response, 'attribute_id'):
decoded_response['attribute'] = self._test_spec_definition.get_attribute_name(
response.cluster_id, response.attribute_id)
attribute = self._test_spec_definition.get_attribute_by_name(
cluster_name, decoded_response['attribute'])
# TODO Once we fix the issue of not being able to find the global attribute properly
# we should be able to remove this if/else statement below.
if attribute is None:
# When we cannot find the attribute it is because it is a global attribute like
# FeatureMap. Fortunately for these types we can get away with using
# 'response.value' directly if it is a list and mapping to int if not a list.
if isinstance(response.value, list):
decoded_response['value'] = response.value
else:
decoded_response['value'] = Converter.from_data_model_to_test_definition(
self._test_spec_definition, cluster_name, int, response.value)
else:
decoded_response['value'] = Converter.from_data_model_to_test_definition(
self._test_spec_definition, cluster_name, attribute.definition, response.value)
return decoded_response
def _get_fabric_id(self, id):
return _TestFabricId[id.upper()].value
def _get_dev_ctrl(self, action: BaseAction):
if action.identity is not None:
dev_ctrl = self._dev_ctrls.get(action.identity, None)
if dev_ctrl is None:
fabric_id = self._get_fabric_id(action.identity)
certificate_authority = self._certificate_authority_manager.activeCaList[0]
fabric = None
for existing_admin in certificate_authority.adminList:
if existing_admin.fabricId == fabric_id:
fabric = existing_admin
if fabric is None:
fabric = certificate_authority.NewFabricAdmin(vendorId=0xFFF1,
fabricId=fabric_id)
dev_ctrl = fabric.NewController()
self._dev_ctrls[action.identity] = dev_ctrl
else:
dev_ctrl = self._dev_ctrls['alpha']
return dev_ctrl
def execute(self, action: BaseAction):
dev_ctrl = self._get_dev_ctrl(action)
return action.run_action(dev_ctrl)
def shutdown(self):
for subscription in self._context.subscriptions:
subscription.Shutdown()