# blob: b3559a1e431956e6b7522efbfc58b7710e28f64c [file] [log] [blame]
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio as asyncio
import logging
import queue
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum, IntEnum
import chip.interaction_model
import chip.yaml.format_converter as Converter
import stringcase
from chip.ChipDeviceCtrl import ChipDeviceController, discovery
from chip.clusters.Attribute import (AttributeStatus, EventReadResult, SubscriptionTransaction, TypedAttributePath,
ValueDecodeFailure)
from chip.exceptions import ChipStackError
from chip.yaml.errors import ParsingError, UnexpectedParsingError
from matter_yamltests.pseudo_clusters.clusters.delay_commands import DelayCommands
from matter_yamltests.pseudo_clusters.clusters.log_commands import LogCommands
from matter_yamltests.pseudo_clusters.clusters.system_commands import SystemCommands
from matter_yamltests.pseudo_clusters.pseudo_clusters import PseudoClusters
from .data_model_lookup import *
# Pseudo clusters (delay, log and system commands) that DefaultPseudoCluster consults to check
# whether a test step is supported and to execute it.
_PSEUDO_CLUSTERS = PseudoClusters([DelayCommands(), LogCommands(), SystemCommands()])
# Module-level logger, registered under the fixed name 'YamlParser'.
logger = logging.getLogger('YamlParser')
class _ActionStatus(Enum):
SUCCESS = 'success',
ERROR = 'error'
class _TestFabricId(IntEnum):
ALPHA = 1,
BETA = 2,
GAMMA = 3
@dataclass
class _GetCommissionerNodeIdResult:
node_id: int
@dataclass
class EventResponse:
    '''Wrapper around the event results obtained from an event read or subscription.'''
    # Event results as handed back by the controller; the list may be empty.
    event_result_list: list[EventReadResult]
@dataclass
class _ActionResult:
    '''Result of running one action: a status plus an optional response object.'''
    # Whether the action succeeded or failed.
    status: _ActionStatus
    # Response of the action, or the error that was observed. None when there is nothing to
    # report.
    response: object
@dataclass
class _AttributeSubscriptionCallbackResult:
    '''One attribute report queued by an attribute subscription callback.'''
    # Name of the accumulator that queued this item.
    name: str
    # Path of the attribute the report was received for.
    attribute_path: TypedAttributePath
    # Action result wrapping the reported attribute value.
    result: _ActionResult
@dataclass
class _EventSubscriptionCallbackResult:
    '''One event report queued by an event subscription callback.'''
    # Name of the accumulator that queued this item.
    name: str
    # Action result wrapping the reported event.
    result: _ActionResult
@dataclass
class _ExecutionContext:
    '''Objects that are commonly passed around this file and are vital to test execution.'''

    # Data model lookup used to get python attribute, cluster, command objects.
    data_model_lookup: DataModelLookup = None

    # Subscriptions established so far.
    subscriptions: list = field(default_factory=list)

    # Maps an attribute/event name to a queue of subscription callback results that have been
    # sent by the device under test. Attribute subscriptions queue
    # _AttributeSubscriptionCallbackResult items, event subscriptions queue
    # _EventSubscriptionCallbackResult items.
    subscription_callback_result_queue: dict = field(default_factory=dict)
class BaseAction(ABC):
    '''Interface for a single YAML action that is to be executed.'''

    def __init__(self, test_step):
        '''Captures the label, identity and PICS-enabled flag of 'test_step'.'''
        self._label = test_step.label
        self._identity = test_step.identity
        self._pics_enabled = test_step.is_pics_enabled

    @property
    def label(self):
        '''Label of the test step this action was created from.'''
        return self._label

    @property
    def identity(self):
        '''Identity of the test step this action was created from.'''
        return self._identity

    @property
    def pics_enabled(self):
        '''Value of 'is_pics_enabled' on the test step this action was created from.'''
        return self._pics_enabled

    @abstractmethod
    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Runs the action with 'dev_ctrl'; concrete actions must override this.'''
class DefaultPseudoCluster(BaseAction):
    '''Action that hands the test step over to one of the registered pseudo clusters.'''

    def __init__(self, test_step):
        '''Wraps 'test_step' for execution by a pseudo cluster.

        Raises:
          ParsingError: Raised if no registered pseudo cluster supports 'test_step'.
        '''
        super().__init__(test_step)
        supported = _PSEUDO_CLUSTERS.supports(test_step)
        if not supported:
            raise ParsingError(f'Default cluster {test_step.cluster} {test_step.command}, not supported')
        self._test_step = test_step

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Executes the step on the pseudo clusters; 'dev_ctrl' is not used.'''
        # Whatever the pseudo cluster returns is discarded; the step is reported as a plain
        # success.
        asyncio.run(_PSEUDO_CLUSTERS.execute(self._test_step))
        return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
