blob: 57a434c05cc4cdb424964f54659bd2d02fa47f7a [file] [log] [blame]
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import queue
import chip.clusters as Clusters
from chip.clusters import ClusterObjects as ClusterObjects
from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction, TypedAttributePath
from chip.exceptions import ChipStackError
from chip.interaction_model import Status
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts
class AttributeChangeCallback:
def __init__(self, expected_attribute: ClusterObjects.ClusterAttributeDescriptor, output: queue.Queue):
self._output = output
self._expected_attribute = expected_attribute
def __call__(self, path: TypedAttributePath, transaction: SubscriptionTransaction):
if path.AttributeType == self._expected_attribute:
q = (path, transaction)
logging.info(f'Got subscription report for {path.AttributeType}')
self._output.put(q)
class EventChangeCallback:
def __init__(self, expected_event: ClusterObjects.ClusterEvent, output: queue.Queue):
self._output = output
self._expected_cluster_id = expected_event.cluster_id
self._expected_event_id = expected_event.event_id
def __call__(self, res: EventReadResult, transaction: SubscriptionTransaction):
if res.Status == Status.Success and res.Header.ClusterId == self._expected_cluster_id and res.Header.EventId == self._expected_event_id:
logging.info(
f'Got subscription report for event {self._expected_event_id} on cluster {self._expected_cluster_id}: {res.Data}')
self._output.put(res)
def WaitForAttributeReport(q: queue.Queue, expected_attribute: ClusterObjects.ClusterAttributeDescriptor):
try:
path, transaction = q.get(block=True, timeout=10)
except queue.Empty:
asserts.fail("Failed to receive a report for the attribute change for {}".format(expected_attribute))
asserts.assert_equal(path.AttributeType, expected_attribute, "Received incorrect attribute report")
try:
transaction.GetAttribute(path)
except KeyError:
asserts.fail("Attribute not found in returned report")
def WaitForEventReport(q: queue.Queue, expected_event: ClusterObjects.ClusterEvent):
try:
res = q.get(block=True, timeout=10)
except queue.Empty:
asserts.fail("Failed to receive a report for the event {}".format(expected_event))
asserts.assert_equal(res.Header.ClusterId, expected_event.cluster_id, "Expected cluster ID not found in event report")
asserts.assert_equal(res.Header.EventId, expected_event.event_id, "Expected event ID not found in event report")
class TC_ACE_1_2(MatterBaseTest):
def __init__(self, *args):
self.breadcrumb = 1
self.breadcrumb_queue = queue.Queue()
self.subscription_breadcrumb = None
super().__init__(*args)
async def write_acl(self, acl):
# This returns an attribute status
result = await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl(acl))])
asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed")
async def steps_subscribe_breadcrumb(self, print_steps: bool):
if print_steps:
self.print_step(3, "TH2 subscribes to the Breadcrumb attribute")
self.subscription_breadcrumb = await self.TH2.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.GeneralCommissioning.Attributes.Breadcrumb)], reportInterval=(1, 5), keepSubscriptions=False, autoResubscribe=False)
breadcrumb_cb = AttributeChangeCallback(Clusters.GeneralCommissioning.Attributes.Breadcrumb, self.breadcrumb_queue)
self.subscription_breadcrumb.SetAttributeUpdateCallback(breadcrumb_cb)
async def steps_receive_breadcrumb(self, print_steps: bool):
if print_steps:
self.print_step(9, "TH1 writes the breadcrumb attribute")
await self.default_controller.WriteAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.GeneralCommissioning.Attributes.Breadcrumb(self.breadcrumb))])
if print_steps:
self.print_step(10, "TH2 waits for a subscription report from the DUT for breadcrumb")
WaitForAttributeReport(self.breadcrumb_queue, Clusters.GeneralCommissioning.Attributes.Breadcrumb)
self.breadcrumb = self.breadcrumb + 1
async def steps_admin_subscription_error(self, print_steps: bool):
if print_steps:
self.print_step(13, "Subscribe to the ACL attribute, expect INVALID_ACTION")
try:
await self.TH2.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.AccessControl.Attributes.Acl)], reportInterval=(1, 5), fabricFiltered=False, keepSubscriptions=False, autoResubscribe=False)
asserts.fail("Incorrectly subscribed to attribute with invalid permissions")
except ChipStackError as e:
asserts.assert_equal(e.err, 0x580, "Incorrect error message received from subscription with no permission")
if print_steps:
self.print_step(14, "Subscribe to the AccessControlEntryChanged event, expect INVALID_ACTION")
try:
await self.TH2.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.AccessControlEntryChanged)], reportInterval=(
1, 5), fabricFiltered=False, keepSubscriptions=False, autoResubscribe=False)
asserts.fail("Incorrectly subscribed to event with invalid permissions")
except ChipStackError as e:
asserts.assert_equal(e.err, 0x580, "Incorrect error message received from subscription with no permission")
@async_test_body
async def test_TC_ACE_1_2(self):
self.print_step(1, "Commissioning, already done")
fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0]
TH1_nodeid = self.matter_test_config.controller_node_id
TH2_nodeid = self.matter_test_config.controller_node_id + 1
self.TH2 = fabric_admin.NewController(nodeId=TH2_nodeid,
paaTrustStorePath=str(self.matter_test_config.paa_trust_store_path))
self.print_step(2, "TH1 writes ACL for admin with two subjects")
TH1_2_admin = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH1_nodeid, TH2_nodeid],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
await self.write_acl([TH1_2_admin])
# Step 3 - subscribe to breadcrumb - print handled in function
await self.steps_subscribe_breadcrumb(print_steps=True)
self.print_step(4, "TH2 subscribes to ACL attribute")
subscription_acl = await self.TH2.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.AccessControl.Attributes.Acl)], reportInterval=(1, 5), fabricFiltered=False, keepSubscriptions=True, autoResubscribe=False)
acl_queue = queue.Queue()
acl_cb = AttributeChangeCallback(Clusters.AccessControl.Attributes.Acl, acl_queue)
subscription_acl.SetAttributeUpdateCallback(acl_cb)
self.print_step(5, "TH2 subscribes to the AccessControlEntryChanged event")
urgent = 1
subscription_ace = await self.TH2.ReadEvent(nodeid=self.dut_node_id, events=[(0, Clusters.AccessControl.Events.AccessControlEntryChanged, urgent)], reportInterval=(1, 5), fabricFiltered=False, keepSubscriptions=True, autoResubscribe=False)
ace_queue = queue.Queue()
ace_cb = EventChangeCallback(Clusters.AccessControl.Events.AccessControlEntryChanged, ace_queue)
subscription_ace.SetEventUpdateCallback(ace_cb)
self.print_step(6, "TH1 writes ACL attribute")
acl = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH1_nodeid, TH2_nodeid, TH2_nodeid + 1],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
await self.write_acl([acl])
self.print_step(7, "TH2 waits for subscription report for ACL")
WaitForAttributeReport(acl_queue, Clusters.AccessControl.Attributes.Acl)
self.print_step(8, "TH2 waits for subscription report for access control entry changed event")
WaitForEventReport(ace_queue, Clusters.AccessControl.Events.AccessControlEntryChanged)
# this function prints the steps for 9 and 10
await self.steps_receive_breadcrumb(print_steps=True)
self.print_step(11, "TH1 writes ACL attribute")
acl1 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH1_nodeid],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
acl2 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kManage,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH2_nodeid],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
await self.write_acl([acl1, acl2])
self.print_step(12, "TH2 Repeats steps to change breadcrumb and receive subscription report")
await self.steps_receive_breadcrumb(print_steps=False)
# step 13 and 14 - printed in the function
await self.steps_admin_subscription_error(print_steps=True)
self.print_step(15, "TH2 subscribes to breadcrumb attribute")
await self.steps_subscribe_breadcrumb(print_steps=False)
self.print_step(16, "TH2 Repeats steps to change breadcrumb and receive subscription report")
await self.steps_receive_breadcrumb(print_steps=False)
self.print_step(17, "TH1 writes ACL attribute")
acl1 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH1_nodeid],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
acl2 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kOperate,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH2_nodeid],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
await self.write_acl([acl1, acl2])
self.print_step(18, "TH2 Repeats steps to change breadcrumb and receive subscription report")
await self.steps_receive_breadcrumb(print_steps=False)
self.print_step(19, "TH2 repeats subscriptions to Admin attribute and event to ensure it still errors")
await self.steps_admin_subscription_error(print_steps=False)
self.print_step(20, "TH2 subscribes to breadcrumb attribute")
await self.steps_subscribe_breadcrumb(print_steps=False)
self.print_step(21, "TH2 Repeats steps to change breadcrumb and receive subscription report")
await self.steps_receive_breadcrumb(print_steps=False)
self.print_step(22, "TH1 writes ACL attribute")
acl1 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH1_nodeid],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
acl2 = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH2_nodeid],
targets=[Clusters.AccessControl.Structs.AccessControlTargetStruct(endpoint=0)])
await self.write_acl([acl1, acl2])
self.print_step(23, "TH2 Repeats steps to change breadcrumb and receive subscription report")
await self.steps_receive_breadcrumb(print_steps=False)
self.print_step(24, "TH2 repeats subscriptions to Admin attribute and event to ensure it still errors")
await self.steps_admin_subscription_error(print_steps=False)
self.print_step(25, "TH2 subscribes to breadcrumb attribute")
await self.steps_subscribe_breadcrumb(print_steps=False)
self.print_step(26, "TH2 Repeats steps to change breadcrumb and receive subscription report")
await self.steps_receive_breadcrumb(print_steps=False)
self.print_step(27, "TH1 writes ACL attribute")
acl = Clusters.AccessControl.Structs.AccessControlEntryStruct(
privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
subjects=[TH1_nodeid],
targets=[])
await self.write_acl([acl])
self.print_step(28, "TH2 repeats subscriptions to Admin attribute and event to ensure it still errors")
await self.steps_admin_subscription_error(print_steps=False)
self.print_step(29, "TH2 attempts to subscribe to the breadcrumb attribute - expect error")
try:
await self.TH2.ReadAttribute(nodeid=self.dut_node_id, attributes=[(0, Clusters.GeneralCommissioning.Attributes.Breadcrumb)], reportInterval=(1, 5), keepSubscriptions=False, autoResubscribe=False)
asserts.fail("Incorrectly subscribed to attribute with invalid permissions")
except ChipStackError as e:
asserts.assert_equal(e.err, 0x580, "Incorrect error message received from subscription with no permission")
if __name__ == "__main__":
default_matter_test_main()