blob: 557b7606ccbea3ee2aaac7e3a4ed09191b7e45f6 [file] [log] [blame]
#
# Copyright (c) 2024 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import queue
import time
from dataclasses import dataclass
from typing import Any
import chip.clusters as Clusters
import psutil
from chip.clusters import ClusterObjects as ClusterObjects
from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction
from chip.clusters.Types import NullValue
from chip.interaction_model import InteractionModelError, Status
from chip.testing.matter_testing import ClusterAttributeChangeAccumulator, EventChangeCallback, TestStep
from mobly import asserts
def get_pid(name):
    """Return the PID of the first running process whose name contains ``name``.

    Args:
        name: Substring searched for in each process name.

    Returns:
        The PID of the first matching process in ``psutil.process_iter()``
        order, or ``None`` when no process matches.
    """
    for proc in psutil.process_iter():
        try:
            if name in proc.name():
                return proc.pid
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # A process may exit or be protected while the table is scanned;
            # one unreadable entry must not abort the whole lookup.
            continue
    return None
@dataclass
class TestInfo:
    """Describes which cluster a TC_OPSTATE_BASE test run targets."""

    # Prefix used when building PICS strings such as "<pics_code>.S.A0002".
    pics_code: str

    # Cluster under test; setup_base() compares it against
    # Clusters.OperationalState and Clusters.OvenCavityOperationalState.
    cluster: Clusters
class EventSpecificChangeCallback:
    """Subscription callback that queues the reports of one expected event."""

    def __init__(self, expected_event: ClusterObjects.ClusterEvent):
        """Create the queue that stores received event reports for the test script.

        expected_event: the event whose reports are collected
        """
        self._expected_event = expected_event
        self._expected_cluster_id = expected_event.cluster_id
        self._q = queue.Queue()

    async def start(self, dev_ctrl, node_id: int, endpoint: int):
        """Subscribe to the expected event on the given node_id and endpoint.

        The event itself is the one handed to the constructor. Reports are
        delivered to this object's __call__.
        """
        event_paths = [(endpoint, self._expected_event, True)]
        self._subscription = await dev_ctrl.ReadEvent(
            node_id,
            events=event_paths,
            reportInterval=(1, 5),
            fabricFiltered=False,
            keepSubscriptions=True,
            autoResubscribe=False,
        )
        self._subscription.SetEventUpdateCallback(self.__call__)

    def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction):
        """Subscription callback invoked when an event report is received.

        Reports that are successful and match the expected cluster and event
        IDs are logged and queued; everything else is ignored.
        """
        if res.Status != Status.Success:
            return

        header = res.Header
        is_expected = (header.ClusterId == self._expected_cluster_id
                       and header.EventId == self._expected_event.event_id)
        if not is_expected:
            return

        logging.info(
            f'Got subscription report for event {self._expected_event.event_id} on cluster {self._expected_cluster_id}: {res.Data}')
        self._q.put(res)

    def wait_for_event_report(self, timeout: int = 10):
        """Block until a queued report of the expected event is available.

        Fails the test when nothing arrives within `timeout` seconds.
        Returns the event data so the caller can check its values.
        """
        try:
            report = self._q.get(block=True, timeout=timeout)
        except queue.Empty:
            asserts.fail(f"Failed to receive a report for the event {self._expected_event}")

        header = report.Header
        asserts.assert_equal(header.ClusterId, self._expected_cluster_id, "Expected cluster ID not found in event report")
        asserts.assert_equal(header.EventId, self._expected_event.event_id, "Expected event ID not found in event report")
        return report.Data
class TC_OPSTATE_BASE():
def setup_base(self, test_info=None, app_pipe="/tmp/chip_all_clusters_fifo_"):
    """Store the test configuration and derive the device name for pipe commands.

    Args:
        test_info: Object exposing ``cluster`` and ``pics_code``; must not be None.
        app_pipe: Path prefix of the app pipe; init_test() appends the app PID.

    The test is failed when ``test_info`` is None or its cluster is neither
    OperationalState nor OvenCavityOperationalState.
    """
    asserts.assert_true(test_info is not None,
                        "You shall define the test info!")

    self.test_info = test_info
    self.app_pipe = app_pipe

    target_cluster = self.test_info.cluster
    if target_cluster == Clusters.OperationalState:
        self.device = "Generic"
        return
    if target_cluster == Clusters.OvenCavityOperationalState:
        self.device = "Oven"
        return

    asserts.fail(f"This provided cluster ({self.test_info.cluster}) is not supported!")
def init_test(self):
self.is_ci = self.check_pics("PICS_SDK_CI_ONLY")
if self.is_ci:
app_pid = self.matter_test_config.app_pid
if app_pid == 0:
app_pid = get_pid("chip-all-clusters-app")
if app_pid is None:
asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set")
self.app_pipe = self.app_pipe + str(app_pid)
def send_raw_manual_or_pipe_command(self, command: dict, msg: str):
if self.is_ci:
self.write_to_app_pipe(command)
time.sleep(0.1)
else:
prompt = msg if msg is not None else "Press Enter when ready."
prompt += '\n'
self.wait_for_user_input(prompt_msg=prompt)
def send_manual_or_pipe_command(self, device: str, name: str, operation: str, param: Any = None, msg=None):
command = {
"Name": name,
"Device": device,
"Operation": operation,
}
if param is not None:
command["Param"] = param
self.send_raw_manual_or_pipe_command(command, msg)
async def send_cmd(self, endpoint, cmd, timedRequestTimeoutMs=None):
logging.info(f"##### Command {cmd}")
try:
return await self.send_single_cmd(cmd=cmd,
endpoint=endpoint,
timedRequestTimeoutMs=timedRequestTimeoutMs)
except InteractionModelError as e:
asserts.fail(f"Unexpected error returned: {e.status}")
async def send_cmd_expect_response(self, endpoint, cmd, expected_response, timedRequestTimeoutMs=None):
    """Send a command and assert the error state ID carried in its response.

    Args:
        endpoint: Endpoint the command is sent to.
        cmd: Command object to send.
        expected_response: Expected ``commandResponseState.errorStateID``.
        timedRequestTimeoutMs: Forwarded unchanged to send_cmd.
    """
    ret = await self.send_cmd(endpoint=endpoint, cmd=cmd, timedRequestTimeoutMs=timedRequestTimeoutMs)

    mismatch_msg = f"Command response ({ret.commandResponseState}) mismatched from expectation for {cmd} on {endpoint}"
    asserts.assert_equal(ret.commandResponseState.errorStateID, expected_response, mismatch_msg)
async def read_expect_success(self, endpoint, attribute):
logging.info(f"##### Read {attribute}")
attr_value = await self.read_single_attribute_check_success(endpoint=endpoint,
cluster=self.test_info.cluster,
attribute=attribute)
logging.info(f"## {attribute}: {attr_value}")
return attr_value
async def read_and_expect_value(self, endpoint, attribute, expected_value):
    """Read an attribute and assert that it equals ``expected_value``."""
    attr_value = await self.read_expect_success(endpoint=endpoint, attribute=attribute)

    mismatch_msg = f"Value mismatched from expectation for {attribute} on {endpoint}"
    asserts.assert_equal(attr_value, expected_value, mismatch_msg)
async def read_and_expect_property_value(self, endpoint, attribute, attr_property, expected_value):
    """Read an attribute and assert the value of one of its fields.

    Args:
        endpoint: Endpoint to read from.
        attribute: Attribute to read.
        attr_property: Name of the field looked up on the value that was read.
        expected_value: Value the field must equal.
    """
    attr_value = await self.read_expect_success(endpoint=endpoint, attribute=attribute)
    field_value = getattr(attr_value, attr_property)

    mismatch_msg = f"Property '{attr_property}' value mismatched from expectation for {attribute} on {endpoint}"
    asserts.assert_equal(field_value, expected_value, mismatch_msg)
