# blob: dd3639d7c0b476871e3f48ac7af39264e0b2b771 [file] [log] [blame]
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import chip.clusters as Clusters
from chip.interaction_model import InteractionModelError, Status
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts
def acl_subject(cat: int) -> int:
    """Build the ACL subject value that refers to the tag ``cat``.

    The result is ``cat`` combined (bitwise OR) with the fixed prefix
    ``0xFFFFFFFD`` placed in the upper 32 bits of a 64-bit value.

    Args:
        cat: Tag value (identifier and version, as used for ``catTags``);
            expected to fit in the lower 32 bits -- no range check is done.

    Returns:
        The 64-bit subject value ``0xFFFFFFFD_00000000 | cat``.
    """
    subject_prefix = 0xFFFFFFFD << 32
    return subject_prefix | cat
class TC_ACE_1_3(MatterBaseTest):
    """Access-control test: ACL subjects decide who may read the EP0 descriptor.

    The default controller (TH0) repeatedly rewrites the DUT's ACL so that
    View privilege on endpoint 0 is granted to different subjects (no
    subjects, explicit node IDs, or CAT-based subjects).  After every write,
    three additional controllers (TH1, TH2, TH3) read the Descriptor
    cluster's DeviceTypeList attribute on endpoint 0 and the test checks that
    each read either succeeds or fails with UNSUPPORTED_ACCESS.
    """

    async def write_acl(self, acl):
        """Write ``acl`` to the DUT's AccessControl ACL attribute on endpoint 0.

        Args:
            acl: List of ``AccessControlEntryStruct`` entries that becomes the
                value of the ACL attribute.

        The write is issued by ``self.default_controller`` and the test fails
        (via ``asserts.assert_equal``) unless the first returned attribute
        status is ``Status.Success``.
        """
        # WriteAttribute returns a list of attribute statuses.
        result = await self.default_controller.WriteAttribute(
            self.dut_node_id, [(0, Clusters.AccessControl.Attributes.Acl(acl))])
        # Log before asserting so the status is visible when the write fails.
        logging.info("ACL write result: %s", result)
        asserts.assert_equal(result[0].Status, Status.Success, "ACL write failed")

    async def read_descriptor_expect_success(self, th):
        """Read EP0 Descriptor.DeviceTypeList using controller ``th``; expect success."""
        cluster = Clusters.Objects.Descriptor
        attribute = Clusters.Descriptor.Attributes.DeviceTypeList
        await self.read_single_attribute_check_success(
            dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute)

    async def read_descriptor_expect_unsupported_access(self, th):
        """Read EP0 Descriptor.DeviceTypeList using controller ``th``; expect UNSUPPORTED_ACCESS."""
        cluster = Clusters.Objects.Descriptor
        attribute = Clusters.Descriptor.Attributes.DeviceTypeList
        await self.read_single_attribute_expect_error(
            dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute,
            error=Status.UnsupportedAccess)

    @staticmethod
    def _view_entry(subjects):
        """Return a CASE ACL entry granting View privilege on endpoint 0 to ``subjects``."""
        return Clusters.AccessControl.Structs.AccessControlEntryStruct(
            privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kView,
            authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
            subjects=subjects,
            targets=[Clusters.AccessControl.Structs.Target(endpoint=0)])

    @async_test_body
    async def test_TC_ACE_1_3(self):
        """Run the 58 numbered steps of the test.

        Step 1 sets up controllers TH1..TH3 on the same fabric as the default
        controller (TH0).  Steps 2-57 come in groups of four: TH0 writes an
        ACL consisting of its own Administer entry plus one View entry, then
        TH1, TH2 and TH3 each read the EP0 descriptor.  Step 58 writes an ACL
        containing only TH0's Administer entry.
        """
        cat1_id = 0x11110000
        cat2_id = 0x22220000
        # Tag values: identifier in the upper 16 bits, version in the lower 16 bits.
        cat1v1 = cat1_id | 0x0001
        cat1v2 = cat1_id | 0x0002
        cat1v3 = cat1_id | 0x0003
        cat2v1 = cat2_id | 0x0001
        cat2v2 = cat2_id | 0x0002
        cat2v3 = cat2_id | 0x0003
        logging.info('cat1v1 0x%x', cat1v1)

        self.print_step(1, "Commissioning, already done")
        # TH0 is self.default_controller (used by write_acl); TH1..TH3 are new
        # controllers created from the same fabric admin, each with its own
        # node ID and set of CAT tags.
        fabric_admin = self.certificate_authority_manager.activeCaList[0].adminList[0]
        TH0_nodeid = self.matter_test_config.controller_node_id
        TH1_nodeid = TH0_nodeid + 1
        TH2_nodeid = TH0_nodeid + 2
        TH3_nodeid = TH0_nodeid + 3
        paa_trust_store_path = str(self.matter_test_config.paa_trust_store_path)
        TH1 = fabric_admin.NewController(nodeId=TH1_nodeid,
                                         paaTrustStorePath=paa_trust_store_path,
                                         catTags=[cat1v3])
        TH2 = fabric_admin.NewController(nodeId=TH2_nodeid,
                                         paaTrustStorePath=paa_trust_store_path,
                                         catTags=[cat1v2, cat2v1])
        TH3 = fabric_admin.NewController(nodeId=TH3_nodeid,
                                         paaTrustStorePath=paa_trust_store_path,
                                         catTags=[cat1v1, cat2v2])

        # Included in every ACL written below so TH0 keeps Administer access
        # to cluster 0x001f (Access Control) on endpoint 0 and can go on
        # rewriting the ACL.
        TH0_admin_acl = Clusters.AccessControl.Structs.AccessControlEntryStruct(
            privilege=Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kAdminister,
            authMode=Clusters.AccessControl.Enums.AccessControlEntryAuthModeEnum.kCase,
            subjects=[TH0_nodeid],
            targets=[Clusters.AccessControl.Structs.Target(endpoint=0, cluster=0x001f)])

        # One row per ACL under test:
        #   (step title for the ACL write,
        #    subjects of the View entry,
        #    whether TH1 / TH2 / TH3 are expected to read successfully)
        # NOTE: the CAT rows match a rule where a CAT subject grants access to
        # controllers holding the same CAT identifier at an equal or higher
        # version (TH1: cat1v3; TH2: cat1v2, cat2v1; TH3: cat1v1, cat2v2).
        scenarios = [
            ("TH0 writes ACL all view on PIXIT.ACE.TESTENDPOINT", [], (True, True, True)),
            ("TH0 writes ACL TH1 view on EP0", [TH1_nodeid], (True, False, False)),
            ("TH0 writes ACL TH2 view on EP0", [TH2_nodeid], (False, True, False)),
            ("TH0 writes ACL TH3 view on EP0", [TH3_nodeid], (False, False, True)),
            ("TH0 writes ACL TH1 TH2 view on EP0", [TH1_nodeid, TH2_nodeid], (True, True, False)),
            ("TH0 writes ACL TH1 TH3 view on EP0", [TH1_nodeid, TH3_nodeid], (True, False, True)),
            ("TH0 writes ACL TH2 TH3 view on EP0", [TH2_nodeid, TH3_nodeid], (False, True, True)),
            ("TH0 writes ACL TH1 TH2 TH3 view on EP0",
             [TH1_nodeid, TH2_nodeid, TH3_nodeid], (True, True, True)),
            ("TH0 writes ACL cat1v1 view on EP0", [acl_subject(cat1v1)], (True, True, True)),
            ("TH0 writes ACL cat1v2 view on EP0", [acl_subject(cat1v2)], (True, True, False)),
            ("TH0 writes ACL cat1v3 view on EP0", [acl_subject(cat1v3)], (True, False, False)),
            ("TH0 writes ACL cat2v1 view on EP0", [acl_subject(cat2v1)], (False, True, True)),
            ("TH0 writes ACL cat2v2 view on EP0", [acl_subject(cat2v2)], (False, False, True)),
            ("TH0 writes ACL cat2v3 view on EP0", [acl_subject(cat2v3)], (False, False, False)),
        ]
        readers = (("TH1", TH1), ("TH2", TH2), ("TH3", TH3))

        # Steps 2..57: each scenario uses four consecutive step numbers
        # (one ACL write followed by three reads).
        step = 2
        for write_title, subjects, expected_access in scenarios:
            self.print_step(step, write_title)
            await self.write_acl([TH0_admin_acl, self._view_entry(subjects)])
            step += 1

            for (reader_name, reader), allowed in zip(readers, expected_access):
                outcome = "SUCCESS" if allowed else "UNSUPPORTED_ACCESS"
                self.print_step(step, "{} reads EP0 descriptor - expect {}".format(reader_name, outcome))
                if allowed:
                    await self.read_descriptor_expect_success(reader)
                else:
                    await self.read_descriptor_expect_unsupported_access(reader)
                step += 1

        # Step 58: leave only TH0's Administer entry in the ACL.
        self.print_step(step, "TH0 writes ACL back to default")
        await self.write_acl([TH0_admin_acl])
# Script entry point: hand control to the Matter test framework's default
# runner when this file is executed directly (not when it is imported).
if __name__ == "__main__":
    default_matter_test_main()