class InvokeAction(BaseAction):
    '''Single invoke action to be executed.'''

    def __init__(self, test_step, cluster: str, context: _ExecutionContext):
        '''Converts 'test_step' to invoke command action that can execute with ChipDeviceController.

        Args:
          'test_step': Step containing information required to run invoke command action.
          'cluster': Name of cluster which the invoke action is targeting.
          'context': Contains test-wide common objects such as DataModelLookup instance.
        Raises:
          ParsingError: Raised if the command cannot be found for the cluster, or if the YAML
            arguments cannot be converted to the command's data model type.
          UnexpectedParsingError: Raised if there is an unexpected parsing error (not raised
            directly by this constructor).
        '''
        super().__init__(test_step)
        self._busy_wait_ms = test_step.busy_wait_ms
        self._command_name = stringcase.pascalcase(test_step.command)
        self._cluster = cluster
        self._interation_timeout_ms = test_step.timed_interaction_timeout_ms
        self._request_object = None
        self._expected_response_object = None
        self._endpoint = test_step.endpoint
        self._node_id = test_step.node_id

        command_type = context.data_model_lookup.get_command(self._cluster, self._command_name)
        if command_type is None:
            raise ParsingError(
                f'Failed to find cluster:{self._cluster} Command:{self._command_name}')

        command_instance = command_type()
        if not test_step.arguments:
            # No arguments supplied: the default-constructed command is the request.
            self._request_object = command_instance
            return

        yaml_values = Converter.convert_list_of_name_value_pair_to_dict(
            test_step.arguments['values'])
        try:
            typed_values = Converter.convert_to_data_model_type(
                yaml_values, type(command_instance))
        except ValueError:
            # TODO after allowing out of bounds enums to be written this should be changed to
            # UnexpectedParsingError.
            raise ParsingError('Could not covert yaml type')
        self._request_object = command_instance.FromDict(typed_values)

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Sends the command with 'dev_ctrl'.

        Returns:
          A SUCCESS result carrying the command response (None for commands that have no
          response), or an ERROR result carrying the InteractionModelError that was raised.
        '''
        try:
            pending = dev_ctrl.SendCommand(
                self._node_id, self._endpoint, self._request_object,
                timedRequestTimeoutMs=self._interation_timeout_ms,
                busyWaitMs=self._busy_wait_ms)
            response = asyncio.run(pending)
        except chip.interaction_model.InteractionModelError as error:
            return _ActionResult(status=_ActionStatus.ERROR, response=error)
        return _ActionResult(status=_ActionStatus.SUCCESS, response=response)
class ReadAttributeAction(BaseAction):
    '''Single read attribute action to be executed.'''

    def __init__(self, test_step, cluster: str, context: _ExecutionContext):
        '''Converts 'test_step' to read attribute action that can execute with ChipDeviceController.

        Args:
          'test_step': Step containing information required to run read attribute action.
          'cluster': Name of cluster read attribute action is targeting.
          'context': Contains test-wide common objects such as DataModelLookup instance.
        Raises:
          ParsingError: Raised if the attribute cannot be found for the cluster.
          UnexpectedParsingError: Raised if the cluster object cannot be found, if the step
            carries arguments, or if the attribute has no attribute_type.
        '''
        super().__init__(test_step)
        self._attribute_name = stringcase.pascalcase(test_step.attribute)
        self._cluster = cluster
        self._endpoint = test_step.endpoint
        self._node_id = test_step.node_id
        self._cluster_object = None
        self._request_object = None
        # Reads are fabric filtered unless the step says otherwise.
        self._fabric_filtered = (
            True if test_step.fabric_filtered is None else test_step.fabric_filtered)

        lookup = context.data_model_lookup
        self._cluster_object = lookup.get_cluster(self._cluster)
        if self._cluster_object is None:
            raise UnexpectedParsingError(
                f'ReadAttribute failed to find cluster object:{self._cluster}')

        self._request_object = lookup.get_attribute(self._cluster, self._attribute_name)
        if self._request_object is None:
            raise ParsingError(
                f'ReadAttribute failed to find cluster:{self._cluster} '
                f'Attribute:{self._attribute_name}')

        if test_step.arguments:
            raise UnexpectedParsingError(
                f'ReadAttribute should not contain arguments. {self.label}')

        if self._request_object.attribute_type is None:
            raise UnexpectedParsingError(
                f'ReadAttribute doesnt have valid attribute_type. {self.label}')

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Reads the attribute with 'dev_ctrl' and converts the reply to an action result.

        An InteractionModelError, or a ChipStackError whose 'err' is 50, is returned as an
        ERROR result; any other ChipStackError is re-raised.
        '''
        try:
            raw_resp = asyncio.run(dev_ctrl.ReadAttribute(
                self._node_id,
                [(self._endpoint, self._request_object)],
                fabricFiltered=self._fabric_filtered))
        except chip.interaction_model.InteractionModelError as error:
            return _ActionResult(status=_ActionStatus.ERROR, response=error)
        except ChipStackError as error:
            chip_timeout_error = 50
            if error.err != chip_timeout_error:
                # For now it is unsure if all ChipStackError are supposed to be intentional.
                # As a result we simply re-raise the error.
                raise error
            return _ActionResult(status=_ActionStatus.ERROR, response=error)
        return self.parse_raw_response(raw_resp)

    def parse_raw_response(self, raw_resp) -> _ActionResult:
        '''Extracts this action's attribute from 'raw_resp' and wraps it in an action result.

        'raw_resp' is indexed by endpoint, then cluster object, then attribute object.
        '''
        # TODO Currently there are no checks that this indexing won't fail. Need to add some
        # initial validity checks. Coming soon in a future PR.
        endpoint_data = raw_resp[self._endpoint]
        cluster_data = endpoint_data[self._cluster_object]
        attribute_value = cluster_data[self._request_object]

        if isinstance(attribute_value, ValueDecodeFailure):
            # Reason is of type chip.interaction_model.Status.
            return _ActionResult(status=_ActionStatus.ERROR, response=attribute_value.Reason)

        # decode() is expecting to get a DataModelLookup Object type to grab certain attributes
        # like cluster id, so the value is wrapped back into the attribute object.
        wrapped_value = self._request_object(attribute_value)
        return _ActionResult(status=_ActionStatus.SUCCESS, response=wrapped_value)
