blob: 9c07fd15de042f339f2efc176c4790f23fda1b19 [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from time import sleep
import chip.clusters as Clusters
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, type_matches
from mobly import asserts
# Takes an OpState or RvcOpState state enum and returns a string representation
def state_enum_to_text(state_enum):
try:
return f'{Clusters.RvcOperationalState.Enums.OperationalStateEnum(state_enum).name[1:]}(0x{state_enum:02x})'
except AttributeError:
return f'{Clusters.OperationalState.Enums.OperationalStateEnum(state_enum).name[1:]}(0x{state_enum:02x})'
# Takes an OpState or RvcOpState error enum and returns a string representation
def error_enum_to_text(error_enum):
try:
return f'{Clusters.RvcOperationalState.Enums.ErrorStateEnum(error_enum).name[1:]}(0x{error_enum:02x})'
except AttributeError:
return f'{Clusters.OperationalState.Enums.ErrorStateEnum(error_enum).name[1:]}(0x{error_enum:02x})'
class TC_RVCOPSTATE_2_4(MatterBaseTest):
def __init__(self, *args):
super().__init__(*args)
self.endpoint = None
self.is_ci = False
self.app_pipe = "/tmp/chip_rvc_fifo_"
async def read_mod_attribute_expect_success(self, endpoint, attribute):
cluster = Clusters.Objects.RvcOperationalState
return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute)
async def send_go_home_cmd(self) -> Clusters.Objects.RvcOperationalState.Commands.OperationalCommandResponse:
ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcOperationalState.Commands.GoHome(), endpoint=self.endpoint)
asserts.assert_true(type_matches(ret, Clusters.Objects.RvcOperationalState.Commands.OperationalCommandResponse),
"Unexpected return type for GoHome")
return ret
# Sends the GoHome command and checks that the returned error matches the expected_error
async def send_go_home_cmd_with_check(self, step_number, expected_error):
self.print_step(step_number, "Send GoHome command")
ret = await self.send_go_home_cmd()
asserts.assert_equal(ret.commandResponseState.errorStateID, expected_error,
"errorStateID(%s) should be %s" % (ret.commandResponseState.errorStateID,
error_enum_to_text(expected_error)))
# Prints the step number, reads the operational state attribute and checks if it matches with expected_state
async def read_operational_state_with_check(self, step_number, expected_state):
self.print_step(step_number, "Read OperationalState")
operational_state = await self.read_mod_attribute_expect_success(
endpoint=self.endpoint, attribute=Clusters.RvcOperationalState.Attributes.OperationalState)
logging.info("OperationalState: %s" % operational_state)
asserts.assert_equal(operational_state, expected_state,
"OperationalState(%s) should be %s" % (operational_state, state_enum_to_text(expected_state)))
# Sends an RvcRunMode Change to mode command
async def send_run_change_to_mode_cmd(self, new_mode):
await self.send_single_cmd(cmd=Clusters.Objects.RvcRunMode.Commands.ChangeToMode(newMode=new_mode),
endpoint=self.endpoint)
# Sends an out-of-band command to the rvc-app
def write_to_app_pipe(self, command):
with open(self.app_pipe, "w") as app_pipe:
app_pipe.write(command + "\n")
# Delay for pipe command to be processed (otherwise tests are flaky)
# TODO(#31239): centralize pipe write logic and remove the need of sleep
sleep(0.001)
def pics_TC_RVCOPSTATE_2_4(self) -> list[str]:
return ["RVCOPSTATE.S"]
@async_test_body
async def test_TC_RVCOPSTATE_2_4(self):
self.endpoint = self.matter_test_config.endpoint
asserts.assert_false(self.endpoint is None, "--endpoint <endpoint> must be included on the command line in.")
self.is_ci = self.check_pics("PICS_SDK_CI_ONLY")
if self.is_ci:
app_pid = self.matter_test_config.app_pid
if app_pid == 0:
asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set.c")
self.app_pipe = self.app_pipe + str(app_pid)
asserts.assert_true(self.check_pics("RVCOPSTATE.S.A0004"), "RVCOPSTATE.S.A0004 must be supported")
asserts.assert_true(self.check_pics("RVCOPSTATE.S.C04.Tx"), "RVCOPSTATE.S.C04.Tx must be supported")
asserts.assert_true(self.check_pics("RVCOPSTATE.S.C128.Rsp"), "RVCOPSTATE.S.C128.Rsp must be supported")
op_states = Clusters.OperationalState.Enums.OperationalStateEnum
rvc_op_states = Clusters.RvcOperationalState.Enums.OperationalStateEnum
op_errors = Clusters.OperationalState.Enums.ErrorStateEnum
# These are the mode values used by the RVC example app that is used in CI.
rvc_app_run_mode_idle = 0
rvc_app_run_mode_cleaning = 1
self.print_step(1, "Commissioning, already done")
# Ensure that the device is in the correct state
if self.is_ci:
self.write_to_app_pipe('{"Name": "Reset"}')
if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"):
step_name = "Manually put the device in the ERROR operational state"
self.print_step(2, step_name)
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToStartOrResume"}')
else:
self.wait_for_user_input(step_name)
await self.read_operational_state_with_check(3, op_states.kError)
await self.send_go_home_cmd_with_check(4, op_errors.kCommandInvalidInState)
if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"):
step_name = "Manually put the device in the CHARGING operational state"
self.print_step(5, step_name)
if self.is_ci:
self.write_to_app_pipe('{"Name": "Reset"}')
self.write_to_app_pipe('{"Name": "Docked"}')
self.write_to_app_pipe('{"Name": "Charging"}')
else:
self.wait_for_user_input(step_name)
await self.read_operational_state_with_check(6, rvc_op_states.kCharging)
await self.send_go_home_cmd_with_check(7, op_errors.kCommandInvalidInState)
if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"):
step_name = "Manually put the device in the DOCKED operational state"
self.print_step(8, step_name)
if self.is_ci:
self.write_to_app_pipe('{"Name": "Charged"}')
else:
self.wait_for_user_input(step_name)
await self.read_operational_state_with_check(9, rvc_op_states.kDocked)
await self.send_go_home_cmd_with_check(10, op_errors.kCommandInvalidInState)
if self.check_pics("PICS_M_ST_SEEKING_CHARGER"):
step_name = "Manually put the device in the SEEKING CHARGER operational state"
self.print_step(8, step_name)
if self.is_ci:
await self.send_run_change_to_mode_cmd(rvc_app_run_mode_cleaning)
await self.send_run_change_to_mode_cmd(rvc_app_run_mode_idle)
else:
self.wait_for_user_input(step_name)
await self.read_operational_state_with_check(9, rvc_op_states.kSeekingCharger)
await self.send_go_home_cmd_with_check(10, op_errors.kNoError)
if __name__ == "__main__":
default_matter_test_main()