Fix remaining post-TE2 TC-SWTCH issues (#35291)
* Fix remaining post-TE2 TC-SWTCH issues
- Fixes #34556 (-2.2, -2.3 and -2.4 to better match test plan)
- Fixes #28620 (Better bound checks)
- Fixes #35241
* Restyled by clang-format
* Restyled by autopep8
* Restyled by isort
* Fix step 2 of TC-SWTCH--2.4
---------
Co-authored-by: Restyled.io <commits@restyled.io>
diff --git a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp
index 6e5008a..0d66550 100644
--- a/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp
+++ b/examples/all-clusters-app/linux/AllClustersCommandDelegate.cpp
@@ -57,6 +57,17 @@
return jsonValue.isMember(field) && jsonValue[field].isNumeric();
}
+uint8_t GetNumberOfSwitchPositions(EndpointId endpointId)
+{
+ // TODO: Move to using public API of cluster.
+ uint8_t numPositions = 0;
+
+ // On failure, the numPositions won't be changed, so 0 returned.
+ (void) Switch::Attributes::NumberOfPositions::Get(endpointId, &numPositions);
+
+ return numPositions;
+}
+
/**
* Named pipe handler for simulated long press
*
@@ -99,6 +110,15 @@
EndpointId endpointId = static_cast<EndpointId>(jsonValue["EndpointId"].asUInt());
uint8_t buttonId = static_cast<uint8_t>(jsonValue["ButtonId"].asUInt());
+
+ uint8_t numPositions = GetNumberOfSwitchPositions(endpointId);
+ if (buttonId >= numPositions)
+ {
+ std::string inputJson = jsonValue.toStyledString();
+ ChipLogError(NotSpecified, "Invalid ButtonId (out of range) in %s", inputJson.c_str());
+ return;
+ }
+
System::Clock::Milliseconds32 longPressDelayMillis{ static_cast<unsigned>(jsonValue["LongPressDelayMillis"].asUInt()) };
System::Clock::Milliseconds32 longPressDurationMillis{ static_cast<unsigned>(jsonValue["LongPressDurationMillis"].asUInt()) };
uint32_t featureMap = static_cast<uint32_t>(jsonValue["FeatureMap"].asUInt());
@@ -169,6 +189,15 @@
EndpointId endpointId = static_cast<EndpointId>(jsonValue["EndpointId"].asUInt());
uint8_t buttonId = static_cast<uint8_t>(jsonValue["ButtonId"].asUInt());
+
+ uint8_t numPositions = GetNumberOfSwitchPositions(endpointId);
+ if (buttonId >= numPositions)
+ {
+ std::string inputJson = jsonValue.toStyledString();
+ ChipLogError(NotSpecified, "Invalid ButtonId (out of range) in %s", inputJson.c_str());
+ return;
+ }
+
System::Clock::Milliseconds32 multiPressPressedTimeMillis{ static_cast<unsigned>(
jsonValue["MultiPressPressedTimeMillis"].asUInt()) };
System::Clock::Milliseconds32 multiPressReleasedTimeMillis{ static_cast<unsigned>(
@@ -227,6 +256,14 @@
EndpointId endpointId = static_cast<EndpointId>(jsonValue["EndpointId"].asUInt());
uint8_t positionId = static_cast<uint8_t>(jsonValue["PositionId"].asUInt());
+ uint8_t numPositions = GetNumberOfSwitchPositions(endpointId);
+ if (positionId >= numPositions)
+ {
+ std::string inputJson = jsonValue.toStyledString();
+ ChipLogError(NotSpecified, "Invalid PositionId (out of range) in %s", inputJson.c_str());
+ return;
+ }
+
uint8_t previousPositionId = 0;
Protocols::InteractionModel::Status status = Switch::Attributes::CurrentPosition::Get(endpointId, &previousPositionId);
VerifyOrReturn(Protocols::InteractionModel::Status::Success == status,
diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py
index 1001402..3db2b21 100644
--- a/src/python_testing/TC_SWTCH.py
+++ b/src/python_testing/TC_SWTCH.py
@@ -38,8 +38,8 @@
from chip.clusters import ClusterObjects as ClusterObjects
from chip.clusters.Attribute import EventReadResult
from chip.tlv import uint
-from matter_testing_support import (ClusterAttributeChangeAccumulator, EventChangeCallback, MatterBaseTest, TestStep,
- await_sequence_of_reports, default_matter_test_main, has_feature, per_endpoint_test)
+from matter_testing_support import (AttributeValue, ClusterAttributeChangeAccumulator, EventChangeCallback, MatterBaseTest,
+ TestStep, await_sequence_of_reports, default_matter_test_main, has_feature, per_endpoint_test)
from mobly import asserts
logger = logging.getLogger(__name__)
@@ -70,6 +70,10 @@
super().__init__(*args, **kwargs)
self._default_pressed_position = self.user_params.get("default_pressed_position", 1)
+ def setup_test(self):
+ super().setup_test()
+ self.is_ci = self._use_button_simulator()
+
def _send_named_pipe_command(self, command_dict: dict[str, Any]):
app_pid = self.matter_test_config.app_pid
if app_pid == 0:
@@ -259,32 +263,39 @@
def steps_TC_SWTCH_2_2(self):
return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True),
- TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"),
+ TestStep(2, "Set up subscription to all events and attributes of Switch cluster on the endpoint"),
TestStep(3, "Operator sets switch to first position on the DUT"),
- TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"),
+ TestStep(4, "TH reads the CurrentPosition attribute from the DUT.", "Verify that the value is 0."),
TestStep(5, "Operator sets switch to second position (one) on the DUT",
- "Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT"),
- TestStep(6, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1"),
+ "Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT."),
+ TestStep(6, "TH reads the CurrentPosition attribute from the DUT",
+ "Verify that the value is 1, and that a subscription report was received for that change."),
TestStep(7, "If there are more than 2 positions, test subsequent positions of the DUT"),
TestStep(8, "Operator sets switch to first position on the DUT."),
TestStep(9, "Wait 10 seconds for event reports stable." "Verify that last SwitchLatched event received is for NewPosition 0."),
- TestStep(10, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"),
+ TestStep(10, "TH reads the CurrentPosition attribute from the DUT",
+ "Verify that the value is 0, and that a subscription report was received for that change."),
]
@per_endpoint_test(has_feature(Clusters.Switch, Clusters.Switch.Bitmaps.Feature.kLatchingSwitch))
async def test_TC_SWTCH_2_2(self):
post_prompt_settle_delay_seconds = 10.0
+ cluster = Clusters.Switch
+ endpoint_id = self.matter_test_config.endpoint
# Step 1: Commissioning - already done
self.step(1)
- cluster = Clusters.Switch
- endpoint_id = self.matter_test_config.endpoint
-
# Step 2: Set up subscription to all events of Switch cluster on the endpoint.
self.step(2)
event_listener = EventChangeCallback(cluster)
await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)
+ attrib_listener = ClusterAttributeChangeAccumulator(cluster)
+ await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)
+
+ # Pre-get number of positions for step 7 later.
+ num_positions = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.NumberOfPositions)
+ asserts.assert_greater(num_positions, 1, "Latching switch has only 1 position, this is impossible.")
# Step 3: Operator sets switch to first position on the DUT.
self.step(3)
@@ -296,25 +307,39 @@
button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(button_val, 0, "Switch position value is not 0")
- # Step 5: Operator sets switch to second position (one) on the DUT",
- # Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT
- self.step(5)
- expected_switch_position = 1
- self._ask_for_switch_position(endpoint_id, expected_switch_position)
+ attrib_listener.reset()
+ event_listener.reset()
- data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched, timeout_sec=post_prompt_settle_delay_seconds)
- logging.info(f"-> SwitchLatched event last received: {data}")
- asserts.assert_equal(data, cluster.Events.SwitchLatched(
- newPosition=expected_switch_position), "Did not get expected switch position")
+ for expected_switch_position in range(1, num_positions):
+ # Step 5: Operator sets switch to second position (one) on the DUT",
+ # Verify that the TH receives SwitchLatched event with NewPosition set to 1 from the DUT
+ if expected_switch_position == 1:
+ self.step(5)
+ self._ask_for_switch_position(endpoint_id, expected_switch_position)
- # Step 6: TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1
- self.step(6)
- button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
- asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}")
+ data = event_listener.wait_for_event_report(cluster.Events.SwitchLatched, timeout_sec=post_prompt_settle_delay_seconds)
+ logging.info(f"-> SwitchLatched event last received: {data}")
+ asserts.assert_equal(data, cluster.Events.SwitchLatched(
+ newPosition=expected_switch_position), "Did not get expected switch position")
- # Step 7: If there are more than 2 positions, test subsequent positions of the DUT
- # # TODO(#34656): Implement loop for > 2 total positions
- self.skip_step(7)
+ # Step 6: TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1
+ if expected_switch_position == 1: # Indicate step 7 only once
+ self.step(6)
+ button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
+ asserts.assert_equal(button_val, expected_switch_position, f"Switch position is not {expected_switch_position}")
+ logging.info(f"Checking to see if a report for {expected_switch_position} is received")
+ attrib_listener.await_sequence_of_reports(attribute=cluster.Attributes.CurrentPosition, sequence=[
+ expected_switch_position], timeout_sec=post_prompt_settle_delay_seconds)
+
+ # Step 7: If there are more than 2 positions, test subsequent positions of the DUT
+ if expected_switch_position == 1:
+ if num_positions > 2: # Indicate step 7 only once
+ self.step(7)
+ else:
+ self.skip_step(7)
+
+ if num_positions > 2:
+ logging.info("Looping for the other positions")
# Step 8: Operator sets switch to first position on the DUT.
self.step(8)
@@ -324,7 +349,7 @@
# Step 9: Wait 10 seconds for event reports stable.
# Verify that last SwitchLatched event received is for NewPosition 0.
self.step(9)
- time.sleep(10.0)
+ time.sleep(10.0 if not self.is_ci else 1.0)
expected_switch_position = 0
last_event = event_listener.get_last_event()
@@ -337,15 +362,21 @@
# Step 10: TH reads the CurrentPosition attribute from the DUT.
# Verify that the value is 0
self.step(10)
+
button_val = await self.read_single_attribute_check_success(cluster=cluster, attribute=cluster.Attributes.CurrentPosition)
asserts.assert_equal(button_val, 0, "Button value is not 0")
+ logging.info(f"Checking to see if a report for {expected_switch_position} is received")
+ expected_final_value = [AttributeValue(
+ endpoint_id, attribute=cluster.Attributes.CurrentPosition, value=expected_switch_position)]
+ attrib_listener.await_all_final_values_reported(expected_final_value, timeout_sec=post_prompt_settle_delay_seconds)
+
def steps_TC_SWTCH_2_3(self):
return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True),
TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"),
TestStep(3, "Operator does not operate switch on the DUT"),
TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"),
- TestStep(5, "Operator operates switch (keep it pressed)",
+ TestStep(5, "Operator operates switch (keep it pressed, and wait at least 5 seconds)",
"Verify that the TH receives InitialPress event with NewPosition set to 1 on the DUT"),
TestStep(6, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 1"),
TestStep(7, "Operator releases switch on the DUT"),
@@ -421,15 +452,15 @@
def steps_TC_SWTCH_2_4(self):
return [TestStep(1, test_plan_support.commission_if_required(), "", is_commissioning=True),
- TestStep(2, "Set up subscription to all events of Switch cluster on the endpoint"),
+ TestStep(2, "Set up subscription to all events and attributes of Switch cluster on the endpoint"),
TestStep(3, "Operator does not operate switch on the DUT"),
TestStep(4, "TH reads the CurrentPosition attribute from the DUT", "Verify that the value is 0"),
- TestStep(5, "Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, the release it",
+ TestStep(5, "Operator operates switch (keep pressed for long time, e.g. 5 seconds) on the DUT, then release it",
"""
* TH expects receiving a subscription report of CurrentPosition 1, followed by a report of Current Position 0.
* TH expects receiving at InitialPress event with NewPosition = 1.
- * if MSL or AS feature is supported, TH expect receiving LongPress/LongRelease in that order.
- * if MS & (!MSL & !AS & !MSR) features present, TH expects receiving no further events for 10 seconds after release.
+ * if MSL feature is supported, TH expect receiving LongPress/LongRelease in that order.
+ * if MS & (!MSL & !AS & !MSR & !MSM) features present, TH expects receiving no further events for 10 seconds after release.
* if (MSR & !MSL) features present, TH expects receiving ShortRelease event.
""")
]
@@ -451,17 +482,18 @@
has_ms_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitch) != 0
has_msr_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchRelease) != 0
has_msl_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchLongPress) != 0
+ has_msm_feature = (feature_map & cluster.Bitmaps.Feature.kMomentarySwitchMultiPress) != 0
has_as_feature = (feature_map & cluster.Bitmaps.Feature.kActionSwitch) != 0
if not has_ms_feature:
logging.info("Skipping rest of test: SWTCH.S.F01(MS) feature not present")
self.skip_all_remaining_steps("2")
- # Step 2: Set up subscription to all events of Switch cluster on the endpoint
+ # Step 2: Set up subscription to all events and attributes of Switch cluster on the endpoint
self.step(2)
event_listener = EventChangeCallback(cluster)
- attrib_listener = ClusterAttributeChangeAccumulator(cluster)
await event_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)
+ attrib_listener = ClusterAttributeChangeAccumulator(cluster)
await attrib_listener.start(self.default_controller, self.dut_node_id, endpoint=endpoint_id)
# Step 3: Operator does not operate switch on the DUT
@@ -503,8 +535,8 @@
self._await_sequence_of_events(event_queue=event_listener.event_queue, endpoint_id=endpoint_id,
sequence=expected_events, timeout_sec=post_prompt_settle_delay_seconds)
- # - if MS & (!MSL & !AS & !MSR) features present, expect no further events for 10 seconds after release.
- if not has_msl_feature and not has_as_feature and not has_msr_feature:
+ # - if MS & (!MSL & !AS & !MSR & !MSM) features present, expect no further events for 10 seconds after release.
+ if not has_msl_feature and not has_as_feature and not has_msr_feature and not has_msm_feature:
self._expect_no_events_for_cluster(event_queue=event_listener.event_queue,
endpoint_id=endpoint_id, expected_cluster=cluster, timeout_sec=10.0)
diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py
index ea93730..1e0fa26 100644
--- a/src/python_testing/matter_testing_support.py
+++ b/src/python_testing/matter_testing_support.py
@@ -301,6 +301,10 @@
_ = self.get_last_event()
return
+ def reset(self) -> None:
+ """Resets state as if no events had ever been received."""
+ self.flush_events()
+
@property
def event_queue(self) -> queue.Queue:
return self._q
@@ -356,7 +360,7 @@
timestamp_utc: Optional[datetime] = None
-def await_sequence_of_reports(report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float):
+def await_sequence_of_reports(report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float) -> None:
"""Given a queue.Queue hooked-up to an attribute change accumulator, await a given expected sequence of attribute reports.
Args:
@@ -418,6 +422,7 @@
self._subscription = None
self._lock = threading.Lock()
self._q = queue.Queue()
+ self._endpoint_id = 0
self.reset()
def reset(self):
@@ -441,6 +446,7 @@
fabricFiltered=fabric_filtered,
keepSubscriptions=keepSubscriptions
)
+ self._endpoint_id = endpoint
self._subscription.SetAttributeUpdateCallback(self.__call__)
return self._subscription
@@ -513,6 +519,24 @@
logging.info(f" -> {expected_element} found: {last_report_matches.get(expected_idx)}")
asserts.fail("Did not find all expected last report values before time-out")
+ def await_sequence_of_reports(self, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float) -> None:
+ """Await a given expected sequence of attribute reports in the accumulator for the endpoint associated.
+
+ Args:
+ - attribute: attribute to match for reports to check.
+ - sequence: list of attribute values in order that are expected.
+ - timeout_sec: number of seconds to wait for.
+
+ *** WARNING: The queue contains every report since the sub was established. Use
+ self.reset() to make it empty. ***
+
+ This will fail current Mobly test with assertion failure if the data is not as expected in order.
+
+ Returns nothing on success so the test can go on.
+ """
+ await_sequence_of_reports(report_queue=self.attribute_queue, endpoint_id=self._endpoint_id,
+ attribute=attribute, sequence=sequence, timeout_sec=timeout_sec)
+
@property
def attribute_queue(self) -> queue.Queue:
return self._q