class ReadEventAction(BaseAction):
    ''' Read Event action to be executed.'''

    def __init__(self, test_step, cluster: str, context: _ExecutionContext):
        '''Converts 'test_step' to read event action that can execute with ChipDeviceController.

        Args:
          'test_step': Step containing information required to run read event action.
          'cluster': Name of cluster read event action is targeting.
          'context': Contains test-wide common objects such as DataModelLookup instance.
        Raises:
          UnexpectedParsingError: Raised if the event cannot be found for the cluster or if the
            step carries arguments.
        '''
        super().__init__(test_step)
        self._event_name = stringcase.pascalcase(test_step.event)
        self._cluster = cluster
        self._endpoint = test_step.endpoint
        self._node_id = test_step.node_id
        self._cluster_object = None
        self._request_object = None
        self._event_number_filter = test_step.event_number
        # Event reads are not fabric filtered unless the step says otherwise.
        self._fabric_filtered = (
            False if test_step.fabric_filtered is None else test_step.fabric_filtered)

        self._request_object = context.data_model_lookup.get_event(
            self._cluster, self._event_name)
        if self._request_object is None:
            raise UnexpectedParsingError(
                f'ReadEvent failed to find cluster:{self._cluster} Event:{self._event_name}')

        if test_step.arguments:
            raise UnexpectedParsingError(
                f'ReadEvent should not contain arguments. {self.label}')

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Reads the event with 'dev_ctrl'.

        Returns:
          A SUCCESS result carrying an EventResponse, or an ERROR result carrying the
          InteractionModelError that was raised.
        '''
        urgent = 0
        events_to_read = [(self._endpoint, self._request_object, urgent)]
        try:
            event_results = asyncio.run(dev_ctrl.ReadEvent(
                self._node_id,
                events=events_to_read,
                eventNumberFilter=self._event_number_filter,
                fabricFiltered=self._fabric_filtered))
        except chip.interaction_model.InteractionModelError as error:
            return _ActionResult(status=_ActionStatus.ERROR, response=error)
        return _ActionResult(status=_ActionStatus.SUCCESS,
                             response=EventResponse(event_result_list=event_results))
class WaitForCommissioneeAction(BaseAction):
    ''' Wait for commissionee action to be executed.'''

    def __init__(self, test_step):
        '''Converts 'test_step' to a wait for commissionee action.

        Args:
          'test_step': Step containing information required to run the action. The optional
            arguments 'nodeId', 'expireExistingSession' and 'timeout' (in seconds) are read
            from it.
        '''
        super().__init__(test_step)
        self._expire_existing_session = False
        # This is the default when no timeout is provided.
        _DEFAULT_TIMEOUT_MS = 10 * 1000
        self._timeout_ms = _DEFAULT_TIMEOUT_MS
        # Always give run_action a node id to work with. Previously this attribute was only
        # assigned when the step had arguments, so a step without arguments failed with an
        # AttributeError when run. The step's own node id is used as the default, like the
        # other actions in this file do.
        self._node_id = test_step.node_id

        if test_step.arguments is None:
            # Nothing left for us to do the default values are what we want
            return

        args = test_step.arguments['values']
        request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args)

        # There's a chance the commissionee may have rebooted before this call here as part of a
        # test flow or is just starting out fresh outright. Unless expireExistingSession is
        # explicitly set, the default behaviour is to make sure we're not re-using any cached
        # CASE sessions that will now be stale and mismatched with the peer, causing subsequent
        # interactions to fail.
        self._expire_existing_session = request_data_as_dict.get('expireExistingSession', True)
        self._node_id = request_data_as_dict.get('nodeId', self._node_id)
        if 'timeout' in request_data_as_dict:
            # Timeout is provided in seconds, we need to convert to milliseconds.
            self._timeout_ms = request_data_as_dict['timeout'] * 1000

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Optionally expires existing sessions, then waits for the node to be connected.

        Returns:
          A SUCCESS result, or an ERROR result if a TimeoutError was raised.
        '''
        try:
            if self._expire_existing_session:
                dev_ctrl.ExpireSessions(self._node_id)
            dev_ctrl.GetConnectedDeviceSync(self._node_id, timeoutMs=self._timeout_ms)
        except TimeoutError:
            return _ActionResult(status=_ActionStatus.ERROR, response=None)
        return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
class AttributeChangeAccumulator:
    '''Subscription callback that queues reports received for one expected attribute.'''

    def __init__(self, name: str, expected_attribute: Clusters.ClusterAttributeDescriptor,
                 output_queue: queue.SimpleQueue):
        '''Stores the accumulator name, the attribute to watch and the queue to fill.'''
        self._name = name
        self._expected_attribute = expected_attribute
        self._output_queue = output_queue

    def __call__(self, path: TypedAttributePath, transaction: SubscriptionTransaction):
        '''Queues the reported value when 'path' refers to the expected attribute.'''
        if not (path.AttributeType == self._expected_attribute):
            # Reports for any other attribute are ignored.
            return

        data = transaction.GetAttribute(path)
        wrapped = _ActionResult(status=_ActionStatus.SUCCESS,
                                response=path.AttributeType(data))
        callback_result = _AttributeSubscriptionCallbackResult(self._name, path, wrapped)
        logging.debug(
            f'Got subscription report on client {self.name} for {path.AttributeType}: {data}')
        self._output_queue.put(callback_result)

    @property
    def name(self) -> str:
        '''Name this accumulator was created with.'''
        return self._name
