Added TC-ACL-2-11 test script (#35163)
Implement the M-ACL / Managed NIM / ARL test plan.
The test plan (currently in review form) can be found here:
CHIP-Specifications/chip-test-plans#4316
--------
* Added TC-ACL-2-11 test script
* Restyled by autopep8
* Restyled by isort
* fixed linter issues
* review updates
* Use network-manager-app for TC_ACL_2_11.py
* Fix REPL build commands for network manager
* add trace support to network-manager-app
---------
Co-authored-by: Restyled.io <commits@restyled.io>
Co-authored-by: tennessee.carmelveilleux@gmail.com <tennessee.carmelveilleux@gmail.com>
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index dbbda0a..b37ed3c 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -485,6 +485,7 @@
--target linux-x64-energy-management-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test \
+ --target linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test \
--target linux-x64-python-bindings \
build \
--copy-artifacts-to objdir-clone \
@@ -498,6 +499,7 @@
echo "LIT_ICD_APP: out/linux-x64-lit-icd-ipv6only-no-ble-no-wifi-tsan-clang-test/lit-icd-app" >> /tmp/test_env.yaml
echo "CHIP_MICROWAVE_OVEN_APP: out/linux-x64-microwave-oven-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-microwave-oven-app" >> /tmp/test_env.yaml
echo "CHIP_RVC_APP: out/linux-x64-rvc-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-rvc-app" >> /tmp/test_env.yaml
+ echo "NETWORK_MANAGEMENT_APP: out/linux-x64-network-manager-ipv6only-no-ble-no-wifi-tsan-clang-test/matter-network-manager-app" >> /tmp/test_env.yaml
echo "TRACE_APP: out/trace_data/app-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
echo "TRACE_TEST_JSON: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
echo "TRACE_TEST_PERFETTO: out/trace_data/test-{SCRIPT_BASE_NAME}" >> /tmp/test_env.yaml
diff --git a/examples/network-manager-app/linux/args.gni b/examples/network-manager-app/linux/args.gni
index e97ddb1..e463c7d 100644
--- a/examples/network-manager-app/linux/args.gni
+++ b/examples/network-manager-app/linux/args.gni
@@ -25,3 +25,5 @@
# This enables AccessRestrictionList (ARL) support used by the NIM sample app
chip_enable_access_restrictions = true
+
+matter_enable_tracing_support = true
diff --git a/src/python_testing/TC_ACL_2_11.py b/src/python_testing/TC_ACL_2_11.py
new file mode 100644
index 0000000..a979b1c
--- /dev/null
+++ b/src/python_testing/TC_ACL_2_11.py
@@ -0,0 +1,172 @@
+#
+# Copyright (c) 2024 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments
+# for details about the block below.
+#
+# === BEGIN CI TEST ARGUMENTS ===
+# test-runner-runs: run1
+# test-runner-run/run1/app: ${NETWORK_MANAGEMENT_APP}
+# test-runner-run/run1/factoryreset: True
+# test-runner-run/run1/quiet: True
+# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json --commissioning-arl-entries "[{\"endpoint\": 1,\"cluster\": 1105,\"restrictions\": [{\"type\": 0,\"id\": 0}]}]" --arl-entries "[{\"endpoint\": 1,\"cluster\": 1105,\"restrictions\": [{\"type\": 0,\"id\": 0}]}]"
+# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
+# === END CI TEST ARGUMENTS ===
+
+import logging
+import queue
+
+import chip.clusters as Clusters
+from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction
+from chip.clusters.ClusterObjects import ALL_ACCEPTED_COMMANDS, ALL_ATTRIBUTES, ALL_CLUSTERS, ClusterEvent
+from chip.clusters.Objects import AccessControl
+from chip.interaction_model import Status
+from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main
+from mobly import asserts
+
+
+class EventChangeCallback:
+ def __init__(self, expected_event: ClusterEvent, output: queue.Queue):
+ self._output = output
+ self._expected_cluster_id = expected_event.cluster_id
+ self._expected_event_id = expected_event.event_id
+
+ def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction):
+ if res.Status == Status.Success and res.Header.ClusterId == self._expected_cluster_id and res.Header.EventId == self._expected_event_id:
+ logging.info(
+ f'Got subscription report for event {self._expected_event_id} on cluster {self._expected_cluster_id}: {res.Data}')
+ self._output.put(res)
+
+
+def WaitForEventReport(q: queue.Queue, expected_event: ClusterEvent):
+ try:
+ res = q.get(block=True, timeout=10)
+ except queue.Empty:
+ asserts.fail("Failed to receive a report for the event {}".format(expected_event))
+
+ asserts.assert_equal(res.Header.ClusterId, expected_event.cluster_id, "Expected cluster ID not found in event report")
+ asserts.assert_equal(res.Header.EventId, expected_event.event_id, "Expected event ID not found in event report")
+
+
+class TC_ACL_2_11(MatterBaseTest):
+
+ def desc_TC_ACL_2_11(self) -> str:
+ return "[TC-ACL-2.11] Verification of Managed Device feature"
+
+ def steps_TC_ACL_2_11(self) -> list[TestStep]:
+ steps = [
+ TestStep(1, "Commissioning, already done"),
+ TestStep(2, "TH1 reads DUT Endpoint 0 AccessControl cluster CommissioningARL attribute"),
+ TestStep(3, "TH1 reads DUT Endpoint 0 AccessControl cluster ARL attribute"),
+ TestStep(4, "For each entry in ARL, iterate over each restriction and attempt access the restriction's ID on the Endpoint and Cluster in the ARL entry.",
+ "If the restriction is Type AttributeAccessForbidden, read the restriction's attribute ID and verify the response is UNSUPPORTED_ACCESS."
+ "If the restriction is Type AttributeWriteForbidden, write restriction's the attribute ID and verify the response is UNSUPPORTED_ACCESS."
+ "If the restriction is Type CommandForbidden, invoke the restriction's command ID and verify the response is UNSUPPORTED_ACCESS."),
+ TestStep(5, "TH1 sends DUT Endpoint 0 AccessControl cluster command ReviewFabricRestrictions"),
+ TestStep(6, "Wait for up to 1 hour. Follow instructions provided by device maker to remove all access restrictions",
+ "AccessRestrictionReviewUpdate event is received"),
+ TestStep(7, "TH1 reads DUT Endpoint 0 AccessControl cluster ARL attribute", "ARL is empty")
+ ]
+ return steps
+
+ @async_test_body
+ async def test_TC_ACL_2_11(self):
+ self.step(1)
+ self.step(2)
+ await self.read_single_attribute_check_success(
+ endpoint=0,
+ cluster=Clusters.AccessControl,
+ attribute=Clusters.AccessControl.Attributes.CommissioningARL
+ )
+ self.step(3)
+ arl = await self.read_single_attribute_check_success(
+ endpoint=0,
+ cluster=Clusters.AccessControl,
+ attribute=Clusters.AccessControl.Attributes.Arl
+ )
+ self.step(4)
+
+ care_struct = None
+
+ for arl_entry in arl:
+ E1 = arl_entry.endpoint
+ C1 = arl_entry.cluster
+ R1 = arl_entry.restrictions
+
+ care_struct = Clusters.AccessControl.Structs.AccessRestrictionEntryStruct(E1, C1, R1)
+
+ cluster = ALL_CLUSTERS[C1]
+
+ for restriction in R1:
+ restriction_type = restriction.type
+ ID1 = restriction.id
+
+ attribute = ALL_ATTRIBUTES[C1][ID1]
+ command = ALL_ACCEPTED_COMMANDS[C1][ID1]
+
+ if restriction_type == AccessControl.Enums.AccessRestrictionTypeEnum.kAttributeAccessForbidden:
+ await self.read_single_attribute_expect_error(cluster=cluster, attribute=attribute, error=Status.UnsupportedAccess, endpoint=E1)
+ elif restriction_type == AccessControl.Enums.AccessRestrictionTypeEnum.kAttributeWriteForbidden:
+ status = await self.write_single_attribute(attribute_value=attribute, endpoint_id=E1)
+ asserts.assert_equal(status, Status.UnsupportedAccess,
+ f"Failed to verify UNSUPPORTED_ACCESS when writing to Attribute {ID1} Cluster {C1} Endpoint {E1}")
+ elif restriction_type == AccessControl.Enums.AccessRestrictionTypeEnum.kCommandForbidden:
+ result = await self.send_single_cmd(cmd=command, endpoint=E1)
+ asserts.assert_equal(result.status, Status.UnsupportedAccess,
+ f"Failed to verify UNSUPPORTED_ACCESS when sending command {ID1} to Cluster {C1} Endpoint {E1}")
+
+ # Belongs to step 6, but needs to be subscribed before executing step 5: begin
+ arru_queue = queue.Queue()
+ arru_cb = EventChangeCallback(Clusters.AccessControl.Events.FabricRestrictionReviewUpdate, arru_queue)
+
+ urgent = 1
+ subscription_arru = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.FabricRestrictionReviewUpdate, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False)
+ subscription_arru.SetEventUpdateCallback(callback=arru_cb)
+ # end
+
+ # Belongs to step 7, but needs to be subscribed before executing step 5: begin
+ arec_queue = queue.Queue()
+ arec_cb = EventChangeCallback(Clusters.AccessControl.Events.AccessRestrictionEntryChanged, arec_queue)
+
+ urgent = 1
+ subscription_arec = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.AccessRestrictionEntryChanged, urgent)], reportInterval=(1, 5), keepSubscriptions=True, autoResubscribe=False)
+ subscription_arec.SetEventUpdateCallback(callback=arec_cb)
+ # end
+
+ self.step(5)
+ response = await self.send_single_cmd(cmd=Clusters.AccessControl.Commands.ReviewFabricRestrictions([care_struct]), endpoint=0)
+ asserts.assert_true(isinstance(response, Clusters.AccessControl.Commands.ReviewFabricRestrictionsResponse),
+ "Result is not of type ReviewFabricRestrictionsResponse")
+
+ self.step(6)
+ logging.info("Please follow instructions provided by the product maker to remove all ARL entries")
+ WaitForEventReport(arru_queue, Clusters.AccessControl.Events.FabricRestrictionReviewUpdate)
+
+ self.step(7)
+ cluster = Clusters.AccessControl
+ attribute = Clusters.AccessControl.Attributes.Arl
+ arl = await self.read_single_attribute_check_success(
+ node_id=self.dut_node_id,
+ endpoint=0,
+ cluster=cluster,
+ attribute=attribute
+ )
+ asserts.assert_equal(arl, [], "Unexpected Arl; Not empty")
+
+
+if __name__ == "__main__":
+ default_matter_test_main()