blob: f41e6bcb0de8b17b77eaffdb7539ddcd8ea89aff [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from time import sleep
import chip.clusters as Clusters
from chip.clusters.Types import NullValue
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, type_matches
from mobly import asserts
# Takes an OpState or RvcOpState state enum and returns a string representation
def state_enum_to_text(state_enum):
if state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kStopped:
return "Stopped(0x00)"
elif state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kRunning:
return "Running(0x01)"
elif state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kPaused:
return "Paused(0x02)"
elif state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kError:
return "Error(0x03)"
elif state_enum == Clusters.RvcOperationalState.Enums.OperationalStateEnum.kSeekingCharger:
return "SeekingCharger(0x40)"
elif state_enum == Clusters.RvcOperationalState.Enums.OperationalStateEnum.kCharging:
return "Charging(0x41)"
elif state_enum == Clusters.RvcOperationalState.Enums.OperationalStateEnum.kDocked:
return "Docked(0x42)"
else:
return "UnknownEnumValue"
# Takes an OpState or RvcOpState error enum and returns a string representation
def error_enum_to_text(error_enum):
if error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kNoError:
return "NoError(0x00)"
elif error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToStartOrResume:
return "UnableToStartOrResume(0x01)"
elif error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToCompleteOperation:
return "UnableToCompleteOperation(0x02)"
elif error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kCommandInvalidInState:
return "CommandInvalidInState(0x03)"
elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kFailedToFindChargingDock:
return "FailedToFindChargingDock(0x40)"
elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kStuck:
return "Stuck(0x41)"
elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinMissing:
return "DustBinMissing(0x42)"
elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinFull:
return "DustBinFull(0x43)"
elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankEmpty:
return "WaterTankEmpty(0x44)"
elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankMissing:
return "WaterTankMissing(0x45)"
elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankLidOpen:
return "WaterTankLidOpen(0x46)"
elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kMopCleaningPadMissing:
return "MopCleaningPadMissing(0x47)"
class TC_RVCOPSTATE_2_3(MatterBaseTest):
def __init__(self, *args):
super().__init__(*args)
self.endpoint = None
self.is_ci = False
self.app_pipe = "/tmp/chip_rvc_fifo_"
async def read_mod_attribute_expect_success(self, endpoint, attribute):
cluster = Clusters.Objects.RvcOperationalState
return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute)
async def send_pause_cmd(self) -> Clusters.Objects.RvcOperationalState.Commands.OperationalCommandResponse:
ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcOperationalState.Commands.Pause(), endpoint=self.endpoint)
asserts.assert_true(type_matches(ret, Clusters.Objects.RvcOperationalState.Commands.OperationalCommandResponse),
"Unexpected return type for Pause")
return ret
async def send_resume_cmd(self) -> Clusters.Objects.RvcOperationalState.Commands.OperationalCommandResponse:
ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcOperationalState.Commands.Resume(), endpoint=self.endpoint)
asserts.assert_true(type_matches(ret, Clusters.Objects.RvcOperationalState.Commands.OperationalCommandResponse),
"Unexpected return type for Resume")
return ret
# Prints the step number, reads the operational state attribute and checks if it matches with expected_state
async def read_operational_state_with_check(self, step_number, expected_state):
self.print_step(step_number, "Read OperationalState")
operational_state = await self.read_mod_attribute_expect_success(
endpoint=self.endpoint, attribute=Clusters.RvcOperationalState.Attributes.OperationalState)
logging.info("OperationalState: %s" % operational_state)
asserts.assert_equal(operational_state, expected_state,
"OperationalState(%s) should be %s" % (operational_state, state_enum_to_text(expected_state)))
# Sends the Pause command and checks that the returned error matches the expected_error
async def send_pause_cmd_with_check(self, step_number, expected_error):
self.print_step(step_number, "Send Pause command")
ret = await self.send_pause_cmd()
asserts.assert_equal(ret.commandResponseState.errorStateID, expected_error,
"errorStateID(%s) should be %s" % (ret.commandResponseState.errorStateID,
error_enum_to_text(expected_error)))
# Sends the Resume command and checks that the returned error matches the expected_error
async def send_resume_cmd_with_check(self, step_number, expected_error):
self.print_step(step_number, "Send Pause command")
ret = await self.send_resume_cmd()
asserts.assert_equal(ret.commandResponseState.errorStateID, expected_error,
"errorStateID(%s) should be %s" % (ret.commandResponseState.errorStateID,
error_enum_to_text(expected_error)))
async def send_run_change_to_mode_cmd(self, new_mode) -> Clusters.Objects.RvcRunMode.Commands.ChangeToModeResponse:
ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcRunMode.Commands.ChangeToMode(newMode=new_mode),
endpoint=self.endpoint)
return ret
# Sends and out-of-band command to the rvc-app
def write_to_app_pipe(self, command):
with open(self.app_pipe, "w") as app_pipe:
app_pipe.write(command + "\n")
# Delay for pipe command to be processed (otherwise tests are flaky)
# TODO(#31239): centralize pipe write logic and remove the need of sleep
sleep(0.001)
# Prints the instruction and waits for a user input to continue
def print_instruction(self, step_number, instruction):
self.print_step(step_number, instruction)
input("Press Enter when done.\n")
def pics_TC_RVCOPSTATE_2_3(self) -> list[str]:
return ["RVCOPSTATE.S"]
@async_test_body
async def test_TC_RVCOPSTATE_2_3(self):
self.endpoint = self.matter_test_config.endpoint
asserts.assert_false(self.endpoint is None, "--endpoint <endpoint> must be included on the command line in.")
self.is_ci = self.check_pics("PICS_SDK_CI_ONLY")
if self.is_ci:
app_pid = self.matter_test_config.app_pid
if app_pid == 0:
asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set.c")
self.app_pipe = self.app_pipe + str(app_pid)
asserts.assert_true(self.check_pics("RVCOPSTATE.S.A0003"), "RVCOPSTATE.S.A0003 must be supported")
asserts.assert_true(self.check_pics("RVCOPSTATE.S.A0004"), "RVCOPSTATE.S.A0004 must be supported")
asserts.assert_true(self.check_pics("RVCOPSTATE.S.C00.Rsp"), "RVCOPSTATE.S.C00.Rsp must be supported")
asserts.assert_true(self.check_pics("RVCOPSTATE.S.C03.Rsp"), "RVCOPSTATE.S.C03.Rsp must be supported")
# This command SHALL be supported by an implementation if any of the other commands are supported (6.5)
asserts.assert_true(self.check_pics("RVCOPSTATE.S.C04.Tx"), "RVCOPSTATE.S.C04.Tx must be supported")
attributes = Clusters.RvcOperationalState.Attributes
op_states = Clusters.OperationalState.Enums.OperationalStateEnum
rvc_op_states = Clusters.RvcOperationalState.Enums.OperationalStateEnum
op_errors = Clusters.OperationalState.Enums.ErrorStateEnum
self.print_step(1, "Commissioning, already done")
# Ensure that the device is in the correct state
if self.is_ci:
self.write_to_app_pipe('{"Name": "Reset"}')
self.print_step(2, "Manually put the device in a state where it can receive a Pause command")
if self.is_ci:
await self.send_run_change_to_mode_cmd(1)
else:
input("Press Enter when done.\n")
self.print_step(3, "Read OperationalStateList attribute")
op_state_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.OperationalStateList)
logging.info("OperationalStateList: %s" % (op_state_list))
defined_states = [state.value for state in Clusters.OperationalState.Enums.OperationalStateEnum
if state is not Clusters.OperationalState.Enums.OperationalStateEnum.kUnknownEnumValue]
state_ids = set([s.operationalStateID for s in op_state_list])
asserts.assert_true(all(id in state_ids for id in defined_states), "OperationalStateList is missing a required entry")
self.print_step(4, "Read OperationalState")
old_opstate_dut = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.OperationalState)
logging.info("OperationalState: %s" % old_opstate_dut)
await self.send_pause_cmd_with_check(5, op_errors.kNoError)
await self.read_operational_state_with_check(6, op_states.kPaused)
if self.check_pics("RVCOPSTATE.S.A0002"):
self.print_step(7, "Read CountdownTime attribute")
initial_countdown_time = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.CountdownTime)
logging.info("CountdownTime: %s" % initial_countdown_time)
if initial_countdown_time is not NullValue:
in_range = (1 <= initial_countdown_time <= 259200)
asserts.assert_true(initial_countdown_time is NullValue or in_range,
"invalid CountdownTime(%s). Must be in between 1 and 259200, or null " % initial_countdown_time)
self.print_step(8, "Waiting for 5 seconds")
sleep(5)
self.print_step(9, "Read CountdownTime attribute")
countdown_time = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CountdownTime)
logging.info("CountdownTime: %s" % countdown_time)
asserts.assert_true(countdown_time != 0 or countdown_time == NullValue,
"invalid CountdownTime(%s). Must be a non zero integer, or null" % countdown_time)
asserts.assert_equal(countdown_time, initial_countdown_time, "CountdownTime(%s) not equal to the initial CountdownTime(%s)"
% (countdown_time, initial_countdown_time))
await self.send_pause_cmd_with_check(10, op_errors.kNoError)
await self.send_resume_cmd_with_check(11, op_errors.kNoError)
self.print_step(12, "Read OperationalState attribute")
operational_state = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.OperationalState)
logging.info("OperationalState: %s" % operational_state)
asserts.assert_equal(operational_state, old_opstate_dut,
"OperationalState(%s) should be the state before pause (%s)" % (operational_state, old_opstate_dut))
await self.send_resume_cmd_with_check(13, op_errors.kNoError)
if self.check_pics("RVCOPSTATE.S.M.RESUME_AFTER_ERR"):
self.print_instruction(16, "Manually put the device in the Running state")
await self.read_operational_state_with_check(17, op_states.kRunning)
self.print_instruction(
18, "Manually cause the device to pause running due to an error, and be able to resume after clearing the error")
await self.read_operational_state_with_check(19, op_states.kError)
self.print_instruction(20, "Manually clear the error")
await self.read_operational_state_with_check(21, op_states.kPaused)
await self.send_resume_cmd_with_check(22, op_errors.kNoError)
await self.read_operational_state_with_check(23, op_states.kRunning)
if self.check_pics("RVCOPSTATE.S.M.ST_STOPPED"):
self.print_step(24, "Manually put the device in the Stopped(0x00) operational state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "Reset"}')
else:
input("Press Enter when done.\n")
await self.read_operational_state_with_check(25, op_states.kStopped)
await self.send_pause_cmd_with_check(26, op_errors.kCommandInvalidInState)
await self.send_resume_cmd_with_check(27, op_errors.kCommandInvalidInState)
if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"):
self.print_step(28, "Manually put the device in the Error(0x03) operational state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "Stuck"}')
else:
input("Press Enter when done.\n")
await self.read_operational_state_with_check(29, op_states.kError)
await self.send_pause_cmd_with_check(30, op_errors.kCommandInvalidInState)
await self.send_resume_cmd_with_check(31, op_errors.kCommandInvalidInState)
if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"):
self.print_step(32, "Manually put the device in the Charging(0x41) operational state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "Reset"}')
await self.send_run_change_to_mode_cmd(1)
await self.send_run_change_to_mode_cmd(0)
self.write_to_app_pipe('{"Name": "ChargerFound"}')
else:
input("Press Enter when done.\n")
await self.read_operational_state_with_check(33, rvc_op_states.kCharging)
await self.send_pause_cmd_with_check(34, op_errors.kCommandInvalidInState)
self.print_step(
35, "Manually put the device in the Charging(0x41) operational state and RVC Run Mode cluster's CurrentMode attribute set to a mode with the Idle mode tag")
if not self.is_ci:
input("Press Enter when done.\n")
await self.read_operational_state_with_check(36, rvc_op_states.kCharging)
await self.send_resume_cmd_with_check(37, op_errors.kCommandInvalidInState)
if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"):
self.print_step(38, "Manually put the device in the Docked(0x42) operational state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "Charged"}')
else:
input("Press Enter when done.\n")
await self.read_operational_state_with_check(39, rvc_op_states.kDocked)
await self.send_pause_cmd_with_check(40, op_errors.kCommandInvalidInState)
self.print_step(
41, "Manually put the device in the Docked(0x42) operational state and RVC Run Mode cluster's CurrentMode attribute set to a mode with the Idle mode tag")
if not self.is_ci:
input("Press Enter when done.\n")
await self.send_resume_cmd_with_check(42, op_errors.kCommandInvalidInState)
if self.check_pics("RVCOPSTATE.S.M.ST_SEEKING_CHARGER"):
self.print_step(43, "Manually put the device in the SeekingCharger(0x40) operational state")
if self.is_ci:
await self.send_run_change_to_mode_cmd(1)
await self.send_run_change_to_mode_cmd(0)
else:
input("Press Enter when done.\n")
await self.read_operational_state_with_check(44, rvc_op_states.kSeekingCharger)
await self.send_resume_cmd_with_check(45, op_errors.kCommandInvalidInState)
if __name__ == "__main__":
default_matter_test_main()