class EventChangeAccumulator:
    '''Subscription callback that queues reports received for one expected event.'''

    def __init__(self, name: str, expected_event, output_queue: queue.SimpleQueue):
        '''Stores the accumulator name, the event to watch and the queue to fill.'''
        self._name = name
        self._expected_event = expected_event
        self._output_queue = output_queue

    def __call__(self, event_result: EventReadResult, transaction: SubscriptionTransaction):
        '''Queues 'event_result' when its cluster id and event id match the expected event.'''
        header = event_result.Header
        if (self._expected_event.cluster_id != header.ClusterId or
                self._expected_event.event_id != header.EventId):
            # Reports for any other event are ignored.
            return

        wrapped = _ActionResult(status=_ActionStatus.SUCCESS,
                                response=EventResponse(event_result_list=[event_result]))
        callback_result = _EventSubscriptionCallbackResult(self._name, wrapped)
        logging.debug(f'Got subscription report on client {self.name}')
        self._output_queue.put(callback_result)

    @property
    def name(self) -> str:
        '''Name this accumulator was created with.'''
        return self._name
class SubscribeAttributeAction(ReadAttributeAction):
    '''Single subscribe attribute action to be executed.'''

    def __init__(self, test_step, cluster: str, context: _ExecutionContext):
        '''Converts 'test_step' to subscribe attribute action that can execute with ChipDeviceController.

        Args:
          'test_step': Step containing information required to run subscribe attribute action.
          'cluster': Name of cluster subscribe attribute action is targeting.
          'context': Contains test-wide common objects such as DataModelLookup instance.
        Raises:
          ParsingError: Raised by the read attribute parsing if the attribute cannot be found.
          UnexpectedParsingError: Raised if min_interval or max_interval is missing, or by the
            read attribute parsing.
        '''
        super().__init__(test_step, cluster, context)
        self._context = context

        min_interval = test_step.min_interval
        if min_interval is None:
            raise UnexpectedParsingError(
                f'SubscribeAttribute action does not have min_interval {self.label}')
        self._min_interval = min_interval

        max_interval = test_step.max_interval
        if max_interval is None:
            raise UnexpectedParsingError(
                f'SubscribeAttribute action does not have max_interval {self.label}')
        self._max_interval = max_interval

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Establishes the subscription and returns the attribute value it currently holds.

        The subscription is recorded in the execution context and later reports are pushed on
        the queue registered under the attribute name.
        '''
        interval = (self._min_interval, self._max_interval)
        try:
            subscription = asyncio.run(dev_ctrl.ReadAttribute(
                self._node_id,
                [(self._endpoint, self._request_object)],
                reportInterval=interval,
                keepSubscriptions=False))
        except chip.interaction_model.InteractionModelError as error:
            return _ActionResult(status=_ActionStatus.ERROR, response=error)

        self._context.subscriptions.append(subscription)

        queues = self._context.subscription_callback_result_queue
        output_queue = queues.get(self._attribute_name)
        if output_queue is None:
            output_queue = queue.SimpleQueue()
            queues[self._attribute_name] = output_queue

        # Drop whatever is still queued under this attribute name before the new handler
        # starts filling the queue.
        while not output_queue.empty():
            output_queue.get(block=False)

        handler = AttributeChangeAccumulator(self.label, self._request_object, output_queue)
        subscription.SetAttributeUpdateCallback(handler)

        return self.parse_raw_response(subscription.GetAttributes())
class SubscribeEventAction(ReadEventAction):
    '''Single subscribe event action to be executed.'''

    def __init__(self, test_step, cluster: str, context: _ExecutionContext):
        '''Converts 'test_step' to subscribe event action that can execute with ChipDeviceController.

        Args:
          'test_step': Step containing information required to run subscribe event action.
          'cluster': Name of cluster subscribe event action is targeting.
          'context': Contains test-wide common objects such as DataModelLookup instance.
        Raises:
          UnexpectedParsingError: Raised if min_interval or max_interval is missing, or by the
            read event parsing.
        '''
        super().__init__(test_step, cluster, context)
        self._context = context

        min_interval = test_step.min_interval
        if min_interval is None:
            raise UnexpectedParsingError(
                f'SubscribeEvent action does not have min_interval {self.label}')
        self._min_interval = min_interval

        max_interval = test_step.max_interval
        if max_interval is None:
            raise UnexpectedParsingError(
                f'SubscribeEvent action does not have max_interval {self.label}')
        self._max_interval = max_interval

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Establishes the subscription and returns the events it currently holds.

        The subscription is recorded in the execution context and later reports are pushed on
        the queue registered under the event name.
        '''
        urgent = 0
        events_to_watch = [(self._endpoint, self._request_object, urgent)]
        interval = (self._min_interval, self._max_interval)
        try:
            subscription = asyncio.run(dev_ctrl.ReadEvent(
                self._node_id,
                events=events_to_watch,
                eventNumberFilter=self._event_number_filter,
                reportInterval=interval,
                keepSubscriptions=False))
        except chip.interaction_model.InteractionModelError as error:
            return _ActionResult(status=_ActionStatus.ERROR, response=error)

        self._context.subscriptions.append(subscription)

        queues = self._context.subscription_callback_result_queue
        output_queue = queues.get(self._event_name)
        if output_queue is None:
            output_queue = queue.SimpleQueue()
            queues[self._event_name] = output_queue

        # Drop whatever is still queued under this event name before the new handler starts
        # filling the queue.
        while not output_queue.empty():
            output_queue.get(block=False)

        handler = EventChangeAccumulator(self.label, self._request_object, output_queue)
        subscription.SetEventUpdateCallback(handler)

        current_events = EventResponse(event_result_list=subscription.GetEvents())
        return _ActionResult(status=_ActionStatus.SUCCESS, response=current_events)
