Update TC_RVCOPSTATE_2_4 following test plan changes (#32072)
* Updated the TC_RVCOPSTATE_2_4 python test script following changes in the test plan.
* Enabled TC_RVCOPSTATE_2_4 to run in CI agianst the rvc-app.
* Future proofed the enum to text functions.
* Restyled by autopep8
* Update src/python_testing/TC_RVCOPSTATE_2_4.py
Co-authored-by: Petru Lauric <81822411+plauric@users.noreply.github.com>
---------
Co-authored-by: Restyled.io <commits@restyled.io>
Co-authored-by: Petru Lauric <81822411+plauric@users.noreply.github.com>
diff --git a/src/python_testing/TC_RVCOPSTATE_2_4.py b/src/python_testing/TC_RVCOPSTATE_2_4.py
index ddcb134..3314eaa 100644
--- a/src/python_testing/TC_RVCOPSTATE_2_4.py
+++ b/src/python_testing/TC_RVCOPSTATE_2_4.py
@@ -24,59 +24,26 @@
# Takes an OpState or RvcOpState state enum and returns a string representation
def state_enum_to_text(state_enum):
- if state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kStopped:
- return "Stopped(0x00)"
- elif state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kRunning:
- return "Running(0x01)"
- elif state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kPaused:
- return "Paused(0x02)"
- elif state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kError:
- return "Error(0x03)"
- elif state_enum == Clusters.RvcOperationalState.Enums.OperationalStateEnum.kSeekingCharger:
- return "SeekingCharger(0x40)"
- elif state_enum == Clusters.RvcOperationalState.Enums.OperationalStateEnum.kCharging:
- return "Charging(0x41)"
- elif state_enum == Clusters.RvcOperationalState.Enums.OperationalStateEnum.kDocked:
- return "Docked(0x42)"
- else:
- return "UnknownEnumValue"
+ try:
+ return f'{Clusters.RvcOperationalState.Enums.OperationalStateEnum(state_enum).name[1:]}(0x{state_enum:02x})'
+ except AttributeError:
+ return f'{Clusters.OperationalState.Enums.OperationalStateEnum(state_enum).name[1:]}(0x{state_enum:02x})'
# Takes an OpState or RvcOpState error enum and returns a string representation
def error_enum_to_text(error_enum):
- if error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kNoError:
- return "NoError(0x00)"
- elif error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToStartOrResume:
- return "UnableToStartOrResume(0x01)"
- elif error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToCompleteOperation:
- return "UnableToCompleteOperation(0x02)"
- elif error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kCommandInvalidInState:
- return "CommandInvalidInState(0x03)"
- elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kFailedToFindChargingDock:
- return "FailedToFindChargingDock(0x40)"
- elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kStuck:
- return "Stuck(0x41)"
- elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinMissing:
- return "DustBinMissing(0x42)"
- elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinFull:
- return "DustBinFull(0x43)"
- elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankEmpty:
- return "WaterTankEmpty(0x44)"
- elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankMissing:
- return "WaterTankMissing(0x45)"
- elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankLidOpen:
- return "WaterTankLidOpen(0x46)"
- elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kMopCleaningPadMissing:
- return "MopCleaningPadMissing(0x47)"
-
- def pics_TC_RVCOPSTATE_2_4(self) -> list[str]:
- return ["RVCOPSTATE.S"]
+ try:
+ return f'{Clusters.RvcOperationalState.Enums.ErrorStateEnum(error_enum).name[1:]}(0x{error_enum:02x})'
+ except AttributeError:
+ return f'{Clusters.OperationalState.Enums.ErrorStateEnum(error_enum).name[1:]}(0x{error_enum:02x})'
class TC_RVCOPSTATE_2_4(MatterBaseTest):
def __init__(self, *args):
super().__init__(*args)
self.endpoint = None
+ self.is_ci = False
+ self.app_pipe = "/tmp/chip_rvc_fifo_"
async def read_mod_attribute_expect_success(self, endpoint, attribute):
cluster = Clusters.Objects.RvcOperationalState
@@ -105,15 +72,29 @@
asserts.assert_equal(operational_state, expected_state,
"OperationalState(%s) should be %s" % (operational_state, state_enum_to_text(expected_state)))
- # Prints the instruction and waits for a user input to continue
- def print_instruction(self, step_number, instruction):
- self.print_step(step_number, instruction)
- input("Press Enter when done.\n")
+ # Sends an RvcRunMode Change to mode command
+ async def send_run_change_to_mode_cmd(self, new_mode):
+ await self.send_single_cmd(cmd=Clusters.Objects.RvcRunMode.Commands.ChangeToMode(newMode=new_mode),
+ endpoint=self.endpoint)
+
+ # Sends an out-of-band command to the rvc-app
+ def write_to_app_pipe(self, command):
+ with open(self.app_pipe, "w") as app_pipe:
+ app_pipe.write(command + "\n")
+
+ def pics_TC_RVCOPSTATE_2_4(self) -> list[str]:
+ return ["RVCOPSTATE.S"]
@async_test_body
async def test_TC_RVCOPSTATE_2_4(self):
self.endpoint = self.matter_test_config.endpoint
asserts.assert_false(self.endpoint is None, "--endpoint <endpoint> must be included on the command line in.")
+ self.is_ci = self.check_pics("PICS_SDK_CI_ONLY")
+ if self.is_ci:
+ app_pid = self.matter_test_config.app_pid
+ if app_pid == 0:
+ asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set.c")
+ self.app_pipe = self.app_pipe + str(app_pid)
asserts.assert_true(self.check_pics("RVCOPSTATE.S.A0004"), "RVCOPSTATE.S.A0004 must be supported")
asserts.assert_true(self.check_pics("RVCOPSTATE.S.C04.Tx"), "RVCOPSTATE.S.C04.Tx must be supported")
@@ -123,55 +104,62 @@
rvc_op_states = Clusters.RvcOperationalState.Enums.OperationalStateEnum
op_errors = Clusters.OperationalState.Enums.ErrorStateEnum
+ # These are the mode values used by the RVC example app that is used in CI.
+ rvc_app_run_mode_idle = 0
+ rvc_app_run_mode_cleaning = 1
+
self.print_step(1, "Commissioning, already done")
- if self.check_pics("RVCOPSTATE.S.M.ST_STOPPED"):
- self.print_instruction(2, "Manually put the device in the STOPPED operational state")
+ # Ensure that the device is in the correct state
+ if self.is_ci:
+ self.write_to_app_pipe('{"Name": "Reset"}')
- await self.read_operational_state_with_check(3, op_states.kStopped)
+ if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"):
+ self.print_step(2, "Manually put the device in the ERROR operational state")
+ if self.is_ci:
+ self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToStartOrResume"}')
+ else:
+ input("Press Enter when done.\n")
- await self.send_go_home_cmd_with_check(4, op_errors.kNoError)
+ await self.read_operational_state_with_check(3, op_states.kError)
- await self.read_operational_state_with_check(5, rvc_op_states.kSeekingCharger)
+ await self.send_go_home_cmd_with_check(4, op_errors.kCommandInvalidInState)
- if self.check_pics("RVCOPSTATE.S.M.ST_RUNNING"):
- self.print_instruction(6, "Manually put the device in the RUNNING operational state")
+ if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"):
+ self.print_step(5, "Manually put the device in the CHARGING operational state")
+ if self.is_ci:
+ self.write_to_app_pipe('{"Name": "Reset"}')
+ self.write_to_app_pipe('{"Name": "Docked"}')
+ self.write_to_app_pipe('{"Name": "Charging"}')
+ else:
+ input("Press Enter when done.\n")
- await self.read_operational_state_with_check(7, op_states.kRunning)
+ await self.read_operational_state_with_check(6, rvc_op_states.kCharging)
- await self.send_go_home_cmd_with_check(8, op_errors.kNoError)
+ await self.send_go_home_cmd_with_check(7, op_errors.kCommandInvalidInState)
+
+ if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"):
+ self.print_step(8, "Manually put the device in the DOCKED operational state")
+ if self.is_ci:
+ self.write_to_app_pipe('{"Name": "Charged"}')
+ else:
+ input("Press Enter when done.\n")
+
+ await self.read_operational_state_with_check(9, rvc_op_states.kDocked)
+
+ await self.send_go_home_cmd_with_check(10, op_errors.kCommandInvalidInState)
+
+ if self.check_pics("PICS_M_ST_SEEKING_CHARGER"):
+ self.print_step(8, "Manually put the device in the SEEKING CHARGER operational state")
+ if self.is_ci:
+ await self.send_run_change_to_mode_cmd(rvc_app_run_mode_cleaning)
+ await self.send_run_change_to_mode_cmd(rvc_app_run_mode_idle)
+ else:
+ input("Press Enter when done.\n")
await self.read_operational_state_with_check(9, rvc_op_states.kSeekingCharger)
- if self.check_pics("RVCOPSTATE.S.M.ST_PAUSED"):
- self.print_instruction(10, "Manually put the device in the PAUSED operational state")
-
- await self.read_operational_state_with_check(11, op_states.kPaused)
-
- await self.send_go_home_cmd_with_check(12, op_errors.kNoError)
-
- await self.read_operational_state_with_check(13, rvc_op_states.kSeekingCharger)
-
- if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"):
- self.print_instruction(14, "Manually put the device in the ERROR operational state")
-
- await self.read_operational_state_with_check(15, op_states.kError)
-
- await self.send_go_home_cmd_with_check(16, op_errors.kCommandInvalidInState)
-
- if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"):
- self.print_instruction(17, "Manually put the device in the CHARGING operational state")
-
- await self.read_operational_state_with_check(18, rvc_op_states.kCharging)
-
- await self.send_go_home_cmd_with_check(19, op_errors.kCommandInvalidInState)
-
- if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"):
- self.print_instruction(20, "Manually put the device in the DOCKED operational state")
-
- await self.read_operational_state_with_check(21, rvc_op_states.kDocked)
-
- await self.send_go_home_cmd_with_check(22, op_errors.kCommandInvalidInState)
+ await self.send_go_home_cmd_with_check(10, op_errors.kNoError)
if __name__ == "__main__":