Update TC_RVCOPSTATE_2_4 following test plan changes (#32072)

* Updated the TC_RVCOPSTATE_2_4 python test script following changes in the test plan.

* Enabled TC_RVCOPSTATE_2_4 to run in CI agianst the rvc-app.

* Future proofed the enum to text functions.

* Restyled by autopep8

* Update src/python_testing/TC_RVCOPSTATE_2_4.py

Co-authored-by: Petru Lauric <81822411+plauric@users.noreply.github.com>

---------

Co-authored-by: Restyled.io <commits@restyled.io>
Co-authored-by: Petru Lauric <81822411+plauric@users.noreply.github.com>
diff --git a/src/python_testing/TC_RVCOPSTATE_2_4.py b/src/python_testing/TC_RVCOPSTATE_2_4.py
index ddcb134..3314eaa 100644
--- a/src/python_testing/TC_RVCOPSTATE_2_4.py
+++ b/src/python_testing/TC_RVCOPSTATE_2_4.py
@@ -24,59 +24,26 @@
 
 # Takes an OpState or RvcOpState state enum and returns a string representation
 def state_enum_to_text(state_enum):
-    if state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kStopped:
-        return "Stopped(0x00)"
-    elif state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kRunning:
-        return "Running(0x01)"
-    elif state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kPaused:
-        return "Paused(0x02)"
-    elif state_enum == Clusters.OperationalState.Enums.OperationalStateEnum.kError:
-        return "Error(0x03)"
-    elif state_enum == Clusters.RvcOperationalState.Enums.OperationalStateEnum.kSeekingCharger:
-        return "SeekingCharger(0x40)"
-    elif state_enum == Clusters.RvcOperationalState.Enums.OperationalStateEnum.kCharging:
-        return "Charging(0x41)"
-    elif state_enum == Clusters.RvcOperationalState.Enums.OperationalStateEnum.kDocked:
-        return "Docked(0x42)"
-    else:
-        return "UnknownEnumValue"
+    try:
+        return f'{Clusters.RvcOperationalState.Enums.OperationalStateEnum(state_enum).name[1:]}(0x{state_enum:02x})'
+    except AttributeError:
+        return f'{Clusters.OperationalState.Enums.OperationalStateEnum(state_enum).name[1:]}(0x{state_enum:02x})'
 
 
 # Takes an OpState or RvcOpState error enum and returns a string representation
 def error_enum_to_text(error_enum):
-    if error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kNoError:
-        return "NoError(0x00)"
-    elif error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToStartOrResume:
-        return "UnableToStartOrResume(0x01)"
-    elif error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToCompleteOperation:
-        return "UnableToCompleteOperation(0x02)"
-    elif error_enum == Clusters.OperationalState.Enums.ErrorStateEnum.kCommandInvalidInState:
-        return "CommandInvalidInState(0x03)"
-    elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kFailedToFindChargingDock:
-        return "FailedToFindChargingDock(0x40)"
-    elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kStuck:
-        return "Stuck(0x41)"
-    elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinMissing:
-        return "DustBinMissing(0x42)"
-    elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinFull:
-        return "DustBinFull(0x43)"
-    elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankEmpty:
-        return "WaterTankEmpty(0x44)"
-    elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankMissing:
-        return "WaterTankMissing(0x45)"
-    elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankLidOpen:
-        return "WaterTankLidOpen(0x46)"
-    elif error_enum == Clusters.RvcOperationalState.Enums.ErrorStateEnum.kMopCleaningPadMissing:
-        return "MopCleaningPadMissing(0x47)"
-
-    def pics_TC_RVCOPSTATE_2_4(self) -> list[str]:
-        return ["RVCOPSTATE.S"]
+    try:
+        return f'{Clusters.RvcOperationalState.Enums.ErrorStateEnum(error_enum).name[1:]}(0x{error_enum:02x})'
+    except AttributeError:
+        return f'{Clusters.OperationalState.Enums.ErrorStateEnum(error_enum).name[1:]}(0x{error_enum:02x})'
 
 
 class TC_RVCOPSTATE_2_4(MatterBaseTest):
     def __init__(self, *args):
         super().__init__(*args)
         self.endpoint = None
+        self.is_ci = False
+        self.app_pipe = "/tmp/chip_rvc_fifo_"
 
     async def read_mod_attribute_expect_success(self, endpoint, attribute):
         cluster = Clusters.Objects.RvcOperationalState
@@ -105,15 +72,29 @@
         asserts.assert_equal(operational_state, expected_state,
                              "OperationalState(%s) should be %s" % (operational_state, state_enum_to_text(expected_state)))
 
-    # Prints the instruction and waits for a user input to continue
-    def print_instruction(self, step_number, instruction):
-        self.print_step(step_number, instruction)
-        input("Press Enter when done.\n")
+    # Sends an RvcRunMode Change to mode command
+    async def send_run_change_to_mode_cmd(self, new_mode):
+        await self.send_single_cmd(cmd=Clusters.Objects.RvcRunMode.Commands.ChangeToMode(newMode=new_mode),
+                                   endpoint=self.endpoint)
+
+    # Sends an out-of-band command to the rvc-app
+    def write_to_app_pipe(self, command):
+        with open(self.app_pipe, "w") as app_pipe:
+            app_pipe.write(command + "\n")
+
+    def pics_TC_RVCOPSTATE_2_4(self) -> list[str]:
+        return ["RVCOPSTATE.S"]
 
     @async_test_body
     async def test_TC_RVCOPSTATE_2_4(self):
         self.endpoint = self.matter_test_config.endpoint
         asserts.assert_false(self.endpoint is None, "--endpoint <endpoint> must be included on the command line in.")
+        self.is_ci = self.check_pics("PICS_SDK_CI_ONLY")
+        if self.is_ci:
+            app_pid = self.matter_test_config.app_pid
+            if app_pid == 0:
+                asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set.c")
+            self.app_pipe = self.app_pipe + str(app_pid)
 
         asserts.assert_true(self.check_pics("RVCOPSTATE.S.A0004"), "RVCOPSTATE.S.A0004 must be supported")
         asserts.assert_true(self.check_pics("RVCOPSTATE.S.C04.Tx"), "RVCOPSTATE.S.C04.Tx must be supported")
@@ -123,55 +104,62 @@
         rvc_op_states = Clusters.RvcOperationalState.Enums.OperationalStateEnum
         op_errors = Clusters.OperationalState.Enums.ErrorStateEnum
 
+        # These are the mode values used by the RVC example app that is used in CI.
+        rvc_app_run_mode_idle = 0
+        rvc_app_run_mode_cleaning = 1
+
         self.print_step(1, "Commissioning, already done")
 
-        if self.check_pics("RVCOPSTATE.S.M.ST_STOPPED"):
-            self.print_instruction(2, "Manually put the device in the STOPPED operational state")
+        # Ensure that the device is in the correct state
+        if self.is_ci:
+            self.write_to_app_pipe('{"Name": "Reset"}')
 
-            await self.read_operational_state_with_check(3, op_states.kStopped)
+        if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"):
+            self.print_step(2, "Manually put the device in the ERROR operational state")
+            if self.is_ci:
+                self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToStartOrResume"}')
+            else:
+                input("Press Enter when done.\n")
 
-            await self.send_go_home_cmd_with_check(4, op_errors.kNoError)
+            await self.read_operational_state_with_check(3, op_states.kError)
 
-            await self.read_operational_state_with_check(5, rvc_op_states.kSeekingCharger)
+            await self.send_go_home_cmd_with_check(4, op_errors.kCommandInvalidInState)
 
-        if self.check_pics("RVCOPSTATE.S.M.ST_RUNNING"):
-            self.print_instruction(6, "Manually put the device in the RUNNING operational state")
+        if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"):
+            self.print_step(5, "Manually put the device in the CHARGING operational state")
+            if self.is_ci:
+                self.write_to_app_pipe('{"Name": "Reset"}')
+                self.write_to_app_pipe('{"Name": "Docked"}')
+                self.write_to_app_pipe('{"Name": "Charging"}')
+            else:
+                input("Press Enter when done.\n")
 
-            await self.read_operational_state_with_check(7, op_states.kRunning)
+            await self.read_operational_state_with_check(6, rvc_op_states.kCharging)
 
-            await self.send_go_home_cmd_with_check(8, op_errors.kNoError)
+            await self.send_go_home_cmd_with_check(7, op_errors.kCommandInvalidInState)
+
+        if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"):
+            self.print_step(8, "Manually put the device in the DOCKED operational state")
+            if self.is_ci:
+                self.write_to_app_pipe('{"Name": "Charged"}')
+            else:
+                input("Press Enter when done.\n")
+
+            await self.read_operational_state_with_check(9, rvc_op_states.kDocked)
+
+            await self.send_go_home_cmd_with_check(10, op_errors.kCommandInvalidInState)
+
+        if self.check_pics("PICS_M_ST_SEEKING_CHARGER"):
+            self.print_step(8, "Manually put the device in the SEEKING CHARGER operational state")
+            if self.is_ci:
+                await self.send_run_change_to_mode_cmd(rvc_app_run_mode_cleaning)
+                await self.send_run_change_to_mode_cmd(rvc_app_run_mode_idle)
+            else:
+                input("Press Enter when done.\n")
 
             await self.read_operational_state_with_check(9, rvc_op_states.kSeekingCharger)
 
-        if self.check_pics("RVCOPSTATE.S.M.ST_PAUSED"):
-            self.print_instruction(10, "Manually put the device in the PAUSED operational state")
-
-            await self.read_operational_state_with_check(11, op_states.kPaused)
-
-            await self.send_go_home_cmd_with_check(12, op_errors.kNoError)
-
-            await self.read_operational_state_with_check(13, rvc_op_states.kSeekingCharger)
-
-        if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"):
-            self.print_instruction(14, "Manually put the device in the ERROR operational state")
-
-            await self.read_operational_state_with_check(15, op_states.kError)
-
-            await self.send_go_home_cmd_with_check(16, op_errors.kCommandInvalidInState)
-
-        if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"):
-            self.print_instruction(17, "Manually put the device in the CHARGING operational state")
-
-            await self.read_operational_state_with_check(18, rvc_op_states.kCharging)
-
-            await self.send_go_home_cmd_with_check(19, op_errors.kCommandInvalidInState)
-
-        if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"):
-            self.print_instruction(20, "Manually put the device in the DOCKED operational state")
-
-            await self.read_operational_state_with_check(21, rvc_op_states.kDocked)
-
-            await self.send_go_home_cmd_with_check(22, op_errors.kCommandInvalidInState)
+            await self.send_go_home_cmd_with_check(10, op_errors.kNoError)
 
 
 if __name__ == "__main__":