class WriteAttributeAction(BaseAction):
    '''Single write attribute action to be executed.'''

    def __init__(self, test_step, cluster: str, context: _ExecutionContext):
        '''Converts 'test_step' to write attribute action that can execute with ChipDeviceController.

        Args:
          'test_step': Step containing information required to run write attribute action.
          'cluster': Name of cluster write attribute action is targeting.
          'context': Contains test-wide common objects such as DataModelLookup instance.
        Raises:
          ParsingError: Raised if the attribute cannot be found for the cluster, or if the YAML
            value cannot be converted to the attribute's data model type.
          UnexpectedParsingError: Raised if the step has no arguments or does not carry
            exactly one value.
        '''
        super().__init__(test_step)
        self._attribute_name = stringcase.pascalcase(test_step.attribute)
        self._busy_wait_ms = test_step.busy_wait_ms
        self._cluster = cluster
        self._endpoint = test_step.endpoint
        self._interation_timeout_ms = test_step.timed_interaction_timeout_ms
        self._node_id = test_step.node_id
        self._request_object = None

        attribute = context.data_model_lookup.get_attribute(
            self._cluster, self._attribute_name)
        if attribute is None:
            raise ParsingError(
                f'WriteAttribute failed to find cluster:{self._cluster} '
                f'Attribute:{self._attribute_name}')

        if not test_step.arguments:
            # The message used to read "does have arguments", the opposite of what is
            # being reported here.
            raise UnexpectedParsingError(
                f'WriteAttribute action does not have arguments {self.label}')

        args = test_step.arguments['values']
        if len(args) != 1:
            raise UnexpectedParsingError('WriteAttribute is trying to write multiple values')
        request_data_as_dict = args[0]
        try:
            # TODO this is an ugly hack
            request_data = Converter.convert_to_data_model_type(
                request_data_as_dict['value'], attribute.attribute_type.Type)
        except ValueError as error:
            # Keep the conversion failure attached as the explicit cause.
            raise ParsingError('Could not covert yaml type') from error

        # Create a cluster object for the request from the provided YAML data.
        self._request_object = attribute(request_data)

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Writes the attribute with 'dev_ctrl'.

        Returns:
          A SUCCESS result when the single returned AttributeStatus reports success. An ERROR
          result carrying the InteractionModelError that was raised or the returned status,
          or carrying None when the response is not a single AttributeStatus.
        '''
        try:
            resp = asyncio.run(
                dev_ctrl.WriteAttribute(self._node_id, [(self._endpoint, self._request_object)],
                                        timedRequestTimeoutMs=self._interation_timeout_ms,
                                        busyWaitMs=self._busy_wait_ms))
        except chip.interaction_model.InteractionModelError as error:
            return _ActionResult(status=_ActionStatus.ERROR, response=error)

        if len(resp) == 1 and isinstance(resp[0], AttributeStatus):
            if resp[0].Status == chip.interaction_model.Status.Success:
                return _ActionResult(status=_ActionStatus.SUCCESS, response=None)
            return _ActionResult(status=_ActionStatus.ERROR, response=resp[0].Status)

        # We always expect the response to be a list of length 1, for that reason we return
        # error here.
        return _ActionResult(status=_ActionStatus.ERROR, response=None)
class WaitForReportAction(BaseAction):
    '''Single WaitForReport action to be executed.'''

    def __init__(self, test_step, context: _ExecutionContext):
        '''Converts 'test_step' to wait for report action.

        Args:
          'test_step': Step containing information required to run wait for report action.
          'context': Contains test-wide common objects such as DataModelLookup instance.
        Raises:
          UnexpectedParsingError: Raised if the step names neither an attribute nor an event,
            or if the expected queue does not exist.
        '''
        super().__init__(test_step)
        # The attribute takes precedence over the event when both are provided.
        source_name = test_step.attribute if test_step.attribute is not None else test_step.event
        if source_name is None:
            raise UnexpectedParsingError(
                'WaitForReport needs to wait on either attribute or event, neither were provided')

        queue_name = stringcase.pascalcase(source_name)
        self._output_queue = context.subscription_callback_result_queue.get(queue_name)
        if self._output_queue is None:
            raise UnexpectedParsingError('Could not find output queue')

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Waits for the next queued subscription report; 'dev_ctrl' is not used.

        Returns:
          The result stored in the queued item, or an ERROR result if nothing arrives within
          30 seconds.
        '''
        # While there should be a timeout here provided by the test, the current codegen version
        # of YAML tests doesn't have a per test step timeout, only a global timeout for the
        # entire test. For that reason we default to a 30 second timeout.
        try:
            report = self._output_queue.get(block=True, timeout=30)
        except queue.Empty:
            return _ActionResult(status=_ActionStatus.ERROR, response=None)
        # Attribute and event callback results both expose the action result as 'result'.
        return report.result
class CommissionerCommandAction(BaseAction):
    '''Single Commissioner Command action to be executed.'''

    def __init__(self, test_step):
        '''Converts 'test_step' to commissioner command action.

        Args:
          'test_step': Step containing information required to run the commissioner command.
        Raises:
          UnexpectedParsingError: Raised if the command is neither 'GetCommissionerNodeId' nor
            'PairWithCode'.
        '''
        super().__init__(test_step)
        self._command = test_step.command

        if test_step.command == 'GetCommissionerNodeId':
            # Just setting the self._command is enough for run_action below.
            return
        if test_step.command != 'PairWithCode':
            raise UnexpectedParsingError(f'Unexpected CommisionerCommand {test_step.command}')

        pairing_args = Converter.convert_list_of_name_value_pair_to_dict(
            test_step.arguments['values'])
        self._setup_payload = pairing_args['payload']
        self._node_id = pairing_args['nodeId']

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        '''Runs the commissioner command with 'dev_ctrl'.

        Returns:
          For 'GetCommissionerNodeId', a SUCCESS result carrying the controller's node id.
          Otherwise SUCCESS or ERROR depending on the truthiness of what CommissionWithCode
          returned.
        '''
        if self._command == 'GetCommissionerNodeId':
            return _ActionResult(status=_ActionStatus.SUCCESS,
                                 response=_GetCommissionerNodeIdResult(dev_ctrl.nodeId))

        paired = dev_ctrl.CommissionWithCode(self._setup_payload, self._node_id)
        outcome = _ActionStatus.SUCCESS if paired else _ActionStatus.ERROR
        return _ActionResult(status=outcome, response=None)