async def read_and_expect_array_contains(self, endpoint, attribute, expected_contains):
attr_value = await self.read_expect_success(
endpoint=endpoint,
attribute=attribute)
attr_value.sort()
expected_contains.sort()
logging.info("## Current value: [%s]" % attr_value)
logging.info("## Expected value: [%s]" % expected_contains)
for item in expected_contains:
if item not in attr_value:
asserts.fail("Entry (%s), not found! The returned value SHALL include all the entries: [%s]!" % (
item, expected_contains))
############################
# TEST CASE 1.1
############################
def STEPS_TC_OPSTATE_BASE_1_1(self) -> list[TestStep]:
    """Return the ordered step descriptions of test case 1.1."""
    return [
        TestStep(1, "Commissioning, already done", is_commissioning=True),
        TestStep(2, "TH reads from the DUT the ClusterRevision attribute"),
        TestStep(3, "TH reads from the DUT the FeatureMap attribute"),
        TestStep(4, "TH reads from the DUT the AttributeList attribute"),
        TestStep(5, "TH reads from the DUT the EventList attribute"),
        TestStep(6, "TH reads from the DUT the AcceptedCommandList attribute"),
        TestStep(7, "TH reads from the DUT the GeneratedCommandList attribute"),
    ]
async def TEST_TC_OPSTATE_BASE_1_1(self, endpoint=1, cluster_revision=1, feature_map=0):
    """Test case 1.1: check the global attributes of the cluster under test.

    Args:
        endpoint: Endpoint the attributes are read from.
        cluster_revision: Expected ClusterRevision value.
        feature_map: Expected FeatureMap value.
    """
    cluster = self.test_info.cluster
    attributes = cluster.Attributes
    events = cluster.Events
    commands = cluster.Commands

    self.init_test()

    def supported(suffix):
        # PICS lookup for "<pics_code>.<suffix>".
        return self.check_pics(f"{self.test_info.pics_code}.{suffix}")

    # STEP 1: Commission DUT to TH (can be skipped if done in a preceding test)
    self.step(1)

    # STEP 2: TH reads from the DUT the ClusterRevision attribute
    self.step(2)
    await self.read_and_expect_value(endpoint=endpoint,
                                     attribute=attributes.ClusterRevision,
                                     expected_value=cluster_revision)

    # STEP 3: TH reads from the DUT the FeatureMap attribute
    self.step(3)
    await self.read_and_expect_value(endpoint=endpoint,
                                     attribute=attributes.FeatureMap,
                                     expected_value=feature_map)

    # STEP 4: TH reads from the DUT the AttributeList attribute
    self.step(4)
    mandatory_attribute_names = (
        "PhaseList",
        "CurrentPhase",
        "OperationalStateList",
        "OperationalState",
        "OperationalError",
        "GeneratedCommandList",
        "AcceptedCommandList",
        "AttributeList",
        "FeatureMap",
        "ClusterRevision",
    )
    expected_attribute_ids = [getattr(attributes, attr_name).attribute_id
                              for attr_name in mandatory_attribute_names]
    # CountdownTime is only expected when its PICS is declared.
    if supported("S.A0002"):
        expected_attribute_ids.append(attributes.CountdownTime.attribute_id)
    await self.read_and_expect_array_contains(endpoint=endpoint,
                                              attribute=attributes.AttributeList,
                                              expected_contains=expected_attribute_ids)

    # STEP 5: TH reads from the DUT the EventList attribute
    self.step(5)
    if self.pics_guard(self.check_pics("PICS_EVENT_LIST_ENABLED")):
        expected_event_ids = [events.OperationalError.event_id]
        if supported("S.E01"):
            expected_event_ids.append(events.OperationCompletion.event_id)
        await self.read_and_expect_array_contains(endpoint=endpoint,
                                                  attribute=attributes.EventList,
                                                  expected_contains=expected_event_ids)

    # STEP 6: TH reads from the DUT the AcceptedCommandList attribute
    self.step(6)
    # Each command is expected when any of the listed PICS is declared.
    accepted_command_rules = (
        ("Pause", ("S.C00.Rsp", "S.C03.Rsp")),
        ("Stop", ("S.C01.Rsp", "S.C02.Rsp")),
        ("Start", ("S.C02.Rsp",)),
        ("Resume", ("S.C03.Rsp", "S.C00.Rsp")),
    )
    expected_accepted_ids = [getattr(commands, command_name).command_id
                             for command_name, suffixes in accepted_command_rules
                             if any(supported(suffix) for suffix in suffixes)]
    await self.read_and_expect_array_contains(endpoint=endpoint,
                                              attribute=attributes.AcceptedCommandList,
                                              expected_contains=expected_accepted_ids)

    # STEP 7: TH reads from the DUT the GeneratedCommandList attribute
    self.step(7)
    expected_generated_ids = []
    # The response command is expected as soon as any command is supported.
    if any(supported(suffix) for suffix in ("S.C00.Rsp", "S.C01.Rsp", "S.C02.Rsp", "S.C03.Rsp")):
        expected_generated_ids.append(commands.OperationalCommandResponse.command_id)
    await self.read_and_expect_array_contains(endpoint=endpoint,
                                              attribute=attributes.GeneratedCommandList,
                                              expected_contains=expected_generated_ids)
############################
# TEST CASE 2.1
############################
def STEPS_TC_OPSTATE_BASE_2_1(self) -> list[TestStep]:
    """Return the ordered step descriptions of test case 2.1."""
    remaining_steps = (
        (2, "TH reads from the DUT the PhaseList attribute"),
        (3, "TH reads from the DUT the CurrentPhase attribute"),
        (4, "TH reads from the DUT the CountdownTime attribute"),
        (5, "TH reads from the DUT the OperationalStateList attribute"),
        (6, "TH reads from the DUT the OperationalState attribute"),
        ("6a", "Manually put the device in the Stopped(0x00) operational state"),
        ("6b", "TH reads from the DUT the OperationalState attribute"),
        ("6c", "Manually put the device in the Running(0x01) operational state"),
        ("6d", "TH reads from the DUT the OperationalState attribute"),
        ("6e", "Manually put the device in the Paused(0x02) operational state"),
        ("6f", "TH reads from the DUT the OperationalState attribute"),
        ("6g", "Manually put the device in the Error(0x03) operational state"),
        ("6h", "TH reads from the DUT the OperationalState attribute"),
        (7, "TH reads from the DUT the OperationalError attribute"),
        ("7a", "Manually put the device in the NoError(0x00) error state"),
        ("7b", "TH reads from the DUT the OperationalError attribute"),
        ("7c", "Manually put the device in the UnableToStartOrResume(0x01) error state"),
        ("7d", "TH reads from the DUT the OperationalError attribute"),
        ("7e", "Manually put the device in the UnableToCompleteOperation(0x02) error state"),
        ("7f", "TH reads from the DUT the OperationalError attribute"),
        ("7g", "Manually put the device in the CommandInvalidInState(0x03) error state"),
        ("7h", "TH reads from the DUT the OperationalError attribute"),
    )
    return [
        TestStep(1, "Commissioning, already done", is_commissioning=True),
        *(TestStep(number, description) for number, description in remaining_steps),
    ]
