TC-ACE-1.2 (#27157)
* TC-ACE-1.2
* Add TC-ACE-1.2 to CI
* Whoops, added test twice
* Fixes to TC-ACE-1.2
* Set an option to not auto-resubscribe
If we error on initial subscription becuase we're NEVER going to
have access, the python scripts will never return - they will
continue to resubscribe forever and the future never completes.
This flag prevents any resubscription attempts. It's used in this
test becuase I know we're going to get a failure and I'd like to
have it returned.
* Fix python linter
* Restyled by isort
* linter.
* Apply suggestions from code review
Co-authored-by: Marc Lepage <67919234+mlepage-google@users.noreply.github.com>
* Fix weird typo.
---------
Co-authored-by: Restyled.io <commits@restyled.io>
Co-authored-by: Marc Lepage <67919234+mlepage-google@users.noreply.github.com>
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 5aa759b..b8e5afb 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -442,6 +442,7 @@
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_SC_3_6.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_DA_1_7.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --bool-arg allow_sdk_dac:true"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1 --enable-key 000102030405060708090a0b0c0d0e0f" --script "src/python_testing/TC_TestEventTrigger.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --bool-arg allow_sdk_dac:true"'
+ scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_ACE_1_2.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_ACE_1_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --int-arg PIXIT.ACE.APPENDPOINT:1 PIXIT.ACE.APPDEVTYPEID:0x0100 --string-arg PIXIT.ACE.APPCLUSTER:OnOff PIXIT.ACE.APPATTRIBUTE:OnOff"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_ACE_1_3.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"'
scripts/run_in_python_env.sh out/venv './scripts/tests/run_python_test.py --app out/linux-x64-all-clusters-ipv6only-no-ble-no-wifi-tsan-clang-test/chip-all-clusters-app --factoryreset --app-args "--discriminator 1234 --KVS kvs1 --trace_decode 1" --script "src/python_testing/TC_CGEN_2_4.py" --script-args "--storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021"'
diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py
index 4e4f775..1022182 100644
--- a/src/controller/python/chip/ChipDeviceCtrl.py
+++ b/src/controller/python/chip/ChipDeviceCtrl.py
@@ -1016,7 +1016,7 @@
]] = None,
eventNumberFilter: typing.Optional[int] = None,
returnClusterObject: bool = False, reportInterval: typing.Tuple[int, int] = None,
- fabricFiltered: bool = True, keepSubscriptions: bool = False):
+ fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True):
'''
Read a list of attributes and/or events from a target node
@@ -1076,7 +1076,7 @@
subscriptionParameters=ClusterAttribute.SubscriptionParameters(
reportInterval[0], reportInterval[1]) if reportInterval else None,
fabricFiltered=fabricFiltered,
- keepSubscriptions=keepSubscriptions).raise_on_error()
+ keepSubscriptions=keepSubscriptions, autoResubscribe=autoResubscribe).raise_on_error()
return await future
async def ReadAttribute(self, nodeid: int, attributes: typing.List[typing.Union[
@@ -1093,7 +1093,7 @@
]], dataVersionFilters: typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]] = None,
returnClusterObject: bool = False,
reportInterval: typing.Tuple[int, int] = None,
- fabricFiltered: bool = True, keepSubscriptions: bool = False):
+ fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True):
'''
Read a list of attributes from a target node, this is a wrapper of DeviceController.Read()
@@ -1125,7 +1125,8 @@
returnClusterObject=returnClusterObject,
reportInterval=reportInterval,
fabricFiltered=fabricFiltered,
- keepSubscriptions=keepSubscriptions)
+ keepSubscriptions=keepSubscriptions,
+ autoResubscribe=autoResubscribe)
if isinstance(res, ClusterAttribute.SubscriptionTransaction):
return res
else:
@@ -1146,7 +1147,8 @@
]], eventNumberFilter: typing.Optional[int] = None,
fabricFiltered: bool = True,
reportInterval: typing.Tuple[int, int] = None,
- keepSubscriptions: bool = False):
+ keepSubscriptions: bool = False,
+ autoResubscribe: bool = True):
'''
Read a list of events from a target node, this is a wrapper of DeviceController.Read()
@@ -1174,7 +1176,8 @@
When not provided, a read request will be sent.
'''
res = await self.Read(nodeid=nodeid, events=events, eventNumberFilter=eventNumberFilter,
- fabricFiltered=fabricFiltered, reportInterval=reportInterval, keepSubscriptions=keepSubscriptions)
+ fabricFiltered=fabricFiltered, reportInterval=reportInterval, keepSubscriptions=keepSubscriptions,
+ autoResubscribe=autoResubscribe)
if isinstance(res, ClusterAttribute.SubscriptionTransaction):
return res
else:
diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py
index 6f2b52f..4572065 100644
--- a/src/controller/python/chip/clusters/Attribute.py
+++ b/src/controller/python/chip/clusters/Attribute.py
@@ -615,8 +615,8 @@
return f'<Subscription (Id={self._subscriptionId})>'
-async def DefaultResubscriptionAttemptedCallback(transaction: SubscriptionTransaction,
- terminationError, nextResubscribeIntervalMsec):
+def DefaultResubscriptionAttemptedCallback(transaction: SubscriptionTransaction,
+ terminationError, nextResubscribeIntervalMsec):
print(f"Previous subscription failed with Error: {terminationError} - re-subscribing in {nextResubscribeIntervalMsec}ms...")
@@ -773,6 +773,8 @@
self._handleSubscriptionEstablished, subscriptionId)
def handleResubscriptionAttempted(self, terminationCause: PyChipError, nextResubscribeIntervalMsec: int):
+ if not self._subscription_handler:
+ return
if (self._subscription_handler._onResubscriptionAttemptedCb_isAsync):
self._event_loop.create_task(self._subscription_handler._onResubscriptionAttemptedCb(
self._subscription_handler, terminationCause.code, nextResubscribeIntervalMsec))
@@ -1040,6 +1042,7 @@
"IsSubscription" / construct.Flag,
"IsFabricFiltered" / construct.Flag,
"KeepSubscriptions" / construct.Flag,
+ "AutoResubscribe" / construct.Flag,
)
@@ -1047,7 +1050,7 @@
attributes: List[AttributePath] = None, dataVersionFilters: List[DataVersionFilter] = None,
events: List[EventPath] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True,
subscriptionParameters: SubscriptionParameters = None,
- fabricFiltered: bool = True, keepSubscriptions: bool = False) -> PyChipError:
+ fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True) -> PyChipError:
if (not attributes) and dataVersionFilters:
raise ValueError(
"Must provide valid attribute list when data version filters is not null")
@@ -1119,6 +1122,7 @@
if subscriptionParameters is not None:
params.MinInterval = subscriptionParameters.MinReportIntervalFloorSeconds
params.MaxInterval = subscriptionParameters.MaxReportIntervalCeilingSeconds
+ params.AutoResubscribe = autoResubscribe
params.IsSubscription = True
params.KeepSubscriptions = keepSubscriptions
params.IsFabricFiltered = fabricFiltered
diff --git a/src/controller/python/chip/clusters/attribute.cpp b/src/controller/python/chip/clusters/attribute.cpp
index d30655b..d85d132 100644
--- a/src/controller/python/chip/clusters/attribute.cpp
+++ b/src/controller/python/chip/clusters/attribute.cpp
@@ -149,10 +149,17 @@
CHIP_ERROR OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override
{
- ReturnErrorOnFailure(ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause));
+ if (mAutoResubscribe)
+ {
+ ReturnErrorOnFailure(ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause));
+ }
gOnResubscriptionAttemptedCallback(mAppContext, ToPyChipError(aTerminationCause),
apReadClient->ComputeTimeTillNextSubscription());
- return CHIP_NO_ERROR;
+ if (mAutoResubscribe)
+ {
+ return CHIP_NO_ERROR;
+ }
+ return aTerminationCause;
}
void OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus) override
@@ -226,12 +233,15 @@
void AdoptReadClient(std::unique_ptr<ReadClient> apReadClient) { mReadClient = std::move(apReadClient); }
+ void SetAutoResubscribe(bool autoResubscribe) { mAutoResubscribe = autoResubscribe; }
+
private:
BufferedReadCallback mBufferedReadCallback;
PyObject * mAppContext;
std::unique_ptr<ReadClient> mReadClient;
+ bool mAutoResubscribe = true;
};
extern "C" {
@@ -243,6 +253,7 @@
bool isSubscription;
bool isFabricFiltered;
bool keepSubscriptions;
+ bool autoResubscribe;
};
// Encodes n attribute write requests, follows 3 * n arguments, in the (AttributeWritePath*=void *, uint8_t*, size_t) order.
@@ -564,6 +575,7 @@
params.mMinIntervalFloorSeconds = pyParams.minInterval;
params.mMaxIntervalCeilingSeconds = pyParams.maxInterval;
params.mKeepSubscriptions = pyParams.keepSubscriptions;
+ callback->SetAutoResubscribe(pyParams.autoResubscribe);
dataVersionFilters.release();
attributePaths.release();
diff --git a/src/python_testing/TC_ACE_1_2.py b/src/python_testing/TC_ACE_1_2.py
new file mode 100644
index 0000000..1bf5189
--- /dev/null
+++ b/src/python_testing/TC_ACE_1_2.py
@@ -0,0 +1,272 @@
+#
+# Copyright (c) 2022 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import queue
+
+import chip.clusters as Clusters
+from chip.clusters import ClusterObjects as ClusterObjects
+from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction, TypedAttributePath
+from chip.exceptions import ChipStackError
+from chip.interaction_model import Status
+from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
+from mobly import asserts
+
+
+class AttributeChangeCallback:
+ def __init__(self, expected_attribute: ClusterObjects.ClusterAttributeDescriptor, output: queue.Queue):
+ self._output = output
+ self._expected_attribute = expected_attribute
+
+ def __call__(self, path: TypedAttributePath, transaction: SubscriptionTransaction):
+ if path.AttributeType == self._expected_attribute:
+ q = (path, transaction)
+ logging.info(f'Got subscription report for {path.AttributeType}')
+ self._output.put(q)
+
+
+class EventChangeCallback:
+ def __init__(self, expected_event: ClusterObjects.ClusterEvent, output: queue.Queue):
+ self._output = output
+ self._expected_cluster_id = expected_event.cluster_id
+ self._expected_event_id = expected_event.event_id
+
+ def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction):
+ if res.Status == Status.Success and res.Header.ClusterId == self._expected_cluster_id and res.Header.EventId == self._expected_event_id:
+ logging.info(
+ f'Got subscription report for event {self._expected_event_id} on cluster {self._expected_cluster_id}: {res.Data}')
+ self._output.put(res)
+
+
+def WaitForAttributeReport(q: queue.Queue, expected_attribute: ClusterObjects.ClusterAttributeDescriptor):
+ try:
+ path, transaction = q.get(block=True, timeout=10)
+ except queue.Empty:
+ asserts.fail("Failed to receive a report for the attribute change for {}".format(expected_attribute))
+
+ asserts.assert_equal(path.AttributeType, expected_attribute, "Received incorrect attribute report")
+ try:
+ transaction.GetAttribute(path)
+ except KeyError:
+ asserts.fail("Attribute not found in returned report")
+
+
+def WaitForEventReport(q: queue.Queue, expected_event: ClusterObjects.ClusterEvent):
+ try:
+ res = q.get(block=True, timeout=10)
+ except queue.Empty:
+ asserts.fail("Failed to receive a report for the event {}".format(expected_event))
+
+ asserts.assert_equal(res.Header.ClusterId, expected_event.cluster_id, "Expected cluster ID not found in event report")
+ asserts.assert_equal(res.Header.EventId, expected_event.event_id, "Expected event ID not found in event report")
+
+
+class TC_ACE_1_2(MatterBaseTest):
+ def __init__(self, *args):
+ self.breadcrumb = 1
+ self.breadcrumb_queue = queue.Queue()
+ self.subscription_breadcrumb = None
+ super().__init__(*args)
+
+ async def write_acl(self, acl):
+ # This returns an attribute status
+ result = await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl(acl))])
+ asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed")
+
+ async def steps_subscribe_breadcrumb(self, print_steps: bool):
+ if print_steps:
+ self.print_step(3, "TH2 subscribes to the Breadcrumb attribute")
+ self.subscription_breadcrumb = await self.TH2.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.GeneralCommissioning.Attributes.Breadcrumb)], reportInterval=(1, 5), keepSubscriptions=False, autoResubscribe=False)
+ breadcrumb_cb = AttributeChangeCallback(Clusters.GeneralCommissioning.Attributes.Breadcrumb, self.breadcrumb_queue)
+ self.subscription_breadcrumb.SetAttributeUpdateCallback(breadcrumb_cb)
+
+ async def steps_receive_breadcrumb(self, print_steps: bool):
+ if print_steps:
+ self.print_step(9, "TH1 writes the breadcrumb attribute")
+ await self.default_controller.WriteAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.GeneralCommissioning.Attributes.Breadcrumb(self.breadcrumb))])
+
+ if print_steps:
+ self.print_step(10, "TH2 waits for a subscription report from the DUT for breadcrumb")
+ WaitForAttributeReport(self.breadcrumb_queue, Clusters.GeneralCommissioning.Attributes.Breadcrumb)
+ self.breadcrumb = self.breadcrumb + 1
+
+ async def steps_admin_subscription_error(self, print_steps: bool):
+ if print_steps:
+ self.print_step(13, "Subscribe to the ACL attribute, expect INVALID_ACTION")
+ try:
+ await self.TH2.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.AccessControl.Attributes.Acl)], reportInterval=(1, 5), fabricFiltered=False, keepSubscriptions=False, autoResubscribe=False)
+ asserts.fail("Incorrectly subscribed to attribute with invalid permissions")
+ except ChipStackError as e:
+ asserts.assert_equal(e.err, 0x580, "Incorrect error message received from subscription with no permission")
+
+ if print_steps:
+ self.print_step(14, "Subscribe to the AccessControlEntryChanged event, expect INVALID_ACTION")
+ try:
+ await self.TH2.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.AccessControlEntryChanged)], reportInterval=(
+ 1, 5), fabricFiltered=False, keepSubscriptions=False, autoResubscribe=False)
+ asserts.fail("Incorrectly subscribed to event with invalid permissions")
+ except ChipStackError as e:
+ asserts.assert_equal(e.err, 0x580, "Incorrect error message received from subscription with no permission")
+
+ @async_test_body
+ async def test_TC_ACE_1_2(self):
+ self.print_step(1, "Commissioning, already done")
+
+ fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0]
+
+ TH1_nodeid = self.matter_test_config.controller_node_id
+ TH2_nodeid = self.matter_test_config.controller_node_id + 1
+
+ self.TH2 = fabric_admin.NewController(nodeId=TH2_nodeid,
+ paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path))
+
+ self.print_step(2, "TH1 writes ACL for admin with two subjects")
+ TH1_2_admin = Clusters.AccessControl.Structs.AccessControlEntryStruct(
+ privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
+ authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
+ subjects=[TH1_nodeid, TH2_nodeid],
+ targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
+ await self.write_acl([TH1_2_admin])
+
+ # Step 3 - subscribe to breadcrumb - print handled in function
+ await self.steps_subscribe_breadcrumb(print_steps=True)
+
+ self.print_step(4, "TH2 subscribes to ACL attribute")
+ subscription_acl = await self.TH2.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.AccessControl.Attributes.Acl)], reportInterval=(1, 5), fabricFiltered=False, keepSubscriptions=True, autoResubscribe=False)
+ acl_queue = queue.Queue()
+ acl_cb = AttributeChangeCallback(Clusters.AccessControl.Attributes.Acl, acl_queue)
+ subscription_acl.SetAttributeUpdateCallback(acl_cb)
+
+ self.print_step(5, "TH2 subscribes to the AccessControlEntryChanged event")
+ subscription_ace = await self.TH2.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.AccessControlEntryChanged)], reportInterval=(1, 5), fabricFiltered=False, keepSubscriptions=True, autoResubscribe=False)
+ ace_queue = queue.Queue()
+ ace_cb = EventChangeCallback(Clusters.AccessControl.Events.AccessControlEntryChanged, ace_queue)
+ subscription_ace.SetEventUpdateCallback(ace_cb)
+
+ self.print_step(6, "TH1 writes ACL attribute")
+ acl = Clusters.AccessControl.Structs.AccessControlEntryStruct(
+ privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
+ authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
+ subjects=[TH1_nodeid, TH2_nodeid, TH2_nodeid + 1],
+ targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
+ await self.write_acl([acl])
+
+ self.print_step(7, "TH2 waits for subscription report for ACL")
+ WaitForAttributeReport(acl_queue, Clusters.AccessControl.Attributes.Acl)
+
+ self.print_step(8, "TH2 waits for subscription report for access control entry changed event")
+ WaitForEventReport(ace_queue, Clusters.AccessControl.Events.AccessControlEntryChanged)
+
+ # this function prints the steps for 9 and 10
+ await self.steps_receive_breadcrumb(print_steps=True)
+
+ self.print_step(11, "TH1 writes ACL attribute")
+ acl1 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
+ privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
+ authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
+ subjects=[TH1_nodeid],
+ targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
+ acl2 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
+ privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kManage,
+ authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
+ subjects=[TH2_nodeid],
+ targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
+ await self.write_acl([acl1, acl2])
+
+ self.print_step(12, "TH2 Repeats steps to change breadcrumb and receive subscription report")
+ await self.steps_receive_breadcrumb(print_steps=False)
+
+ # step 13 and 14 - printed in the function
+ await self.steps_admin_subscription_error(print_steps=True)
+
+ self.print_step(15, "TH2 subscribes to breadcrumb attribute")
+ await self.steps_subscribe_breadcrumb(print_steps=False)
+
+ self.print_step(16, "TH2 Repeats steps to change breadcrumb and receive subscription report")
+ await self.steps_receive_breadcrumb(print_steps=False)
+
+ self.print_step(17, "TH1 writes ACL attribute")
+ acl1 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
+ privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
+ authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
+ subjects=[TH1_nodeid],
+ targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
+ acl2 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
+ privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kOperate,
+ authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
+ subjects=[TH2_nodeid],
+ targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
+ await self.write_acl([acl1, acl2])
+
+ self.print_step(18, "TH2 Repeats steps to change breadcrumb and receive subscription report")
+ await self.steps_receive_breadcrumb(print_steps=False)
+
+ self.print_step(19, "TH2 repeats subscriptions to Admin attribute and event to ensure it still errors")
+ await self.steps_admin_subscription_error(print_steps=False)
+
+ self.print_step(20, "TH2 subscribes to breadcrumb attribute")
+ await self.steps_subscribe_breadcrumb(print_steps=False)
+
+ self.print_step(21, "TH2 Repeats steps to change breadcrumb and receive subscription report")
+ await self.steps_receive_breadcrumb(print_steps=False)
+
+ self.print_step(22, "TH1 writes ACL attribute")
+ acl1 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
+ privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
+ authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
+ subjects=[TH1_nodeid],
+ targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
+ acl2 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
+ privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView,
+ authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
+ subjects=[TH2_nodeid],
+ targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
+ await self.write_acl([acl1, acl2])
+
+ self.print_step(23, "TH2 Repeats steps to change breadcrumb and receive subscription report")
+ await self.steps_receive_breadcrumb(print_steps=False)
+
+ self.print_step(24, "TH2 repeats subscriptions to Admin attribute and event to ensure it still errors")
+ await self.steps_admin_subscription_error(print_steps=False)
+
+ self.print_step(25, "TH2 subscribes to breadcrumb attribute")
+ await self.steps_subscribe_breadcrumb(print_steps=False)
+
+ self.print_step(26, "TH2 Repeats steps to change breadcrumb and receive subscription report")
+ await self.steps_receive_breadcrumb(print_steps=False)
+
+ self.print_step(27, "TH1 writes ACL attribute")
+ acl = Clusters.AccessControl.Structs.AccessControlEntryStruct(
+ privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
+ authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
+ subjects=[TH1_nodeid],
+ targets=[])
+ await self.write_acl([acl])
+
+ self.print_step(28, "TH2 repeats subscriptions to Admin attribute and event to ensure it still errors")
+ await self.steps_admin_subscription_error(print_steps=False)
+
+ self.print_step(29, "TH2 attempts to subscribe to the breadcrumb attribute - expect error")
+ try:
+ await self.TH2.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.GeneralCommissioning.Attributes.Breadcrumb)], reportInterval=(1, 5), keepSubscriptions=False, autoResubscribe=False)
+ asserts.fail("Incorrectly subscribed to attribute with invalid permissions")
+ except ChipStackError as e:
+ asserts.assert_equal(e.err, 0x580, "Incorrect error message received from subscription with no permission")
+
+
+if __name__ == "__main__":
+ default_matter_test_main()