class DiscoveryCommandAction(BaseAction):
    """DiscoveryCommand implementation (FindCommissionable* methods)."""

    @staticmethod
    def _filter_for_step(test_step) -> tuple[discovery.FilterType, object]:
        """Given a test step, figure out the correct filters to give to
        DiscoverCommissionableNodes.

        Returns:
          A (filter type, filter value) pair. The value is None for commands that do not
          filter on a value.
        Raises:
          UnexpectedParsingError: Raised if the command is not a known FindCommissionable*
            command.
        """
        command = test_step.command

        if command == 'FindCommissionable':
            return discovery.FilterType.NONE, None

        if command == 'FindCommissionableByCommissioningMode':
            # this is just a "_CM" subtype
            return discovery.FilterType.COMMISSIONING_MODE, None

        # all the items below require a "value" to use for filtering
        value_filter_types = {
            'FindCommissionableByDeviceType': discovery.FilterType.DEVICE_TYPE,
            'FindCommissionableByLongDiscriminator': discovery.FilterType.LONG_DISCRIMINATOR,
            'FindCommissionableByShortDiscriminator': discovery.FilterType.SHORT_DISCRIMINATOR,
            'FindCommissionableByVendorId': discovery.FilterType.VENDOR_ID,
        }
        # Reject unknown commands before touching the arguments, so that an unknown command
        # without arguments reports the intended error rather than a TypeError.
        if command not in value_filter_types:
            raise UnexpectedParsingError(f'Invalid command: {command}')

        args = test_step.arguments['values']
        request_data_as_dict = Converter.convert_list_of_name_value_pair_to_dict(args)
        return value_filter_types[command], request_data_as_dict['value']

    def __init__(self, test_step):
        """Derives the discovery filter type and filter value from 'test_step'."""
        super().__init__(test_step)
        self.filterType, self.filter = DiscoveryCommandAction._filter_for_step(test_step)

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        """Discovers commissionable nodes with 'dev_ctrl'.

        Returns:
          A SUCCESS result carrying the first discovered device, or an ERROR result carrying
          "NO DEVICES FOUND" when nothing was discovered.
        """
        devices = dev_ctrl.DiscoverCommissionableNodes(
            filterType=self.filterType, filter=self.filter, stopOnFirst=True, timeoutSecond=5)

        # Devices will be a list: [CommissionableNode(), ...]
        # Pass the value as a logging argument: "%r" % devices would unpack a tuple.
        logging.info("Discovered devices: %r", devices)

        if not devices:
            logging.error("No devices found")
            return _ActionResult(status=_ActionStatus.ERROR, response="NO DEVICES FOUND")

        if len(devices) > 1:
            logging.warning("Commissionable discovery found multiple results!")

        return _ActionResult(status=_ActionStatus.SUCCESS, response=devices[0])
class NotImplementedAction(BaseAction):
    """Raises a "NOT YET IMPLEMENTED" exception when run."""

    def __init__(self, test_step, cluster, command):
        """Remembers the cluster and command named in the exception raised by run_action."""
        super().__init__(test_step)
        self.cluster, self.command = cluster, command

    def run_action(self, dev_ctrl: ChipDeviceController) -> _ActionResult:
        """Always raises; 'dev_ctrl' is not used."""
        message = f"NOT YET IMPLEMENTED: {self.cluster}::{self.command}"
        raise Exception(message)
class ReplTestRunner:
'''Test runner to encode/decode values from YAML test Parser for executing the TestStep.
Uses ChipDeviceController from chip-repl to execute parsed YAML TestSteps.
'''
def __init__(self, test_spec_definition, certificate_authority_manager, alpha_dev_ctrl):
    '''Stores the collaborators and creates the execution context shared by all actions.

    'alpha_dev_ctrl' is registered as the device controller named 'alpha'.
    '''
    self._test_spec_definition = test_spec_definition
    self._context = _ExecutionContext(data_model_lookup=PreDefinedDataModelLookup())
    self._certificate_authority_manager = certificate_authority_manager
    self._dev_ctrls = {'alpha': alpha_dev_ctrl}
def _invoke_action_factory(self, test_step, cluster: str):
    '''Creates cluster invoke action command from TestStep.

    Args:
      'test_step': Step containing information required to run an invoke command action.
      'cluster': Name of cluster action is targeting.
    Returns:
      InvokeAction if 'test_step' is a valid action to be executed.
      None if we were unable to use the provided 'test_step' for a known reason that is not
      fatal to test execution.
    '''
    try:
        action = InvokeAction(test_step, cluster, self._context)
    except ParsingError:
        action = None
    return action
def _attribute_read_action_factory(self, test_step, cluster: str):
    '''Creates read attribute command from TestStep.

    Args:
      'test_step': Step containing information required to run read attribute action.
      'cluster': Name of cluster read attribute action is targeting.
    Returns:
      ReadAttributeAction if 'test_step' is a valid read attribute to be executed.
      None if building the action raised a ParsingError.
    '''
    try:
        action = ReadAttributeAction(test_step, cluster, self._context)
    except ParsingError:
        action = None
    return action