async def TEST_TC_OPSTATE_BASE_2_1(self, endpoint=1):
    """Test case 2.1: read and validate the operational-state attributes.

    Steps 2-7 read each attribute and check its range. Steps 6a-6h and 7a-7h
    drive the device into a given operational or error state (through the app
    pipe in CI, or an operator prompt otherwise) and read the state back.

    Args:
        endpoint: Endpoint the attributes are read from.
    """
    cluster = self.test_info.cluster
    attributes = cluster.Attributes
    state_enum = cluster.Enums.OperationalStateEnum
    error_enum = cluster.Enums.ErrorStateEnum
    pics = self.test_info.pics_code

    self.init_test()

    # Values shared between steps are bound up front so that a step whose
    # PICS guard is skipped cannot leave a later step with an unbound name.
    phase_list = NullValue
    phase_list_len = 0
    defined_states = [state.value for state in state_enum
                      if state != state_enum.kUnknownEnumValue]

    async def set_state_and_verify(manual_step, read_step, pics_suffix, operation, expected_state, param=None):
        # Drive the device into a state, then read OperationalState back.
        self.step(manual_step)
        if self.pics_guard(self.check_pics(f"{pics}.{pics_suffix}")):
            self.send_manual_or_pipe_command(name="OperationalStateChange",
                                             device=self.device,
                                             operation=operation,
                                             param=param)
            self.step(read_step)
            await self.read_and_expect_value(endpoint=endpoint,
                                             attribute=attributes.OperationalState,
                                             expected_value=expected_state)
        else:
            self.skip_step(read_step)

    async def set_error_and_verify(manual_step, read_step, pics_suffix, error_state):
        # Inject a fault, then read OperationalError.errorStateID back.
        self.step(manual_step)
        if self.pics_guard(self.check_pics(f"{pics}.{pics_suffix}")):
            self.send_manual_or_pipe_command(name="OperationalStateChange",
                                             device=self.device,
                                             operation="OnFault",
                                             param=error_state)
            self.step(read_step)
            await self.read_and_expect_property_value(endpoint=endpoint,
                                                      attribute=attributes.OperationalError,
                                                      attr_property="errorStateID",
                                                      expected_value=error_state)
        else:
            self.skip_step(read_step)

    # STEP 1: Commission DUT to TH (can be skipped if done in a preceding test)
    self.step(1)

    # STEP 2: TH reads from the DUT the PhaseList attribute
    self.step(2)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0000")):
        phase_list = await self.read_expect_success(endpoint=endpoint,
                                                    attribute=attributes.PhaseList)
        if phase_list is not NullValue:
            phase_list_len = len(phase_list)
            asserts.assert_less_equal(phase_list_len, 32,
                                      f"PhaseList length({phase_list_len}) must be at most 32 entries!")

    # STEP 3: TH reads from the DUT the CurrentPhase attribute
    self.step(3)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0001")):
        current_phase = await self.read_expect_success(endpoint=endpoint,
                                                       attribute=attributes.CurrentPhase)
        if (phase_list == NullValue) or (not phase_list):
            asserts.assert_true(current_phase == NullValue, f"CurrentPhase({current_phase}) should be null")
        else:
            asserts.assert_greater_equal(current_phase, 0, f"CurrentPhase({current_phase}) must be >= 0")
            asserts.assert_less(current_phase, phase_list_len,
                                f"CurrentPhase({current_phase}) must be less than {phase_list_len}")

    # STEP 4: TH reads from the DUT the CountdownTime attribute
    self.step(4)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0002")):
        countdown_time = await self.read_expect_success(endpoint=endpoint,
                                                        attribute=attributes.CountdownTime)
        if countdown_time is not NullValue:
            asserts.assert_true(0 <= countdown_time <= 259200,
                                f"CountdownTime({countdown_time}) must be between 0 and 259200")

    # STEP 5: TH reads from the DUT the OperationalStateList attribute
    self.step(5)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0003")):
        operational_state_list = await self.read_expect_success(endpoint=endpoint,
                                                                attribute=attributes.OperationalStateList)
        # Must start as False: otherwise a list without the Error state would
        # raise UnboundLocalError instead of failing the assertion below.
        error_state_present = False
        for state in operational_state_list:
            in_range = (0x80 <= state.operationalStateID <= 0xBF)
            asserts.assert_true(state.operationalStateID in defined_states or in_range,
                                "Found a OperationalStateList entry with invalid ID value!")
            if in_range:
                asserts.assert_true(state.operationalStateLabel is not None,
                                    "The OperationalStateLabel should be populated")
            if state.operationalStateID == state_enum.kError:
                error_state_present = True
        asserts.assert_true(error_state_present, "The OperationalStateList does not have an ID entry of Error(0x03)")

    # STEP 6: TH reads from the DUT the OperationalState attribute
    self.step(6)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0004")):
        operational_state = await self.read_expect_success(endpoint=endpoint,
                                                           attribute=attributes.OperationalState)
        in_range = (0x80 <= operational_state <= 0xBF)
        asserts.assert_true(operational_state in defined_states or in_range,
                            "OperationalState has an invalid ID value!")

    # STEP 6a/6b: Stopped(0x00) operational state, then read it back
    await set_state_and_verify("6a", "6b", "S.M.ST_STOPPED", "Stop", state_enum.kStopped)

    # STEP 6c/6d: Running(0x01) operational state, then read it back
    await set_state_and_verify("6c", "6d", "S.M.ST_RUNNING", "Start", state_enum.kRunning)

    # STEP 6e/6f: Paused(0x02) operational state, then read it back
    await set_state_and_verify("6e", "6f", "S.M.ST_PAUSED", "Pause", state_enum.kPaused)

    # STEP 6g/6h: Error(0x03) operational state, then read it back
    await set_state_and_verify("6g", "6h", "S.M.ST_ERROR", "OnFault", state_enum.kError,
                               param=error_enum.kUnableToStartOrResume)

    # STEP 7: TH reads from the DUT the OperationalError attribute
    self.step(7)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0005")):
        operational_error = await self.read_expect_success(endpoint=endpoint,
                                                           attribute=attributes.OperationalError)
        # Defined Errors
        defined_errors = [error.value for error in error_enum
                          if error != error_enum.kUnknownEnumValue]
        in_range = (0x80 <= operational_error.errorStateID <= 0xBF)
        asserts.assert_true(operational_error.errorStateID in defined_errors
                            or in_range, "OperationalError has an invalid ID value!")
        if in_range:
            asserts.assert_true(operational_error.errorStateLabel is not None, "ErrorStateLabel should be populated")

    # STEP 7a/7b: NoError(0x00) error state, then read it back
    await set_error_and_verify("7a", "7b", "S.M.ERR_NO_ERROR", error_enum.kNoError)

    # STEP 7c/7d: UnableToStartOrResume(0x01) error state, then read it back
    await set_error_and_verify("7c", "7d", "S.M.ERR_UNABLE_TO_START_OR_RESUME",
                               error_enum.kUnableToStartOrResume)

    # STEP 7e/7f: UnableToCompleteOperation(0x02) error state, then read it back
    await set_error_and_verify("7e", "7f", "S.M.ERR_UNABLE_TO_COMPLETE_OPERATION",
                               error_enum.kUnableToCompleteOperation)

    # STEP 7g/7h: CommandInvalidInState(0x03) error state, then read it back
    await set_error_and_verify("7g", "7h", "S.M.ERR_COMMAND_INVALID_IN_STATE",
                               error_enum.kCommandInvalidInState)
############################
# TEST CASE 2.2
############################
def STEPS_TC_OPSTATE_BASE_2_2(self) -> list[TestStep]:
    """Return the ordered step descriptions of test case 2.2."""
    remaining_steps = (
        (2, "Manually put the DUT into a state wherein it can receive a Start Command"),
        (3, "TH reads from the DUT the OperationalStateList attribute"),
        (4, "TH sends Start command to the DUT"),
        (5, "TH reads from the DUT the OperationalState attribute"),
        (6, "TH reads from the DUT the OperationalError attribute"),
        (7, "TH reads from the DUT the CountdownTime attribute"),
        (8, "TH reads from the DUT the PhaseList attribute"),
        (9, "TH reads from the DUT the CurrentPhase attribute"),
        (10, "TH waits for {PIXIT.WAITTIME.COUNTDOWN}"),
        (11, "TH reads from the DUT the CountdownTime attribute"),
        (12, "TH sends Start command to the DUT"),
        (13, "TH sends Stop command to the DUT"),
        (14, "TH reads from the DUT the OperationalState attribute"),
        (15, "TH sends Stop command to the DUT"),
        (16, "Manually put the DUT into a state wherein it cannot receive a Start Command"),
        (17, "TH sends Start command to the DUT"),
    )
    return [
        TestStep(1, "Commissioning, already done", is_commissioning=True),
        *(TestStep(number, description) for number, description in remaining_steps),
    ]
