| # |
| # Copyright (c) 2024 Project CHIP Authors |
| # All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments |
| # for details about the block below. |
| # |
| # === BEGIN CI TEST ARGUMENTS === |
| # test-runner-runs: run1 |
| # test-runner-run/run1/app: ${ALL_CLUSTERS_APP} |
| # test-runner-run/run1/factoryreset: True |
| # test-runner-run/run1/quiet: True |
| # test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json |
| # test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --endpoint 3 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --PICS src/app/tests/suites/certification/ci-pics-values |
| # === END CI TEST ARGUMENTS === |
| |
| import json |
| import logging |
| import queue |
| import time |
| from datetime import datetime, timedelta |
| from typing import Any |
| |
| import chip.clusters as Clusters |
| import test_plan_support |
| from chip.clusters import ClusterObjects as ClusterObjects |
| from chip.clusters.Attribute import EventReadResult, TypedAttributePath |
| from matter_testing_support import (AttributeValue, ClusterAttributeChangeAccumulator, EventChangeCallback, MatterBaseTest, |
| TestStep, async_test_body, default_matter_test_main) |
| from mobly import asserts |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class TC_SwitchTests(MatterBaseTest): |
| def __init__(self, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
| |
| def desc_TC_SWTCH_2_4(self) -> str: |
| """Returns a description of this test""" |
| return "[TC-SWTCH-2.4] Momentary Switch Long Press Verification" |
| |
| def pics_TC_SWTCH_2_4(self): |
| """ This function returns a list of PICS for this test case that must be True for the test to be run""" |
| return ["SWTCH.S", "SWTCH.S.F01"] |
| |
| # def steps_TC_SWTCH_2_4(self) -> list[TestStep]: |
| # steps = [ |
| # TestStep("0", "Commissioning, already done", is_commissioning=True), |
| # # TODO: fill when test is done |
| # ] |
| |
| # return steps |
| |
| def _send_named_pipe_command(self, command_dict: dict[str, Any]): |
| app_pid = self.matter_test_config.app_pid |
| if app_pid == 0: |
| asserts.fail("The --app-pid flag must be set when usage of button simulation named pipe is required (e.g. CI)") |
| |
| app_pipe = f"/tmp/chip_all_clusters_fifo_{app_pid}" |
| command = json.dumps(command_dict) |
| |
| # Sends an out-of-band command to the sample app |
| with open(app_pipe, "w") as outfile: |
| logging.info(f"Sending named pipe command to {app_pipe}: '{command}'") |
| outfile.write(command + "\n") |
| # Delay for pipe command to be processed (otherwise tests may be flaky). |
| time.sleep(0.1) |
| |
| def _use_button_simulator(self) -> bool: |
| return self.check_pics("PICS_SDK_CI_ONLY") or self.user_params.get("use_button_simulator", False) |
| |
| def _ask_for_switch_idle(self): |
| if not self._use_button_simulator(): |
| self.wait_for_user_input(prompt_msg="Ensure switch is idle") |
| |
| def _ask_for_long_press(self, endpoint_id: int, pressed_position: int): |
| if not self._use_button_simulator(): |
| self.wait_for_user_input( |
| prompt_msg=f"Press switch position {pressed_position} for a long time (around 5 seconds) on the DUT, then release it.") |
| else: |
| command_dict = {"Name": "SimulateActionSwitchLongPress", "EndpointId": endpoint_id, |
| "ButtonId": pressed_position, "LongPressDelayMillis": 5000, "LongPressDurationMillis": 5500} |
| self._send_named_pipe_command(command_dict) |
| |
| def _ask_for_keep_pressed(self, endpoint_id: int, pressed_position: int): |
| if not self._use_button_simulator(): |
| self.wait_for_user_input( |
| prompt_msg=f"Press switch position {pressed_position} for a long time (around 5 seconds) on the DUT, then release it.") |
| else: |
| # Using the long press here with a long duration so we can check the intermediate value. |
| command_dict = {"Name": "SimulateActionSwitchLongPress", "EndpointId": endpoint_id, |
| "ButtonId": pressed_position, "LongPressDelayMillis": 0, "LongPressDurationMillis": self.keep_pressed_delay} |
| self._send_named_pipe_command(command_dict) |
| |
| def _ask_for_release(self): |
| # Since we used a long press for this, "ask for release" on the button simulator just means waiting out the delay |
| if not self._use_button_simulator(): |
| self.wait_for_user_input( |
| prompt_msg="Release the button." |
| ) |
| else: |
| time.sleep(self.keep_pressed_delay/1000) |
| |
| def _placeholder_for_step(self, step_id: str): |
| # TODO: Global search an replace of `self._placeholder_for_step` with `self.step` when done. |
| logging.info(f"Step {step_id}") |
| pass |
| |
| def _placeholder_for_skip(self, step_id: str): |
| logging.info(f"Skipped step {step_id}") |
| |
| def _await_sequence_of_reports(self, report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float): |
| start_time = time.time() |
| elapsed = 0.0 |
| time_remaining = timeout_sec |
| |
| sequence_idx = 0 |
| actual_values = [] |
| |
| while time_remaining > 0: |
| expected_value = sequence[sequence_idx] |
| logging.info(f"Expecting value {expected_value} for attribute {attribute} on endpoint {endpoint_id}") |
| try: |
| item: AttributeValue = report_queue.get(block=True, timeout=time_remaining) |
| |
| # Track arrival of all values for the given attribute. |
| if item.endpoint_id == endpoint_id and item.attribute == attribute: |
| actual_values.append(item.value) |
| |
| if item.value == expected_value: |
| logging.info(f"Got expected attribute change {sequence_idx+1}/{len(sequence)} for attribute {attribute}") |
| sequence_idx += 1 |
| else: |
| asserts.assert_equal(item.value, expected_value, |
| msg="Did not get expected attribute value in correct sequence.") |
| |
| # We are done waiting when we have accumulated all results. |
| if sequence_idx == len(sequence): |
| logging.info("Got all attribute changes, done waiting.") |
| return |
| except queue.Empty: |
| # No error, we update timeouts and keep going |
| pass |
| |
| elapsed = time.time() - start_time |
| time_remaining = timeout_sec - elapsed |
| |
| asserts.fail(f"Did not get full sequence {sequence} in {timeout_sec:.1f} seconds. Got {actual_values} before time-out.") |
| |
| def _await_sequence_of_events(self, event_queue: queue.Queue, endpoint_id: int, sequence: list[ClusterObjects.ClusterEvent], timeout_sec: float): |
| start_time = time.time() |
| elapsed = 0.0 |
| time_remaining = timeout_sec |
| |
| sequence_idx = 0 |
| actual_events = [] |
| |
| while time_remaining > 0: |
| logging.info(f"Expecting event {sequence[sequence_idx]} on endpoint {endpoint_id}") |
| try: |
| item: EventReadResult = event_queue.get(block=True, timeout=time_remaining) |
| expected_event = sequence[sequence_idx] |
| event_data = item.Data |
| |
| if item.Header.EndpointId == endpoint_id and item.Header.ClusterId == event_data.cluster_id: |
| actual_events.append(event_data) |
| |
| if event_data == expected_event: |
| logging.info(f"Got expected Event {sequence_idx+1}/{len(sequence)}: {event_data}") |
| sequence_idx += 1 |
| else: |
| asserts.assert_equal(event_data, expected_event, msg="Did not get expected event in correct sequence.") |
| |
| # We are done waiting when we have accumulated all results. |
| if sequence_idx == len(sequence): |
| logging.info("Got all expected events, done waiting.") |
| return |
| except queue.Empty: |
| # No error, we update timeouts and keep going |
| pass |
| |
| elapsed = time.time() - start_time |
| time_remaining = timeout_sec - elapsed |
| |
| asserts.fail(f"Did not get full sequence {sequence} in {timeout_sec:.1f} seconds. Got {actual_events} before time-out.") |
| |
| def _expect_no_events_for_cluster(self, event_queue: queue.Queue, endpoint_id: int, expected_cluster: ClusterObjects.Cluster, timeout_sec: float): |
| start_time = time.time() |
| elapsed = 0.0 |
| time_remaining = timeout_sec |
| |
| logging.info(f"Waiting {timeout_sec:.1f} seconds for no more events for cluster {expected_cluster} on endpoint {endpoint_id}") |
| while time_remaining > 0: |
| try: |
| item: EventReadResult = event_queue.get(block=True, timeout=time_remaining) |
| event_data = item.Data |
| |
| if item.Header.EndpointId == endpoint_id and item.Header.ClusterId == event_data.cluster_id and item.Header.ClusterId == expected_cluster.id: |
| asserts.fail(f"Got Event {event_data} when we expected no further events for {expected_cluster}") |
| except queue.Empty: |
| # No error, we update timeouts and keep going |
| pass |
| |
| elapsed = time.time() - start_time |
| time_remaining = timeout_sec - elapsed |
| |
| logging.info(f"Successfully waited for no further events on {expected_cluster} for {elapsed:.1f} seconds") |
| |
| @async_test_body |
| async def test_TC_SWTCH_2_4(self): |
| # TODO: Make this come from PIXIT |
| switch_pressed_position = 1 |
| post_prompt_settle_delay_seconds = 10.0 |
| |
| # Commission DUT - already done |
| |
| # Read feature map to set bool markers |
| cluster = Clusters.Objects.Switch |
| feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap) |
| |
| has_ms_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitch) != 0 |
| has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0 |
| has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0 |
| has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0 |
| # has_msm_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchMultiPress) != 0 |
| |
| if not has_ms_feature: |
| logging.info("Skipping rest of test: SWTCH.S.F01(MS) feature not present") |
| self.skip_all_remaining_steps("2") |
| |
| endpoint_id = self.matter_test_config.endpoint |
| |
| # Step 1: Set up subscription to all Switch cluster events |
| self._placeholder_for_step("1") |
| event_listener = EventChangeCallback(cluster) |
| attrib_listener = ClusterAttributeChangeAccumulator(cluster) |
| await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) |
| await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) |
| |
| # Step 2: Operator does not operate switch on the DUT |
| self._placeholder_for_step("2") |
| self._ask_for_switch_idle() |
| |
| # Step 3: TH reads the CurrentPosition attribute from the DUT |
| self._placeholder_for_step("3") |
| |
| # Verify that the value is 0 |
| current_position = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.CurrentPosition) |
| asserts.assert_equal(current_position, 0) |
| |
| # Step 4a: Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, the release it |
| self._placeholder_for_step("4a") |
| self._ask_for_long_press(endpoint_id, switch_pressed_position) |
| |
| # Step 4b: TH expects report of CurrentPosition 1, followed by a report of Current Position 0. |
| self._placeholder_for_step("4b") |
| logging.info( |
| f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for CurrentPosition to go {switch_pressed_position}, then 0.") |
| self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.CurrentPosition, sequence=[ |
| switch_pressed_position, 0], timeout_sec=post_prompt_settle_delay_seconds) |
| |
| # Step 4c: TH expects at least InitialPress with NewPosition = 1 |
| self._placeholder_for_step("4c") |
| logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for InitialPress event.") |
| expected_events = [cluster.Events.InitialPress(newPosition=switch_pressed_position)] |
| self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, |
| sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds) |
| |
| # Step 4d: For MSL/AS, expect to see LongPress/LongRelease in that order |
| if not has_msl_feature and not has_as_feature: |
| logging.info("Skipping Step 4d due to missing MSL and AS features") |
| self._placeholder_for_skip("4d") |
| else: |
| # Steb 4d: TH expects report of LongPress, LongRelease in that order. |
| self._placeholder_for_step("4d") |
| logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for LongPress then LongRelease.") |
| expected_events = [] |
| expected_events.append(cluster.Events.LongPress(newPosition=switch_pressed_position)) |
| expected_events.append(cluster.Events.LongRelease(previousPosition=switch_pressed_position)) |
| self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, |
| sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds) |
| |
| # Step 4e: For MS & (!MSL & !AS & !MSR), expect no further events for 10 seconds. |
| if not has_msl_feature and not has_as_feature and not has_msr_feature: |
| self._placeholder_for_step("4e") |
| self._expect_no_events_for_cluster(event_queue=event_listener.event_queue, |
| endpoint_id=endpoint_id, expected_cluster=cluster, timeout_sec=10.0) |
| |
| # Step 4f: For MSR & not MSL, expect to see ShortRelease. |
| if not has_msl_feature and has_msr_feature: |
| self._placeholder_for_step("4f") |
| expected_events = [cluster.Events.ShortRelease(previousPosition=switch_pressed_position)] |
| self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id, |
| sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds) |
| |
| def _received_event(self, event_listener: EventChangeCallback, target_event: ClusterObjects.ClusterEvent, timeout_s: int) -> bool: |
| """ |
| Returns true if this event was received, false otherwise |
| """ |
| remaining = timedelta(seconds=timeout_s) |
| end_time = datetime.now() + remaining |
| while (remaining.seconds > 0): |
| try: |
| event = event_listener.event_queue.get(timeout=remaining.seconds) |
| except queue.Empty: |
| return False |
| |
| if event.Header.EventId == target_event.event_id: |
| return True |
| remaining = end_time - datetime.now() |
| return False |
| |
| def pics_TC_SWTCH_2_3(self): |
| return ['SWTCH.S.F01'] |
| |
| def steps_TC_SWTCH_2_3(self): |
| return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True), |
| TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"), |
| TestStep(3, "Operator does not operate switch on the DUT"), |
| TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), |
| TestStep(5, "Operator operates switch (keep it pressed)", |
| "Verify that the TH receives InitialPress event with NewPosition set to 1 on the DUT"), |
| TestStep(6, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1"), |
| TestStep(7, "Operator releases switch on the DUT"), |
| TestStep("8a", "If the DUT implements the MSR feature, verify that the TH receives ShortRelease event with NewPosition set to 0 on the DUT", "Event received"), |
| TestStep( |
| "8b", "If the DUT implements the AS feature, verify that the TH does not receive ShortRelease event on the DUT", "No event received"), |
| TestStep(9, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"), |
| ] |
| |
| @async_test_body |
| async def test_TC_SWTCH_2_3(self): |
| # Commissioning - already done |
| self.step(1) |
| cluster = Clusters.Switch |
| feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap) |
| |
| has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0 |
| has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0 |
| |
| endpoint_id = self.matter_test_config.endpoint |
| |
| self.step(2) |
| event_listener = EventChangeCallback(cluster) |
| await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id) |
| |
| self.step(3) |
| self._ask_for_switch_idle() |
| |
| self.step(4) |
| button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) |
| asserts.assert_equal(button_val, 0, "Button value is not 0") |
| |
| self.step(5) |
| # We're using a long press here with a very long duration (in computer-land). This will let us check the intermediate values. |
| # This is 1s larger than the subscription ceiling |
| self.keep_pressed_delay = 6000 |
| self.pressed_position = 1 |
| self._ask_for_keep_pressed(endpoint_id, self.pressed_position) |
| event_listener.wait_for_event_report(cluster.Events.InitialPress) |
| |
| self.step(6) |
| button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) |
| asserts.assert_equal(button_val, self.pressed_position, f"Button value is not {self.pressed_position}") |
| |
| self.step(7) |
| self._ask_for_release() |
| |
| self.step("8a") |
| if has_msr_feature: |
| asserts.assert_true(self._received_event(event_listener, cluster.Events.ShortRelease, 10), |
| "Did not receive short release") |
| else: |
| self.mark_current_step_skipped() |
| |
| self.step("8b") |
| if has_as_feature: |
| asserts.assert_false(self._received_event(event_listener, cluster.Events.ShortRelease, 10), "Received short release") |
| else: |
| self.mark_current_step_skipped() |
| |
| self.step(9) |
| button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition) |
| asserts.assert_equal(button_val, 0, "Button value is not 0") |
| |
| |
| if __name__ == "__main__": |
| default_matter_test_main() |