blob: a97417d4f64204940a8471e953ccbf91d305f4ec [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import queue
from typing import List
import chip.clusters as Clusters
from chip.clusters import ClusterObjects as ClusterObjects
from chip.clusters.Attribute import SubscriptionTransaction, TypedAttributePath
from chip.interaction_model import Status
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
from mobly import asserts
class AttributeChangeCallback:
    """Callable that forwards subscription reports for one attribute to a queue.

    Instances are meant to be handed to a subscription's
    ``SetAttributeUpdateCallback``. Reports whose attribute type does not match
    the expected attribute are ignored.
    """

    def __init__(self, expected_attribute: ClusterObjects.ClusterAttributeDescriptor, output: queue.Queue):
        """Store the attribute to watch for and the queue that receives matching reports.

        Args:
            expected_attribute: attribute type that incoming reports are compared against.
            output: queue onto which ``(path, transaction)`` tuples are put.
        """
        self._expected_attribute = expected_attribute
        self._output = output

    def __call__(self, path: TypedAttributePath, transaction: SubscriptionTransaction):
        """Enqueue ``(path, transaction)`` when the report is for the expected attribute.

        Args:
            path: path of the attribute carried by the report.
            transaction: subscription transaction the report belongs to.
        """
        is_expected = path.AttributeType == self._expected_attribute
        if not is_expected:
            # Reports for any other attribute are dropped silently.
            return

        logging.info(f'Got subscription report for {path.AttributeType}')
        self._output.put((path, transaction))
def WaitForAttributeReport(q: queue.Queue, expected_attribute: ClusterObjects.ClusterAttributeDescriptor,
                           timeout_sec: float = 10):
    """Block until a report for ``expected_attribute`` arrives on ``q``, failing the test otherwise.

    Args:
        q: queue from which ``(path, transaction)`` tuples are read.
        expected_attribute: attribute type the dequeued report must be for.
        timeout_sec: maximum time, in seconds, to wait for an item on ``q``.
            Defaults to 10, the value that used to be hard-coded.

    The test is failed through ``mobly.asserts`` when:
        * nothing arrives on ``q`` within ``timeout_sec``;
        * the dequeued report is for a different attribute;
        * ``transaction.GetAttribute(path)`` raises ``KeyError``.
    """
    try:
        path, transaction = q.get(block=True, timeout=timeout_sec)
    except queue.Empty:
        # The code below relies on asserts.fail() raising; otherwise `path`
        # and `transaction` would be unbound.
        asserts.fail("Failed to receive a report for the attribute change for {}".format(expected_attribute))

    asserts.assert_equal(path.AttributeType, expected_attribute, "Received incorrect attribute report")

    # The lookup is done only to check that the value is present; the value itself is not used.
    try:
        transaction.GetAttribute(path)
    except KeyError:
        asserts.fail("Attribute not found in returned report")
class TestGroupTableReports(MatterBaseTest):
    """Checks that changes to the GroupTable attribute are reported over a subscription.

    The test writes a group key set, maps it to three groups, subscribes to the
    Group Key Management cluster's GroupTable attribute on endpoint 0, and then
    adds and removes groups through the Groups cluster on endpoint 1, expecting
    a GroupTable report after every change.
    """

    async def _send_group_command_checked(self, command, description: str):
        """Send a Groups cluster command to endpoint 1 and assert its response status is Success.

        Args:
            command: Groups cluster command object whose response carries a ``status`` field.
            description: short label used in the failure message.

        Returns:
            The command response.
        """
        response = await self.TH1.SendCommand(self.dut_node_id, 1, command)
        # Fail here with a clear message rather than later with a misleading
        # "no report received" timeout.
        asserts.assert_equal(response.status, Status.Success, f"{description} failed")
        return response

    @async_test_body
    async def test_group_table_reports(self):
        """Drive the add/remove group sequence and wait for a GroupTable report after each step."""
        group_key_management = Clusters.GroupKeyManagement
        group_table_attribute = group_key_management.Attributes.GroupTable

        self.print_step(1, "Commissioning, already done")
        self.TH1 = self.default_controller

        self.kGroupKeyset1 = 0x01a1

        self.print_step(2, "TH Adds Keyset 1 to the Group Key Management Cluster")
        self.groupKey = group_key_management.Structs.GroupKeySetStruct(
            groupKeySetID=self.kGroupKeyset1,
            groupKeySecurityPolicy=group_key_management.Enums.GroupKeySecurityPolicyEnum.kTrustFirst,
            epochKey0="0123456789abcdef".encode(),
            epochStartTime0=1110000,
            epochKey1="0123456789abcdef".encode(),
            epochStartTime1=1110001,
            epochKey2="0123456789abcdef".encode(),
            epochStartTime2=1110002)
        await self.TH1.SendCommand(self.dut_node_id, 0, group_key_management.Commands.KeySetWrite(self.groupKey))

        self.kGroup1 = 0x0101
        self.kGroup2 = 0x0102
        self.kGroup3 = 0x0103

        self.print_step(3, "TH maps Keyset 1 to Group 1, 2 and 3")
        # NOTE(review): fabricIndex is hard-coded to 1 — confirm this is what the DUT expects for TH1.
        mapping_structs: List[Clusters.GroupKeyManagement.Structs.GroupKeyMapStruct] = [
            group_key_management.Structs.GroupKeyMapStruct(
                groupId=group_id,
                groupKeySetID=self.kGroupKeyset1,
                fabricIndex=1)
            for group_id in (self.kGroup1, self.kGroup2, self.kGroup3)
        ]
        result = await self.TH1.WriteAttribute(
            self.dut_node_id, [(0, group_key_management.Attributes.GroupKeyMap(mapping_structs))])
        asserts.assert_equal(result[0].Status, Status.Success, "GroupKeyMap write failed")

        self.print_step(4, "TH subscribes to the GroupTable attribute from the Group Key Management Cluster")
        subscription_gcm = await self.TH1.ReadAttribute(
            nodeid=self.dut_node_id,
            attributes=[(0, group_table_attribute)],
            reportInterval=(1, 5),
            fabricFiltered=False,
            keepSubscriptions=True,
            autoResubscribe=False)
        # Matching reports are pushed onto this queue by the callback and consumed by WaitForAttributeReport.
        gcm_queue = queue.Queue()
        gcm_cb = AttributeChangeCallback(group_table_attribute, gcm_queue)
        subscription_gcm.SetAttributeUpdateCallback(gcm_cb)

        self.print_step(5, "TH Adds Group1 to the Group Cluster")
        await self._send_group_command_checked(
            Clusters.Groups.Commands.AddGroup(self.kGroup1, "Group1"), "AddGroup for Group1")

        self.print_step(6, "TH waits for subscription report for the GroupTable attribute from the Group Key Management Cluster")
        WaitForAttributeReport(gcm_queue, group_table_attribute)

        self.print_step(7, "TH Adds Group2 to the Group Cluster")
        await self._send_group_command_checked(
            Clusters.Groups.Commands.AddGroup(self.kGroup2, "Group2"), "AddGroup for Group2")

        self.print_step(8, "TH waits for subscription report for the GroupTable attribute from the Group Key Management Cluster")
        WaitForAttributeReport(gcm_queue, group_table_attribute)

        self.print_step(9, "TH Adds Group3 to the Group Cluster")
        await self._send_group_command_checked(
            Clusters.Groups.Commands.AddGroup(self.kGroup3, "Group3"), "AddGroup for Group3")

        self.print_step(10, "TH waits for subscription report for the GroupTable attribute from the Group Key Management Cluster")
        WaitForAttributeReport(gcm_queue, group_table_attribute)

        # Step numbering jumps from 10 to 12 in the original; kept unchanged.
        self.print_step(12, "TH removes Group2 from the Group Cluster")
        await self._send_group_command_checked(
            Clusters.Groups.Commands.RemoveGroup(self.kGroup2), "RemoveGroup for Group2")

        self.print_step(13, "TH waits for subscription report for the GroupTable attribute from the Group Key Management Cluster")
        WaitForAttributeReport(gcm_queue, group_table_attribute)

        self.print_step(14, "TH removes All Groups from the Group Cluster")
        # The original did not inspect this command's result, so none is checked here either.
        await self.TH1.SendCommand(self.dut_node_id, 1, Clusters.Groups.Commands.RemoveAllGroups())

        self.print_step(15, "TH waits for subscription report for the GroupTable attribute from the Group Key Management Cluster")
        WaitForAttributeReport(gcm_queue, group_table_attribute)
# Run the Matter test runner only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    default_matter_test_main()