async def TEST_TC_OPSTATE_BASE_2_2(self, endpoint=1):
    """Test case 2.2: exercise the Start and Stop commands.

    Requires the PIXIT.WAITTIME.COUNTDOWN test parameter, the number of
    seconds waited between the two CountdownTime reads; it must be > 0.

    Args:
        endpoint: Endpoint commands are sent to and attributes are read from.
    """
    cluster = self.test_info.cluster
    attributes = cluster.Attributes
    commands = cluster.Commands
    state_enum = cluster.Enums.OperationalStateEnum
    error_enum = cluster.Enums.ErrorStateEnum
    pics = self.test_info.pics_code

    self.init_test()

    asserts.assert_true('PIXIT.WAITTIME.COUNTDOWN' in self.matter_test_config.global_test_params,
                        "PIXIT.WAITTIME.COUNTDOWN must be included on the command line in "
                        "the --int-arg flag as PIXIT.WAITTIME.COUNTDOWN:<wait time>")

    wait_time = self.matter_test_config.global_test_params['PIXIT.WAITTIME.COUNTDOWN']

    # Reject negative values as well: the message requires a value above 0
    # and the value is later passed to time.sleep().
    if wait_time <= 0:
        asserts.fail("PIXIT.WAITTIME.COUNTDOWN shall be higher than 0.")

    # Values shared between steps are bound up front so that a step whose
    # PICS guard is skipped cannot leave a later step with an unbound name.
    initial_countdown_time = NullValue
    phase_list = NullValue
    phase_list_len = 0

    def all_supported(*suffixes):
        # True when every "<pics_code>.<suffix>" PICS is declared.
        return all(self.check_pics(f"{pics}.{suffix}") for suffix in suffixes)

    async def command_step(step_number, command, expected_error, *pics_suffixes):
        # Run one "send command, check returned error state" step.
        self.step(step_number)
        if self.pics_guard(all_supported(*pics_suffixes)):
            await self.send_cmd_expect_response(endpoint=endpoint,
                                                cmd=command(),
                                                expected_response=expected_error)

    # STEP 1: Commission DUT to TH (can be skipped if done in a preceding test)
    self.step(1)

    # STEP 2: Manually put the DUT into a state wherein it can receive a Start Command
    self.step(2)
    self.send_manual_or_pipe_command(name="OperationalStateChange",
                                     device=self.device,
                                     operation="OnFault",
                                     param=error_enum.kNoError)
    self.send_manual_or_pipe_command(name="OperationalStateChange",
                                     device=self.device,
                                     operation="Stop")

    # STEP 3: TH reads from the DUT the OperationalStateList attribute
    self.step(3)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0003")):
        operational_state_list = await self.read_expect_success(endpoint=endpoint,
                                                                attribute=attributes.OperationalStateList)

        operational_state_list_ids = [op_state.operationalStateID for op_state in operational_state_list]

        defined_states = [state.value for state in state_enum
                          if state != state_enum.kUnknownEnumValue]

        for state in defined_states:
            if state not in operational_state_list_ids:
                asserts.fail(f"The list shall include structs with the following OperationalStateIds: {defined_states}")

    # STEP 4: TH sends Start command to the DUT
    await command_step(4, commands.Start, error_enum.kNoError, "S.C02.Rsp", "S.C04.Tx")

    # STEP 5: TH reads from the DUT the OperationalState attribute
    self.step(5)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0004")):
        await self.read_and_expect_value(endpoint=endpoint,
                                         attribute=attributes.OperationalState,
                                         expected_value=state_enum.kRunning)

    # STEP 6: TH reads from the DUT the OperationalError attribute
    self.step(6)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0005")):
        await self.read_and_expect_property_value(endpoint=endpoint,
                                                  attribute=attributes.OperationalError,
                                                  attr_property="errorStateID",
                                                  expected_value=error_enum.kNoError)

    # STEP 7: TH reads from the DUT the CountdownTime attribute
    self.step(7)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0002")):
        initial_countdown_time = await self.read_expect_success(endpoint=endpoint,
                                                                attribute=attributes.CountdownTime)
        if initial_countdown_time is not NullValue:
            asserts.assert_true(0 <= initial_countdown_time <= 259200,
                                f"CountdownTime({initial_countdown_time}) must be between 0 and 259200")

    # STEP 8: TH reads from the DUT the PhaseList attribute
    self.step(8)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0000")):
        phase_list = await self.read_expect_success(endpoint=endpoint,
                                                    attribute=attributes.PhaseList)
        phase_list_len = 0
        if phase_list is not NullValue:
            phase_list_len = len(phase_list)
            asserts.assert_less_equal(phase_list_len, 32,
                                      f"PhaseList length({phase_list_len}) must be at most 32 entries!")

    # STEP 9: TH reads from the DUT the CurrentPhase attribute
    self.step(9)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0001")):
        current_phase = await self.read_expect_success(endpoint=endpoint,
                                                       attribute=attributes.CurrentPhase)
        if (phase_list == NullValue) or (not phase_list):
            asserts.assert_equal(current_phase, NullValue, f"CurrentPhase({current_phase}) should be null")
        else:
            asserts.assert_greater_equal(current_phase, 0,
                                         f"CurrentPhase({current_phase}) must be greater or equal to 0")
            asserts.assert_less(current_phase, phase_list_len,
                                f"CurrentPhase({current_phase}) must be less than {(phase_list_len)}")

    # STEP 10: TH waits for {PIXIT.WAITTIME.COUNTDOWN}
    self.step(10)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0002")):
        time.sleep(wait_time)

    # STEP 11: TH reads from the DUT the CountdownTime attribute
    self.step(11)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0002")):
        countdown_time = await self.read_expect_success(endpoint=endpoint,
                                                        attribute=attributes.CountdownTime)

        if (countdown_time is not NullValue) and (initial_countdown_time is not NullValue):
            logging.info(f" -> Initial countdown time: {initial_countdown_time}")
            logging.info(f" -> New countdown time: {countdown_time}")
            asserts.assert_less_equal(countdown_time, (initial_countdown_time - wait_time),
                                      f"The countdown time shall have decreased at least {wait_time:.1f} since start command")

    # STEP 12: TH sends Start command to the DUT
    await command_step(12, commands.Start, error_enum.kNoError, "S.C02.Rsp", "S.C04.Tx")

    # STEP 13: TH sends Stop command to the DUT
    await command_step(13, commands.Stop, error_enum.kNoError, "S.C01.Rsp", "S.C04.Tx")

    # STEP 14: TH reads from the DUT the OperationalState attribute
    self.step(14)
    if self.pics_guard(self.check_pics(f"{pics}.S.A0004")):
        await self.read_and_expect_value(endpoint=endpoint,
                                         attribute=attributes.OperationalState,
                                         expected_value=state_enum.kStopped)

    # STEP 15: TH sends Stop command to the DUT
    await command_step(15, commands.Stop, error_enum.kNoError, "S.C01.Rsp", "S.C04.Tx")

    # STEP 16: Manually put the DUT into a state wherein it cannot receive a Start Command
    self.step(16)
    if self.pics_guard(self.check_pics(f"{pics}.S.M.ERR_UNABLE_TO_START_OR_RESUME")):
        # NOTE(review): the injected fault is kUnableToCompleteOperation while
        # the PICS gate and the response expected in step 17 refer to
        # UnableToStartOrResume; presumably any fault blocks Start - confirm.
        self.send_manual_or_pipe_command(name="OperationalStateChange",
                                         device=self.device,
                                         operation="OnFault",
                                         param=error_enum.kUnableToCompleteOperation)

    # STEP 17: TH sends Start command to the DUT
    await command_step(17, commands.Start, error_enum.kUnableToStartOrResume,
                       "S.M.ERR_UNABLE_TO_START_OR_RESUME", "S.C02.Rsp", "S.C04.Tx")
