| # |
| # Copyright (c) 2023 Project CHIP Authors |
| # All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import logging |
| from time import sleep |
| |
| import chip.clusters as Clusters |
| from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, type_matches |
| from mobly import asserts |
| |
| |
| # Takes an OpState or RvcOpState state enum and returns a string representation |
| def state_enum_to_text(state_enum): |
| try: |
| return f'{Clusters.RvcOperationalState.Enums.OperationalStateEnum(state_enum).name[1:]}(0x{state_enum:02x})' |
| except AttributeError: |
| return f'{Clusters.OperationalState.Enums.OperationalStateEnum(state_enum).name[1:]}(0x{state_enum:02x})' |
| |
| |
| # Takes an OpState or RvcOpState error enum and returns a string representation |
| def error_enum_to_text(error_enum): |
| try: |
| return f'{Clusters.RvcOperationalState.Enums.ErrorStateEnum(error_enum).name[1:]}(0x{error_enum:02x})' |
| except AttributeError: |
| return f'{Clusters.OperationalState.Enums.ErrorStateEnum(error_enum).name[1:]}(0x{error_enum:02x})' |
| |
| |
| class TC_RVCOPSTATE_2_4(MatterBaseTest): |
| def __init__(self, *args): |
| super().__init__(*args) |
| self.endpoint = None |
| self.is_ci = False |
| self.app_pipe = "/tmp/chip_rvc_fifo_" |
| |
| async def read_mod_attribute_expect_success(self, endpoint, attribute): |
| cluster = Clusters.Objects.RvcOperationalState |
| return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) |
| |
| async def send_go_home_cmd(self) -> Clusters.Objects.RvcOperationalState.Commands.OperationalCommandResponse: |
| ret = await self.send_single_cmd(cmd=Clusters.Objects.RvcOperationalState.Commands.GoHome(), endpoint=self.endpoint) |
| asserts.assert_true(type_matches(ret, Clusters.Objects.RvcOperationalState.Commands.OperationalCommandResponse), |
| "Unexpected return type for GoHome") |
| return ret |
| |
| # Sends the GoHome command and checks that the returned error matches the expected_error |
| async def send_go_home_cmd_with_check(self, step_number, expected_error): |
| self.print_step(step_number, "Send GoHome command") |
| ret = await self.send_go_home_cmd() |
| asserts.assert_equal(ret.commandResponseState.errorStateID, expected_error, |
| "errorStateID(%s) should be %s" % (ret.commandResponseState.errorStateID, |
| error_enum_to_text(expected_error))) |
| |
| # Prints the step number, reads the operational state attribute and checks if it matches with expected_state |
| async def read_operational_state_with_check(self, step_number, expected_state): |
| self.print_step(step_number, "Read OperationalState") |
| operational_state = await self.read_mod_attribute_expect_success( |
| endpoint=self.endpoint, attribute=Clusters.RvcOperationalState.Attributes.OperationalState) |
| logging.info("OperationalState: %s" % operational_state) |
| asserts.assert_equal(operational_state, expected_state, |
| "OperationalState(%s) should be %s" % (operational_state, state_enum_to_text(expected_state))) |
| |
| # Sends an RvcRunMode Change to mode command |
| async def send_run_change_to_mode_cmd(self, new_mode): |
| await self.send_single_cmd(cmd=Clusters.Objects.RvcRunMode.Commands.ChangeToMode(newMode=new_mode), |
| endpoint=self.endpoint) |
| |
| # Sends an out-of-band command to the rvc-app |
| def write_to_app_pipe(self, command): |
| with open(self.app_pipe, "w") as app_pipe: |
| app_pipe.write(command + "\n") |
| # Delay for pipe command to be processed (otherwise tests are flaky) |
| # TODO(#31239): centralize pipe write logic and remove the need of sleep |
| sleep(0.001) |
| |
| def pics_TC_RVCOPSTATE_2_4(self) -> list[str]: |
| return ["RVCOPSTATE.S"] |
| |
| @async_test_body |
| async def test_TC_RVCOPSTATE_2_4(self): |
| self.endpoint = self.matter_test_config.endpoint |
| asserts.assert_false(self.endpoint is None, "--endpoint <endpoint> must be included on the command line in.") |
| self.is_ci = self.check_pics("PICS_SDK_CI_ONLY") |
| if self.is_ci: |
| app_pid = self.matter_test_config.app_pid |
| if app_pid == 0: |
| asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set.c") |
| self.app_pipe = self.app_pipe + str(app_pid) |
| |
| asserts.assert_true(self.check_pics("RVCOPSTATE.S.A0004"), "RVCOPSTATE.S.A0004 must be supported") |
| asserts.assert_true(self.check_pics("RVCOPSTATE.S.C04.Tx"), "RVCOPSTATE.S.C04.Tx must be supported") |
| asserts.assert_true(self.check_pics("RVCOPSTATE.S.C128.Rsp"), "RVCOPSTATE.S.C128.Rsp must be supported") |
| |
| op_states = Clusters.OperationalState.Enums.OperationalStateEnum |
| rvc_op_states = Clusters.RvcOperationalState.Enums.OperationalStateEnum |
| op_errors = Clusters.OperationalState.Enums.ErrorStateEnum |
| |
| # These are the mode values used by the RVC example app that is used in CI. |
| rvc_app_run_mode_idle = 0 |
| rvc_app_run_mode_cleaning = 1 |
| |
| self.print_step(1, "Commissioning, already done") |
| |
| # Ensure that the device is in the correct state |
| if self.is_ci: |
| self.write_to_app_pipe('{"Name": "Reset"}') |
| |
| if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"): |
| step_name = "Manually put the device in the ERROR operational state" |
| self.print_step(2, step_name) |
| if self.is_ci: |
| self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToStartOrResume"}') |
| else: |
| self.wait_for_user_input(step_name) |
| |
| await self.read_operational_state_with_check(3, op_states.kError) |
| |
| await self.send_go_home_cmd_with_check(4, op_errors.kCommandInvalidInState) |
| |
| if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"): |
| step_name = "Manually put the device in the CHARGING operational state" |
| self.print_step(5, step_name) |
| if self.is_ci: |
| self.write_to_app_pipe('{"Name": "Reset"}') |
| self.write_to_app_pipe('{"Name": "Docked"}') |
| self.write_to_app_pipe('{"Name": "Charging"}') |
| else: |
| self.wait_for_user_input(step_name) |
| |
| await self.read_operational_state_with_check(6, rvc_op_states.kCharging) |
| |
| await self.send_go_home_cmd_with_check(7, op_errors.kCommandInvalidInState) |
| |
| if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"): |
| step_name = "Manually put the device in the DOCKED operational state" |
| self.print_step(8, step_name) |
| if self.is_ci: |
| self.write_to_app_pipe('{"Name": "Charged"}') |
| else: |
| self.wait_for_user_input(step_name) |
| |
| await self.read_operational_state_with_check(9, rvc_op_states.kDocked) |
| |
| await self.send_go_home_cmd_with_check(10, op_errors.kCommandInvalidInState) |
| |
| if self.check_pics("PICS_M_ST_SEEKING_CHARGER"): |
| step_name = "Manually put the device in the SEEKING CHARGER operational state" |
| self.print_step(8, step_name) |
| if self.is_ci: |
| await self.send_run_change_to_mode_cmd(rvc_app_run_mode_cleaning) |
| await self.send_run_change_to_mode_cmd(rvc_app_run_mode_idle) |
| else: |
| self.wait_for_user_input(step_name) |
| |
| await self.read_operational_state_with_check(9, rvc_op_states.kSeekingCharger) |
| |
| await self.send_go_home_cmd_with_check(10, op_errors.kNoError) |
| |
| |
| if __name__ == "__main__": |
| default_matter_test_main() |