Updated BRBINFO-4-1 test after timeoutMs added (#35160)
---------
Co-authored-by: Restyled.io <commits@restyled.io>
Co-authored-by: saurabhst <s.kumar9@samsung.com>
diff --git a/src/python_testing/TC_BRBINFO_4_1.py b/src/python_testing/TC_BRBINFO_4_1.py
index ea23015..5e1474a 100644
--- a/src/python_testing/TC_BRBINFO_4_1.py
+++ b/src/python_testing/TC_BRBINFO_4_1.py
@@ -30,6 +30,7 @@
import chip.clusters as Clusters
from chip import ChipDeviceCtrl
+from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import MatterBaseTest, SimpleEventCallback, TestStep, async_test_body, default_matter_test_main
from mobly import asserts
@@ -57,14 +58,25 @@
def steps_TC_BRBINFO_4_1(self) -> list[TestStep]:
steps = [
- TestStep("0", "DUT commissioned", is_commissioning=True),
- TestStep("0a", "Preconditions"),
- TestStep("1a", "TH reads from the ICD the A_IDLE_MODE_DURATION, A_ACTIVE_MODE_DURATION, and ACTIVE_MODE_THRESHOLD attributes"),
- TestStep("1b", "Simple KeepActive command w/ subscription. ActiveChanged event received by TH contains PromisedActiveDuration"),
- TestStep("2", "Sends 3x KeepActive commands w/ subscription. ActiveChanged event received ONCE and contains PromisedActiveDuration"),
- TestStep("3", "TH waits for check-in from TH_ICD to confirm no additional ActiveChanged events are recieved"),
- TestStep("4", "KeepActive not returned after 60 minutes of offline ICD"),
- ]
+ TestStep("0", "DUT commissioned and preconditions", is_commissioning=True),
+ TestStep("1", "TH reads from the ICD the A_IDLE_MODE_DURATION, A_ACTIVE_MODE_DURATION, and ACTIVE_MODE_THRESHOLD attributes"),
+ TestStep("2", "Setting up subscribe to ActiveChange event"),
+ TestStep("3", "Check TimeoutMs too low fails"),
+ TestStep("4", "Check TimeoutMs too high fails"),
+ TestStep("5", "Check KeepActive successful with valid command parameters lowest possible TimeoutMs"),
+ TestStep("6", "Validate previous command results in ActiveChanged event shortly after ICD device checks-in"),
+ TestStep("7", "Check KeepActive successful with valid command parameters highest possible TimeoutMs"),
+ TestStep("8", "Validate previous command results in ActiveChanged event shortly after ICD device checks-in"),
+ TestStep("9", "Send multiple KeepActive commands during window where ICD device will not check in"),
+ TestStep("10", "Validate previous command results in single ActiveChanged event shortly after ICD device checks-in"),
+ TestStep("11", "Validate we received no additional ActiveChanged event after subsequent ICD check in"),
+ TestStep("12", "Send KeepActive command with shortest TimeoutMs value while TH_ICD is prevented from sending check-ins"),
+ TestStep("13", "TH allows TH_ICD to resume sending check-ins after timeout should have expired"),
+ TestStep("14", "Wait for TH_ICD to check into TH twice, then confirm we have had no new ActiveChanged events reported from DUT"),
+ TestStep("15", "Send KeepActive command with shortest TimeoutMs value while TH_ICD is prevented from sending check-ins"),
+ TestStep("16", "Wait 15 seconds then send second KeepActive command with double the TimeoutMs value of the previous step"),
+ TestStep("17", "TH allows TH_ICD to resume sending check-ins after timeout from step 15 expired but before second timeout from step 16 still valid"),
+ TestStep("18", "Wait for TH_ICD to check into TH, then confirm we have received new event from DUT")]
return steps
def _ask_for_vendor_commissioniong_ux_operation(self, discriminator, setupPinCode, setupManualCode, setupQRCode):
@@ -77,9 +89,9 @@
f"If using FabricSync Admin test app, you may type:\n"
f">>> pairing onnetwork 111 {setupPinCode} --icd-registration true")
- async def _send_keep_active_command(self, duration, endpoint_id) -> int:
+ async def _send_keep_active_command(self, stay_active_duration_ms, timeout_ms, endpoint_id) -> int:
logging.info("Sending keep active command")
- keep_active = await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=endpoint_id, payload=Clusters.Objects.BridgedDeviceBasicInformation.Commands.KeepActive(stayActiveDuration=duration))
+ keep_active = await self.default_controller.SendCommand(nodeid=self.dut_node_id, endpoint=endpoint_id, payload=Clusters.Objects.BridgedDeviceBasicInformation.Commands.KeepActive(stayActiveDuration=stay_active_duration_ms, timeoutMs=timeout_ms))
return keep_active
async def _wait_for_active_changed_event(self, timeout_s) -> int:
@@ -189,10 +201,6 @@
logging.info(f"Dynamic endpoint is {dynamic_endpoint_id}")
self.step("0")
-
- # Preconditions
- self.step("0a")
-
logging.info("Ensuring DUT is commissioned to TH")
# Confirms commissioning of DUT on TH as it reads its fature map
@@ -205,15 +213,7 @@
logging.info("Ensuring ICD is commissioned to TH")
- # Confirms commissioning of ICD on TH as it reads its feature map
- await self._read_attribute_expect_success(
- _ROOT_ENDPOINT_ID,
- basic_info_cluster,
- basic_info_attributes.FeatureMap,
- self.icd_nodeid
- )
-
- self.step("1a")
+ self.step("1")
idle_mode_duration_s = await self._read_attribute_expect_success(
_ROOT_ENDPOINT_ID,
@@ -231,9 +231,7 @@
)
logging.info(f"ActiveModeDurationMs: {active_mode_duration_ms}")
- self.step("1b")
-
- # Subscription to ActiveChanged
+ self.step("2")
event = brb_info_cluster.Events.ActiveChanged
self.q = queue.Queue()
urgent = 1
@@ -241,66 +239,107 @@
subscription = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(dynamic_endpoint_id, event, urgent)], reportInterval=[1, 3])
subscription.SetEventUpdateCallback(callback=cb)
+ self.step("3")
stay_active_duration_ms = 1000
- logging.info(f"Sending KeepActiveCommand({stay_active_duration_ms}ms)")
- await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
+ keep_active_timeout_ms = 29999
+ try:
+ await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
+ asserts.fail("KeepActive with invalid TimeoutMs was expected to fail")
+ except InteractionModelError as e:
+ asserts.assert_equal(e.status, Status.ConstraintError,
+ "DUT sent back an unexpected error, we were expecting ConstraintError")
- logging.info("Waiting for ActiveChanged from DUT...")
- timeout_s = idle_mode_duration_s + max(active_mode_duration_ms, stay_active_duration_ms)/1000
- promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s)
+ self.step("4")
+ keep_active_timeout_ms = 3600001
+ try:
+ await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
+ asserts.fail("KeepActive with invalid TimeoutMs was expected to fail")
+ except InteractionModelError as e:
+ asserts.assert_equal(e.status, Status.ConstraintError,
+ "DUT sent back an unexpected error, we were expecting ConstraintError")
+ self.step("5")
+ keep_active_timeout_ms = 30000
+ await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
+
+ self.step("6")
+ wait_for_icd_checkin_timeout_s = idle_mode_duration_s + max(active_mode_duration_ms, stay_active_duration_ms)/1000
+ wait_for_dut_event_subscription_s = 5
+ # This will throw exception if timeout is exceeded.
+ await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
+ promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s=wait_for_dut_event_subscription_s)
asserts.assert_greater_equal(promised_active_duration_ms, stay_active_duration_ms,
"PromisedActiveDuration < StayActiveDuration")
+ asserts.assert_equal(self.q.qsize(), 0, "Unexpected event received from DUT")
- self.step("2")
+ self.step("7")
+ keep_active_timeout_ms = 3600000
+ await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
- # Prevent icd app from sending any check-in messages.
+ self.step("8")
+ # This will throw exception if timeout is exceeded.
+ await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
+ promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s=wait_for_dut_event_subscription_s)
+ asserts.assert_greater_equal(promised_active_duration_ms, stay_active_duration_ms,
+ "PromisedActiveDuration < StayActiveDuration")
+ asserts.assert_equal(self.q.qsize(), 0, "Unexpected event received from DUT")
+
+ self.step("9")
self.pause_th_icd_server(check_state=True)
# sends 3x keep active commands
stay_active_duration_ms = 2000
+ keep_active_timeout_ms = 60000
logging.info(f"Sending first KeepActiveCommand({stay_active_duration_ms})")
- await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
+ await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
logging.info(f"Sending second KeepActiveCommand({stay_active_duration_ms})")
- await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
+ await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
logging.info(f"Sending third KeepActiveCommand({stay_active_duration_ms})")
- await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
+ await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
+
+ self.step("10")
self.resume_th_icd_server(check_state=True)
-
- logging.info("Waiting for ActiveChanged from DUT...")
- promised_active_duration_ms = await self._wait_for_active_changed_event((idle_mode_duration_s + max(active_mode_duration_ms, stay_active_duration_ms))/1000)
+ await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
+ promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s=wait_for_dut_event_subscription_s)
asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT")
- self.step("3")
- await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=5000)
+ self.step("11")
+ await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT")
- self.step("4")
-
- logging.info("TH waiting for checkin from TH_ICD...")
- await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=10000)
- stay_active_duration_ms = 10000
- logging.info(f"Sending KeepActiveCommand({stay_active_duration_ms})")
- await self._send_keep_active_command(stay_active_duration_ms, dynamic_endpoint_id)
-
+ self.step("12")
self.pause_th_icd_server(check_state=True)
- # If we are seeing assertion below fail test assumption is likely incorrect.
- # Test assumes after TH waits for check-in from TH_ICD it has enough time to
- # call the KeepActive command and pause the app to prevent it from checking in
- # after DUT recieved the KeepActive command. Should this assumption be incorrect
- # we could look into using existing ICDTestEventTriggerEvent, or adding test
- # event trigger that will help suppress check-ins from the TH_ICD_SERVER.
- asserts.assert_equal(self.q.qsize(), 0, "")
+ stay_active_duration_ms = 2000
+ keep_active_timeout_ms = 30000
+ await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
- if not self.is_ci:
- logging.info("Waiting for 60 minutes")
- time.sleep(60*60)
-
+ self.step("13")
+ time.sleep(30)
self.resume_th_icd_server(check_state=True)
- logging.info("TH waiting for first checkin from TH_ICD...")
- await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=10000)
- logging.info("TH waiting for second checkin from TH_ICD...")
- await self.default_controller.WaitForActive(self.icd_nodeid, stayActiveDurationMs=10000)
+ self.step("14")
+ await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
+ await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
+ asserts.assert_equal(self.q.qsize(), 0, "Unexpected event received from DUT")
+
+ self.step("15")
+ self.pause_th_icd_server(check_state=True)
+ stay_active_duration_ms = 2000
+ keep_active_timeout_ms = 30000
+ await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
+
+ self.step("16")
+ time.sleep(15)
+ stay_active_duration_ms = 2000
+ keep_active_timeout_ms = 60000
+ await self._send_keep_active_command(stay_active_duration_ms, keep_active_timeout_ms, dynamic_endpoint_id)
+
+ self.step("17")
+ time.sleep(15)
+ self.resume_th_icd_server(check_state=True)
+
+ self.step("18")
+ await self.default_controller.WaitForActive(self.icd_nodeid, timeoutSeconds=wait_for_icd_checkin_timeout_s, stayActiveDurationMs=5000)
+ promised_active_duration_ms = await self._wait_for_active_changed_event(timeout_s=wait_for_dut_event_subscription_s)
asserts.assert_equal(self.q.qsize(), 0, "More than one event received from DUT")