############################
# TEST CASE 2.3
############################
def STEPS_TC_OPSTATE_BASE_2_3(self) -> list[TestStep]:
    """Return the ordered step descriptions of test case 2.3."""
    remaining_steps = (
        (2, "Manually put the DUT into a state wherein it can receive a Pause Command"),
        (3, "TH reads from the DUT the OperationalStateList attribute"),
        (4, "TH sends Pause command to the DUT"),
        (5, "TH reads from the DUT the OperationalState attribute"),
        (6, "TH reads from the DUT the CountdownTime attribute"),
        (7, "TH waits for {PIXIT.WAITTIME.COUNTDOWN}"),
        (8, "TH reads from the DUT the CountdownTime attribute"),
        (9, "TH sends Pause command to the DUT"),
        (10, "TH sends Resume command to the DUT"),
        (11, "TH reads from the DUT the OperationalState attribute"),
        (12, "TH sends Resume command to the DUT"),
        (13, "Manually put the device in the Stopped(0x00) operational state"),
        (14, "TH sends Pause command to the DUT"),
        (15, "TH sends Resume command to the DUT"),
        (16, "Manually put the device in the Error(0x03) operational state"),
        (17, "TH sends Pause command to the DUT"),
        (18, "TH sends Resume command to the DUT"),
    )
    return [
        TestStep(1, "Commissioning, already done", is_commissioning=True),
        *(TestStep(number, description) for number, description in remaining_steps),
    ]
async def TEST_TC_OPSTATE_BASE_2_3(self, endpoint=1):
    """Run test case 2.3: Pause and Resume commands.

    Drives the DUT through running, paused, stopped and error situations and
    checks the response to Pause/Resume in each, as well as the CountdownTime
    attribute staying constant while paused. Every check is guarded by the
    corresponding PICS entry.

    Requires the PIXIT.WAITTIME.COUNTDOWN test parameter (a strictly positive
    wait time, passed through the --int-arg flag).

    Args:
        endpoint: Endpoint passed to every read and command sent to the DUT.
    """
    cluster = self.test_info.cluster
    attributes = cluster.Attributes
    commands = cluster.Commands
    error_states = cluster.Enums.ErrorStateEnum
    op_states = cluster.Enums.OperationalStateEnum
    pics_code = self.test_info.pics_code

    self.init_test()

    asserts.assert_true('PIXIT.WAITTIME.COUNTDOWN' in self.matter_test_config.global_test_params,
                        "PIXIT.WAITTIME.COUNTDOWN must be included on the command line in "
                        "the --int-arg flag as PIXIT.WAITTIME.COUNTDOWN:<wait time>")

    wait_time = self.matter_test_config.global_test_params['PIXIT.WAITTIME.COUNTDOWN']

    # Reject negative values as well as zero: the requirement is "higher than 0",
    # and time.sleep() raises ValueError when given a negative duration.
    if wait_time <= 0:
        asserts.fail("PIXIT.WAITTIME.COUNTDOWN shall be higher than 0.")

    # STEP 1: Commission DUT to TH (can be skipped if done in a preceding test)
    self.step(1)

    # STEP 2: Manually put the DUT into a state wherein it can receive a Pause Command
    self.step(2)
    # Clear any error first, then start an operation.
    self.send_manual_or_pipe_command(name="OperationalStateChange",
                                     device=self.device,
                                     operation="OnFault",
                                     param=error_states.kNoError)
    self.send_manual_or_pipe_command(name="OperationalStateChange",
                                     device=self.device,
                                     operation="Start")

    # STEP 3: TH reads from the DUT the OperationalStateList attribute
    self.step(3)
    if self.pics_guard(self.check_pics((f"{pics_code}.S.A0003"))):
        operational_state_list = await self.read_expect_success(endpoint=endpoint,
                                                                attribute=attributes.OperationalStateList)

        operational_state_list_ids = [op_state.operationalStateID for op_state in operational_state_list]

        # Every enum member except the "unknown" placeholder must be present in the list.
        defined_states = [state.value for state in op_states
                          if state != op_states.kUnknownEnumValue]

        missing_states = [state for state in defined_states if state not in operational_state_list_ids]
        if missing_states:
            asserts.fail(f"The list shall include structs with the following OperationalStateIds: {defined_states}")

    # STEP 4: TH sends Pause command to the DUT
    self.step(4)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.C00.Rsp") and
                       self.check_pics(f"{pics_code}.S.C04.Tx")):
        await self.send_cmd_expect_response(endpoint=endpoint,
                                            cmd=commands.Pause(),
                                            expected_response=error_states.kNoError)

    # STEP 5: TH reads from the DUT the OperationalState attribute
    self.step(5)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.A0004")):
        await self.read_and_expect_value(endpoint=endpoint,
                                         attribute=attributes.OperationalState,
                                         expected_value=op_states.kPaused)

    # STEP 6: TH reads from the DUT the CountdownTime attribute
    self.step(6)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.A0002")):
        initial_countdown_time = await self.read_expect_success(endpoint=endpoint,
                                                                attribute=attributes.CountdownTime)
        if initial_countdown_time is not NullValue:
            logging.info(f" -> Initial countdown time: {initial_countdown_time}")
            asserts.assert_true(0 <= initial_countdown_time <= 259200,
                                f"CountdownTime({initial_countdown_time}) must be between 0 and 259200")

    # STEP 7: TH waits for {PIXIT.WAITTIME.COUNTDOWN}
    self.step(7)
    time.sleep(wait_time)

    # STEP 8: TH reads from the DUT the CountdownTime attribute
    self.step(8)
    # Guarded by the same PICS entry as step 6, so initial_countdown_time is bound here.
    if self.pics_guard(self.check_pics(f"{pics_code}.S.A0002")):
        countdown_time = await self.read_expect_success(endpoint=endpoint,
                                                        attribute=attributes.CountdownTime)

        if (countdown_time is not NullValue) and (initial_countdown_time is not NullValue):
            logging.info(f" -> Initial countdown time: {initial_countdown_time}")
            logging.info(f" -> New countdown time: {countdown_time}")
            asserts.assert_equal(countdown_time, initial_countdown_time,
                                 "The countdown time shall be equal since pause command")

    # STEP 9: TH sends Pause command to the DUT
    self.step(9)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.C00.Rsp") and
                       self.check_pics(f"{pics_code}.S.C04.Tx")):
        await self.send_cmd_expect_response(endpoint=endpoint,
                                            cmd=commands.Pause(),
                                            expected_response=error_states.kNoError)

    # STEP 10: TH sends Resume command to the DUT
    self.step(10)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.C03.Rsp") and
                       self.check_pics(f"{pics_code}.S.C04.Tx")):
        await self.send_cmd_expect_response(endpoint=endpoint,
                                            cmd=commands.Resume(),
                                            expected_response=error_states.kNoError)

    # STEP 11: TH reads from the DUT the OperationalState attribute
    self.step(11)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.A0004")):
        await self.read_and_expect_value(endpoint=endpoint,
                                         attribute=attributes.OperationalState,
                                         expected_value=op_states.kRunning)

    # STEP 12: TH sends Resume command to the DUT
    self.step(12)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.C03.Rsp") and
                       self.check_pics(f"{pics_code}.S.C04.Tx")):
        await self.send_cmd_expect_response(endpoint=endpoint,
                                            cmd=commands.Resume(),
                                            expected_response=error_states.kNoError)

    # STEP 13: Manually put the device in the Stopped(0x00) operational state
    self.step(13)
    if self.pics_guard(self.check_pics((f"{pics_code}.S.M.ST_STOPPED"))):
        self.send_manual_or_pipe_command(name="OperationalStateChange",
                                         device=self.device,
                                         operation="Stop")

    # STEP 14: TH sends Pause command to the DUT
    self.step(14)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.C00.Rsp") and
                       self.check_pics(f"{pics_code}.S.C04.Tx")):
        await self.send_cmd_expect_response(endpoint=endpoint,
                                            cmd=commands.Pause(),
                                            expected_response=error_states.kCommandInvalidInState)

    # STEP 15: TH sends Resume command to the DUT
    self.step(15)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.C03.Rsp") and
                       self.check_pics(f"{pics_code}.S.C04.Tx")):
        await self.send_cmd_expect_response(endpoint=endpoint,
                                            cmd=commands.Resume(),
                                            expected_response=error_states.kCommandInvalidInState)

    # STEP 16: Manually put the device in the Error(0x03) operational state
    self.step(16)
    if self.pics_guard(self.check_pics((f"{pics_code}.S.M.ST_ERROR"))):
        self.send_manual_or_pipe_command(name="OperationalStateChange",
                                         device=self.device,
                                         operation="OnFault",
                                         param=error_states.kUnableToStartOrResume)

    # STEP 17: TH sends Pause command to the DUT
    self.step(17)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.C00.Rsp") and
                       self.check_pics(f"{pics_code}.S.C04.Tx")):
        await self.send_cmd_expect_response(endpoint=endpoint,
                                            cmd=commands.Pause(),
                                            expected_response=error_states.kCommandInvalidInState)

    # STEP 18: TH sends Resume command to the DUT
    self.step(18)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.C03.Rsp") and
                       self.check_pics(f"{pics_code}.S.C04.Tx")):
        await self.send_cmd_expect_response(endpoint=endpoint,
                                            cmd=commands.Resume(),
                                            expected_response=error_states.kCommandInvalidInState)