def _event_read_action_factory(self, test_step, cluster: str):
    '''Creates read event command from TestStep; parsing errors are not caught here.'''
    action = ReadEventAction(test_step, cluster, self._context)
    return action
def _attribute_subscribe_action_factory(self, test_step, cluster: str):
    '''Creates subscribe attribute command from TestStep provided.

    Args:
      'test_step': Step containing information required to run subscribe attribute action.
      'cluster': Name of cluster subscribe attribute action is targeting.
    Returns:
      SubscribeAttributeAction if 'test_step' is a valid subscribe attribute to be executed.
      None if we were unable to use the provided 'test_step' for a known reason that is not
      fatal to test execution.
    '''
    try:
        action = SubscribeAttributeAction(test_step, cluster, self._context)
    except ParsingError:
        # TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this
        # runner has matched parity of the codegen YAML test, this exception should be
        # propagated.
        action = None
    return action
def _attribute_subscribe_event_factory(self, test_step, cluster: str):
    '''Creates subscribe event command from TestStep provided.

    Args:
      'test_step': Step containing information required to run subscribe event action.
      'cluster': Name of cluster subscribe event action is targeting.
    Returns:
      SubscribeEventAction built from 'test_step'; parsing errors are not caught here.
    '''
    action = SubscribeEventAction(test_step, cluster, self._context)
    return action
def _attribute_write_action_factory(self, test_step, cluster: str):
    '''Creates write attribute command from TestStep.

    Args:
      'test_step': Step containing information required to run write attribute action.
      'cluster': Name of cluster write attribute action is targeting.
    Returns:
      WriteAttributeAction if 'test_step' is a valid write attribute to be executed.
      None if we were unable to use the provided 'test_step' for a known reason that is not
      fatal to test execution.
    '''
    try:
        action = WriteAttributeAction(test_step, cluster, self._context)
    except ParsingError:
        action = None
    return action
def _wait_for_commissionee_action_factory(self, test_step):
    '''Creates a wait for commissionee action, or None if a ParsingError was raised.'''
    try:
        action = WaitForCommissioneeAction(test_step)
    except ParsingError:
        # TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this
        # runner has matched parity of the codegen YAML test, this exception should be
        # propagated.
        action = None
    return action
def _wait_for_report_action_factory(self, test_step):
    '''Creates a wait for report action, or None if a ParsingError was raised.'''
    try:
        action = WaitForReportAction(test_step, self._context)
    except ParsingError:
        # TODO For now, ParsingErrors are largely issues that will be addressed soon. Once this
        # runner has matched parity of the codegen YAML test, this exception should be
        # propagated.
        action = None
    return action
def _commissioner_command_action_factory(self, test_step):
    '''Creates a commissioner command action, or None if a ParsingError was raised.'''
    try:
        action = CommissionerCommandAction(test_step)
    except ParsingError:
        action = None
    return action
def _default_pseudo_cluster(self, test_step):
    '''Creates a pseudo cluster action, or None if no pseudo cluster supports the step.'''
    try:
        action = DefaultPseudoCluster(test_step)
    except ParsingError:
        action = None
    return action
def encode(self, request) -> BaseAction:
    '''Converts the test step 'request' into the action that executes it.

    Args:
      'request': Test step to convert. Its cluster and command select the action type.
    Returns:
      The action for 'request'. None is returned, after a warning is logged, when neither a
      regular action nor a pseudo cluster action could be created.
    '''
    action = None
    cluster = request.cluster.replace(' ', '').replace('/', '')
    command = request.command

    # Commissioner and discovery commands are returned as is, without falling back to the
    # pseudo clusters below.
    if cluster == 'CommissionerCommands':
        return self._commissioner_command_action_factory(request)
    if cluster == 'DiscoveryCommands':
        return DiscoveryCommandAction(request)

    # Some of the tests contain 'cluster over-rides' that refer to a different
    # cluster than that specified in 'config'.
    if cluster == 'DelayCommands' and command == 'WaitForCommissionee':
        action = self._wait_for_commissionee_action_factory(request)
    elif command == 'writeAttribute':
        action = self._attribute_write_action_factory(request, cluster)
    elif command == 'readAttribute':
        action = self._attribute_read_action_factory(request, cluster)
    elif command == 'readEvent':
        action = self._event_read_action_factory(request, cluster)
    elif command == 'subscribeAttribute':
        action = self._attribute_subscribe_action_factory(request, cluster)
    elif command == 'subscribeEvent':
        action = self._attribute_subscribe_event_factory(request, cluster)
    elif command == 'waitForReport':
        action = self._wait_for_report_action_factory(request)
    else:
        action = self._invoke_action_factory(request, cluster)

    if action is None:
        # Now we try to create a default pseudo cluster.
        action = self._default_pseudo_cluster(request)

    if action is None:
        # Logger.warn is a deprecated alias; warning() is the supported spelling.
        logger.warning(f"Failed to parse {request.label}")

    return action