############################
# TEST CASE 2.4
############################
def STEPS_TC_OPSTATE_BASE_2_4(self) -> list[TestStep]:
    """Return the ordered list of test steps for test case 2.4.

    Step 1 is the commissioning step; steps 2-4 are numbered consecutively
    in the order their descriptions appear below.
    """
    # Descriptions for steps 2..4, in execution order.
    descriptions = (
        "Set up a subscription to the OperationalError event",
        "At the DUT take the vendor defined action to generate an OperationalError event",
        "TH reads from the DUT the OperationalState attribute",
    )
    commissioning = TestStep(1, "Commissioning, already done", is_commissioning=True)
    return [commissioning] + [TestStep(number, text)
                              for number, text in enumerate(descriptions, start=2)]
async def TEST_TC_OPSTATE_BASE_2_4(self, endpoint=1):
    """Run test case 2.4: OperationalError event.

    Subscribes to the cluster's events, triggers a fault on the DUT, and checks
    that the reported OperationalError event carries a valid error state and
    that the OperationalState attribute then reads Error.

    Requires the PIXIT.<pics_code>.ErrorEventGen test parameter (0 or 1, passed
    through the --int-arg flag). When it is falsy, steps 2-4 are skipped.

    Args:
        endpoint: Endpoint used for the event subscription and attribute read.
    """
    cluster = self.test_info.cluster
    attributes = cluster.Attributes
    events = cluster.Events
    error_states = cluster.Enums.ErrorStateEnum

    self.init_test()

    pixit_var_name = f'PIXIT.{self.test_info.pics_code}.ErrorEventGen'
    asserts.assert_true(pixit_var_name in self.matter_test_config.global_test_params,
                        f"{pixit_var_name} must be included on the command line in the --int-arg flag as {pixit_var_name}:<0 or 1>")

    error_event_gen = self.matter_test_config.global_test_params[pixit_var_name]

    # STEP 1: Commission DUT to TH (can be skipped if done in a preceding test)
    self.step(1)

    if self.pics_guard(error_event_gen):
        # STEP 2: Set up a subscription to the OperationalError event
        self.step(2)
        # Subscribe to Events and when they are sent push them to a queue for checking later
        events_callback = EventChangeCallback(cluster)
        await events_callback.start(self.default_controller,
                                    self.dut_node_id,
                                    endpoint)

        # STEP 3: At the DUT take the vendor defined action to generate an OperationalError event
        self.step(3)
        self.send_manual_or_pipe_command(name="OperationalStateChange",
                                         device=self.device,
                                         operation="OnFault",
                                         param=error_states.kUnableToStartOrResume)
        event_data = events_callback.wait_for_event_report(events.OperationalError).errorState

        # Defined errors: every enum member except the "unknown" placeholder and NoError.
        # Both exclusions must hold at once, hence `and` (with `or` the filter is always true
        # and nothing is excluded).
        defined_errors = [error.value for error in error_states
                          if (error != error_states.kUnknownEnumValue and
                              error != error_states.kNoError)]

        # IDs in 0x80..0xBF are accepted in addition to the defined errors; for those a label
        # is required below.
        in_range = (0x80 <= event_data.errorStateID <= 0xBF)
        asserts.assert_true(event_data.errorStateID in defined_errors
                            or in_range, "Event has an invalid ID value!")
        if in_range:
            asserts.assert_true(event_data.errorStateLabel is not None, "ErrorStateLabel should be populated")

        # STEP 4: TH reads from the DUT the OperationalState attribute
        self.step(4)
        if self.pics_guard(self.check_pics(f"{self.test_info.pics_code}.S.A0004")):
            await self.read_and_expect_value(endpoint=endpoint,
                                             attribute=attributes.OperationalState,
                                             expected_value=cluster.Enums.OperationalStateEnum.kError)
    else:
        for step_number in (2, 3, 4):
            self.skip_step(step_number)
############################
# TEST CASE 2.5
############################
def STEPS_TC_OPSTATE_BASE_2_5(self) -> list[TestStep]:
    """Return the ordered list of test steps for test case 2.5.

    Step 1 is the commissioning step; steps 2-22 are numbered consecutively
    in the order their descriptions appear below.
    """
    # Descriptions for steps 2..22, in execution order.
    descriptions = (
        "Set up a subscription to the OperationCompletion event",
        "Manually put the DUT into a state wherein it can receive a Start Command",
        "TH sends Start command to the DUT",
        "TH reads from the DUT the CountdownTime attribute",
        "TH reads from the DUT the OperationalState attribute",
        "TH waits for initial-countdown-time",
        "TH sends Stop command to the DUT",
        "TH waits for OperationCompletion event",
        "TH reads from the DUT the OperationalState attribute",
        "Restart DUT",
        "TH waits for {PIXIT.WAITTIME.REBOOT}",
        "TH sends Start command to the DUT",
        "TH reads from the DUT the OperationalState attribute",
        "TH sends Pause command to the DUT",
        "TH reads from the DUT the OperationalState attribute",
        "TH waits for half of initial-countdown-time",
        "TH sends Resume command to the DUT",
        "TH reads from the DUT the OperationalState attribute",
        "TH waits for initial-countdown-time",
        "TH sends Stop command to the DUT",
        "TH waits for OperationCompletion event",
    )
    commissioning = TestStep(1, "Commissioning, already done", is_commissioning=True)
    return [commissioning] + [TestStep(number, text)
                              for number, text in enumerate(descriptions, start=2)]
async def TEST_TC_OPSTATE_BASE_2_5(self, endpoint=1):
    """Run test case 2.5: OperationCompletion event.

    Runs one operation to completion and a second one with a pause in the
    middle, checking the OperationCompletion event reported after each Stop
    command (error code, total operational time, paused time). Every check is
    guarded by the corresponding PICS entry.

    Requires the PIXIT.WAITTIME.REBOOT test parameter (a strictly positive wait
    time, passed through the --int-arg flag). When the CountdownTime attribute
    is not read or reads as null, steps 6-22 are skipped.

    Args:
        endpoint: Endpoint used for subscriptions, reads and commands.
    """
    cluster = self.test_info.cluster
    attributes = cluster.Attributes
    commands = cluster.Commands
    events = cluster.Events
    error_states = cluster.Enums.ErrorStateEnum
    op_states = cluster.Enums.OperationalStateEnum
    pics_code = self.test_info.pics_code

    self.init_test()

    asserts.assert_true('PIXIT.WAITTIME.REBOOT' in self.matter_test_config.global_test_params,
                        "PIXIT.WAITTIME.REBOOT must be included on the command line in "
                        "the --int-arg flag as PIXIT.WAITTIME.REBOOT:<wait time>")

    wait_time_reboot = self.matter_test_config.global_test_params['PIXIT.WAITTIME.REBOOT']

    # Reject negative values as well as zero: the requirement is "higher than 0",
    # and time.sleep() raises ValueError when given a negative duration.
    if wait_time_reboot <= 0:
        asserts.fail("PIXIT.WAITTIME.REBOOT shall be higher than 0.")

    # STEP 1: Commission DUT to TH (can be skipped if done in a preceding test)
    self.step(1)

    # STEP 2: Set up a subscription to the OperationCompletion event
    self.step(2)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.E01")):
        # Subscribe to Events and when they are sent push them to a queue for checking later
        events_callback = EventSpecificChangeCallback(events.OperationCompletion)
        await events_callback.start(self.default_controller,
                                    self.dut_node_id,
                                    endpoint)

    # STEP 3: Manually put the DUT into a state wherein it can receive a Start Command
    self.step(3)
    self.send_manual_or_pipe_command(name="OperationalStateChange",
                                     device=self.device,
                                     operation="OnFault",
                                     param=error_states.kNoError,
                                     msg="Ensure the DUT is not in an error state.")
    self.send_manual_or_pipe_command(name="OperationalStateChange",
                                     device=self.device,
                                     operation="Stop",
                                     msg="Put the DUT in a state where it can receive a start command")

    # STEP 4: TH sends Start command to the DUT
    self.step(4)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.C02.Rsp") and
                       self.check_pics(f"{pics_code}.S.C04.Tx")):
        await self.send_cmd_expect_response(endpoint=endpoint,
                                            cmd=commands.Start(),
                                            expected_response=error_states.kNoError)

    # STEP 5: TH reads from the DUT the CountdownTime attribute
    self.step(5)
    # Default to null so that the remaining steps are skipped, instead of raising
    # NameError, when the PICS guard below prevents the read.
    initial_countdown_time = NullValue
    if self.pics_guard(self.check_pics(f"{pics_code}.S.A0002")):
        initial_countdown_time = await self.read_expect_success(endpoint=endpoint,
                                                                attribute=attributes.CountdownTime)

    if initial_countdown_time is not NullValue:
        # STEP 6: TH reads from the DUT the OperationalState attribute
        self.step(6)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.A0004")):
            await self.read_and_expect_value(endpoint=endpoint,
                                             attribute=attributes.OperationalState,
                                             expected_value=op_states.kRunning)

        # STEP 7: TH waits for initial-countdown-time
        self.step(7)
        logging.info(f'Sleeping for {initial_countdown_time:.1f} seconds.')
        time.sleep(initial_countdown_time)

        # STEP 8: TH sends Stop command to the DUT
        self.step(8)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.C01.Rsp") and
                           self.check_pics(f"{pics_code}.S.C04.Tx")):
            await self.send_cmd_expect_response(endpoint=endpoint,
                                                cmd=commands.Stop(),
                                                expected_response=error_states.kNoError)

        # STEP 9: TH waits for OperationCompletion event
        self.step(9)
        # Guarded by the same PICS entry as the subscription in step 2, so events_callback is bound.
        if self.pics_guard(self.check_pics(f"{pics_code}.S.E01")):
            event_data = events_callback.wait_for_event_report()

            asserts.assert_equal(event_data.completionErrorCode, error_states.kNoError,
                                 f"Completion event error code mismatched from expectation on endpoint {endpoint}.")

            if event_data.totalOperationalTime is not NullValue:
                # The operation ran for the whole countdown without pausing, so the reported
                # total must match the countdown within a 1 second tolerance.
                time_diff = abs(initial_countdown_time - event_data.totalOperationalTime)
                asserts.assert_less_equal(time_diff, 1,
                                          f"The total operational time ({event_data.totalOperationalTime}) shall be "
                                          f"within 1 second of the initial countdown time ({initial_countdown_time:.1f})")

            asserts.assert_equal(0, event_data.pausedTime,
                                 f"Paused time ({event_data.pausedTime}) shall be zero")

        # STEP 10: TH reads from the DUT the OperationalState attribute
        self.step(10)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.A0004")):
            await self.read_and_expect_value(endpoint=endpoint,
                                             attribute=attributes.OperationalState,
                                             expected_value=op_states.kStopped)

        # STEP 11: Restart DUT
        self.step(11)
        # In CI environment, the STOP command (step 8) already resets the variables. Only ask for
        # reboot outside CI environment.
        if not self.is_ci:
            self.wait_for_user_input(prompt_msg="Restart DUT. Press Enter when ready.\n")
            # Expire the session and re-establish the subscription
            self.default_controller.ExpireSessions(self.dut_node_id)
            if self.check_pics(f"{pics_code}.S.E01"):
                # Subscribe to Events and when they are received push them to a queue for checking later
                events_callback = EventSpecificChangeCallback(events.OperationCompletion)
                await events_callback.start(self.default_controller,
                                            self.dut_node_id,
                                            endpoint)

        # STEP 12: TH waits for {PIXIT.WAITTIME.REBOOT}
        self.step(12)
        time.sleep(wait_time_reboot)

        # STEP 13: TH sends Start command to the DUT
        self.step(13)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.C02.Rsp") and
                           self.check_pics(f"{pics_code}.S.C04.Tx")):
            await self.send_cmd_expect_response(endpoint=endpoint,
                                                cmd=commands.Start(),
                                                expected_response=error_states.kNoError)

        # STEP 14: TH reads from the DUT the OperationalState attribute
        self.step(14)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.A0004")):
            await self.read_and_expect_value(endpoint=endpoint,
                                             attribute=attributes.OperationalState,
                                             expected_value=op_states.kRunning)

        # STEP 15: TH sends Pause command to the DUT
        self.step(15)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.C00.Rsp") and
                           self.check_pics(f"{pics_code}.S.C04.Tx")):
            await self.send_cmd_expect_response(endpoint=endpoint,
                                                cmd=commands.Pause(),
                                                expected_response=error_states.kNoError)

        # STEP 16: TH reads from the DUT the OperationalState attribute
        self.step(16)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.A0004")):
            await self.read_and_expect_value(endpoint=endpoint,
                                             attribute=attributes.OperationalState,
                                             expected_value=op_states.kPaused)

        # STEP 17: TH waits for half of initial-countdown-time
        self.step(17)
        time.sleep((initial_countdown_time / 2))

        # STEP 18: TH sends Resume command to the DUT
        self.step(18)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.C03.Rsp") and
                           self.check_pics(f"{pics_code}.S.C04.Tx")):
            await self.send_cmd_expect_response(endpoint=endpoint,
                                                cmd=commands.Resume(),
                                                expected_response=error_states.kNoError)

        # STEP 19: TH reads from the DUT the OperationalState attribute
        self.step(19)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.A0004")):
            await self.read_and_expect_value(endpoint=endpoint,
                                             attribute=attributes.OperationalState,
                                             expected_value=op_states.kRunning)

        # STEP 20: TH waits for initial-countdown-time
        self.step(20)
        time.sleep(initial_countdown_time)

        # STEP 21: TH sends Stop command to the DUT
        self.step(21)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.C01.Rsp") and
                           self.check_pics(f"{pics_code}.S.C04.Tx")):
            await self.send_cmd_expect_response(endpoint=endpoint,
                                                cmd=commands.Stop(),
                                                expected_response=error_states.kNoError)

        # STEP 22: TH waits for OperationCompletion event
        self.step(22)
        if self.pics_guard(self.check_pics(f"{pics_code}.S.E01")):
            event_data = events_callback.wait_for_event_report()

            asserts.assert_equal(event_data.completionErrorCode, error_states.kNoError,
                                 f"Completion event error code mismatched from expectation on endpoint {endpoint}.")

            if event_data.totalOperationalTime is not NullValue:
                # Steps 17 and 20 waited half a countdown plus a full countdown in total.
                expected_value = (1.5 * initial_countdown_time)
                asserts.assert_less_equal(expected_value, event_data.totalOperationalTime,
                                          f"The total operation time shall be at least {expected_value:.1f}")

            # The DUT stayed paused for the half-countdown wait of step 17.
            expected_value = (0.5 * initial_countdown_time)
            asserts.assert_less_equal(expected_value, event_data.pausedTime,
                                      f"Paused time ({event_data.pausedTime}) shall be at least {expected_value:.1f}")
    else:
        # No usable countdown time: none of the remaining steps (6..22) can run.
        for step_number in range(6, 23):
            self.skip_step(step_number)