def decode(self, result: _ActionResult):
    """Convert an action result into the structure used to check test expectations.

    Returns:
        One of the following, depending on the type of ``result.response``:

        * no response: ``{}`` on success, otherwise ``{'error': 'FAILURE'}``;
        * interaction model error or status: ``{'error': NAME}`` where NAME is the
          upper-cased snake-case status name;
        * commissioner node id or commissionable node: ``{'value': {...}}``;
        * EventResponse: a list with one ``{'value': ...}`` or ``{'error': ...}``
          entry per event (``[{}]`` when there are no events);
        * ChipStackError: ``{'error': 'FAILURE'}``;
        * anything else: a dict with 'clusterId' plus, when the response has the
          matching ids, 'command' and/or 'attribute' together with 'value'.

    Raises:
        Exception: when no cluster name is known for the response's cluster id.
    """
    response = result.response

    # If this is a generic response, there is nothing to do.
    if response is None:
        # TODO Once yamltest and idl python packages are properly packaged as a single module
        # the type we are returning will be formalized. For now TestStep.post_process_response
        # expects this particular case to be sent an empty dict or a dict with an error.
        if result.status == _ActionStatus.SUCCESS:
            return {}
        return {'error': 'FAILURE'}

    if isinstance(response, chip.interaction_model.InteractionModelError):
        return {'error': stringcase.snakecase(response.status.name).upper()}

    if isinstance(response, chip.interaction_model.Status):
        return {'error': stringcase.snakecase(response.name).upper()}

    if isinstance(response, _GetCommissionerNodeIdResult):
        return {'value': {'nodeId': response.node_id}}

    if isinstance(response, chip.discovery.CommissionableNode):
        # Attributes copied verbatim, in the order they appear in the decoded value.
        copied_fields = (
            'instanceName', 'hostName', 'port', 'longDiscriminator', 'vendorId', 'productId',
            'commissioningMode', 'deviceType', 'deviceName', 'pairingInstruction', 'pairingHint',
            'mrpRetryIntervalIdle', 'mrpRetryIntervalActive', 'supportsTcp', 'addresses',
            'rotatingId',
        )
        node = {field_name: getattr(response, field_name) for field_name in copied_fields}
        # derived values
        node['rotatingIdLen'] = len(response.rotatingId) if response.rotatingId else 0
        node['numIPs'] = len(response.addresses)
        return {'value': node}

    if isinstance(response, EventResponse):
        if not response.event_result_list:
            # This means that the event result we got back was empty, below is how we
            # represent this.
            return [{}]

        decoded_events = []
        for event in response.event_result_list:
            if event.Status != chip.interaction_model.Status.Success:
                decoded_events.append({'error': stringcase.snakecase(event.Status.name).upper()})
            else:
                cluster_id = event.Header.ClusterId
                cluster_name = self._test_spec_definition.get_cluster_name(cluster_id)
                event_id = event.Header.EventId
                event_name = self._test_spec_definition.get_event_name(cluster_id, event_id)
                event_definition = self._test_spec_definition.get_event_by_name(cluster_name, event_name)
                is_fabric_scoped = bool(event_definition.is_fabric_sensitive)
                decoded_events.append({
                    'value': Converter.from_data_model_to_test_definition(
                        self._test_spec_definition, cluster_name, event_definition.fields,
                        event.Data, is_fabric_scoped),
                })
        return decoded_events

    if isinstance(response, ChipStackError):
        return {'error': 'FAILURE'}

    # Everything else is handled as a cluster command and/or attribute response.
    spec = self._test_spec_definition
    cluster_name = spec.get_cluster_name(response.cluster_id)
    if cluster_name is None:
        raise Exception("Cannot find cluster name for id 0x%0X / %d" % (response.cluster_id, response.cluster_id))

    decoded = {'clusterId': cluster_name}

    if hasattr(response, 'command_id'):
        decoded['command'] = spec.get_response_name(response.cluster_id, response.command_id)
        response_definition = spec.get_response_by_name(cluster_name, decoded['command'])
        decoded['value'] = Converter.from_data_model_to_test_definition(
            spec, cluster_name, response_definition.fields, response)

    if hasattr(response, 'attribute_id'):
        decoded['attribute'] = spec.get_attribute_name(response.cluster_id, response.attribute_id)
        attribute = spec.get_attribute_by_name(cluster_name, decoded['attribute'])
        # TODO Once we fix the issue of not being able to find the global attribute properly
        # we should be able to remove the special-casing of a missing attribute below.
        if attribute is not None:
            decoded['value'] = Converter.from_data_model_to_test_definition(
                spec, cluster_name, attribute.definition, response.value)
        elif isinstance(response.value, list):
            # When we cannot find the attribute it is because it is a global attribute like
            # FeatureMap. Fortunately for these types we can get away with using
            # 'response.value' directly if it is a list and mapping to int if not a list.
            decoded['value'] = response.value
        else:
            decoded['value'] = Converter.from_data_model_to_test_definition(
                spec, cluster_name, int, response.value)

    return decoded
def _get_fabric_id(self, id):
    """Return the numeric fabric id for an identity name.

    The name is upper-cased and looked up by member name in _TestFabricId.
    """
    fabric_member = _TestFabricId[id.upper()]
    return fabric_member.value
def _get_dev_ctrl(self, action: BaseAction):
if action.identity is not None:
dev_ctrl = self._dev_ctrls.get(action.identity, None)
if dev_ctrl is None:
fabric_id = self._get_fabric_id(action.identity)
certificate_authority = self._certificate_authority_manager.activeCaList[0]
fabric = None
for existing_admin in certificate_authority.adminList:
if existing_admin.fabricId == fabric_id:
fabric = existing_admin
if fabric is None:
fabric = certificate_authority.NewFabricAdmin(vendorId=0xFFF1,
fabricId=fabric_id)
dev_ctrl = fabric.NewController()
self._dev_ctrls[action.identity] = dev_ctrl
else:
dev_ctrl = self._dev_ctrls['alpha']
return dev_ctrl
def execute(self, action: BaseAction):
    """Run ``action`` on the device controller selected for it.

    Returns:
        Whatever ``action.run_action`` returns for that controller.
    """
    controller = self._get_dev_ctrl(action)
    outcome = action.run_action(controller)
    return outcome
def shutdown(self):
    """Call ``Shutdown()`` on every subscription held in ``self._context.subscriptions``."""
    for active_subscription in self._context.subscriptions:
        active_subscription.Shutdown()