############################
# TEST CASE 2.6 - Optional Reports with DUT as Server
############################
def steps_TC_OPSTATE_BASE_2_6(self) -> list[TestStep]:
    """Return the ordered list of test steps for test case 2.6 (optional reports with DUT as server)."""
    # Steps 3 and 8 share one description. Building it once keeps the separating space
    # between the last two sentences, which the step 8 literal previously lacked.
    start_countdown = ("Manually put the DUT into a state where it will use the CountdownTime attribute, "
                       "the initial value of the CountdownTime is greater than 30, "
                       "and it will begin counting down the CountdownTime attribute. "
                       "Test harness reads the CountdownTime attribute.")
    read_countdown = "Test harness reads the CountdownTime attribute."

    return [
        TestStep(1, "Commissioning, already done", is_commissioning=True),
        TestStep(2, "Subscribe to CountdownTime attribute"),
        TestStep(3, start_countdown),
        TestStep(4, read_countdown),
        TestStep(5, "Over a period of 30 seconds, TH counts all report transactions with an attribute "
                 "report for the CountdownTime attribute in numberOfReportsReceived"),
        TestStep(6, read_countdown),
        TestStep(7, "Until the current operation finishes, TH counts all report transactions with "
                 "an attribute report for the CountdownTime attribute in numberOfReportsReceived and saves up to 5 such reports."),
        TestStep(8, start_countdown),
        TestStep(9, "TH reads from the DUT the OperationalState attribute"),
        TestStep(10, read_countdown),
        TestStep(11, "Manually put the device in the Paused(0x02) operational state"),
    ]
async def TEST_TC_OPSTATE_BASE_2_6(self, endpoint=1):
    """Run test case 2.6: optional CountdownTime reports with the DUT as server.

    Subscribes to the cluster's attributes, then counts the CountdownTime
    reports accumulated by the subscription handler around state changes
    (start, steady running, end of operation, pause) and checks the counts.

    Args:
        endpoint: Endpoint used for the subscription and every attribute read.
    """
    cluster = self.test_info.cluster
    attributes = cluster.Attributes
    countdown_attr = attributes.CountdownTime
    op_state_attr = attributes.OperationalState
    running = cluster.Enums.OperationalStateEnum.kRunning
    stopped = cluster.Enums.OperationalStateEnum.kStopped
    pics_code = self.test_info.pics_code

    self.init_test()

    # commission
    self.step(1)

    # Note that this does a subscribe-all instead of subscribing only to the CountdownTime attribute.
    # To-Do: Update the TP to subscribe-all.
    self.step(2)
    sub_handler = ClusterAttributeChangeAccumulator(cluster)
    await sub_handler.start(self.default_controller, self.dut_node_id, endpoint)

    self.step(3)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.M.ST_RUNNING")):
        self.send_manual_or_pipe_command(name="OperationalStateChange",
                                         device=self.device,
                                         operation="Start",
                                         msg="Put DUT in running state")
        time.sleep(1)
        await self.read_and_expect_value(endpoint=endpoint,
                                         attribute=op_state_attr,
                                         expected_value=running)
        countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=countdown_attr)
        if countdown_time is not NullValue:
            # Starting the operation must have produced at least one CountdownTime report.
            report_count = sub_handler.attribute_report_counts[countdown_attr]
            asserts.assert_greater(report_count, 0, "Did not receive any reports for CountdownTime")

    sub_handler.reset()
    self.step(4)
    countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=countdown_attr)
    if countdown_time is NullValue:
        self.skip_step(5)
    else:
        self.step(5)
        logging.info('Test will now collect data for 10 seconds')
        time.sleep(10)

        report_count = sub_handler.attribute_report_counts[countdown_attr]
        sub_handler.reset()
        asserts.assert_less_equal(report_count, 5, "Received more than 5 reports for CountdownTime")
        # NOTE(review): the lower bound is 0, so this does not fail when no report arrived,
        # despite the message - confirm whether that leniency is intended.
        asserts.assert_greater_equal(report_count, 0, "Did not receive any reports for CountdownTime")

    self.step(6)
    countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=countdown_attr)
    current_state = await self.read_expect_success(endpoint=endpoint, attribute=op_state_attr)
    if current_state == running and countdown_time is not NullValue:
        self.step(7)
        # Poll once per second, at most 20 times, until the DUT reports the Stopped state.
        for _ in range(20):
            if current_state == stopped:
                break
            time.sleep(1)
            current_state = await self.read_expect_success(endpoint=endpoint, attribute=op_state_attr)
        report_count = sub_handler.attribute_report_counts[countdown_attr]
        asserts.assert_less_equal(report_count, 5, "Received more than 5 reports for CountdownTime")
        asserts.assert_greater(report_count, 0, "Did not receive any reports for CountdownTime")
    else:
        self.skip_step(7)

    sub_handler.reset()
    self.step(8)
    if self.pics_guard(self.check_pics(f"{pics_code}.S.M.ST_RUNNING")):
        self.send_manual_or_pipe_command(name="OperationalStateChange",
                                         device=self.device,
                                         operation="Start",
                                         msg="Put DUT in running state")
        time.sleep(1)
        await self.read_and_expect_value(endpoint=endpoint,
                                         attribute=op_state_attr,
                                         expected_value=running)
        countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=countdown_attr)
        if countdown_time is not NullValue:
            report_count = sub_handler.attribute_report_counts[countdown_attr]
            asserts.assert_greater(report_count, 0, "Did not receive any reports for CountdownTime")

    self.step(9)
    await self.read_and_expect_value(endpoint=endpoint,
                                     attribute=op_state_attr,
                                     expected_value=running)

    sub_handler.reset()
    self.step(10)
    countdown_time = await self.read_expect_success(endpoint=endpoint, attribute=countdown_attr)
    # The PICS guard is evaluated first, exactly as in the short-circuit `and` it replaces.
    if self.pics_guard(self.check_pics(f"{pics_code}.S.M.ST_PAUSED")) and countdown_time is not NullValue:
        self.step(11)
        self.send_manual_or_pipe_command(name="OperationalStateChange",
                                         device=self.device,
                                         operation="Pause",
                                         msg="Put DUT in paused state")
        time.sleep(1)
        # Pausing must have produced at least one CountdownTime report.
        report_count = sub_handler.attribute_report_counts[countdown_attr]
        asserts.assert_greater(report_count, 0, "Did not receive any reports for CountdownTime")
    else:
        self.skip_step(11)