blob: e713f4f0e71a6159c4e7ec6332625a9917e4cf82 [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
import typing
from datetime import datetime, timedelta, timezone
import chip.clusters as Clusters
from chip.clusters.Types import Nullable, NullValue
from chip.tlv import uint
from matter_testing_support import (MatterBaseTest, async_test_body, compare_time, default_matter_test_main,
get_wait_seconds_from_set_time, parse_pics, type_matches, utc_time_in_matter_epoch)
from mobly import asserts, signals
from taglist_and_topology_test_support import (TagProblem, create_device_type_list_for_root, create_device_type_lists,
find_tag_list_problems, find_tree_roots, get_all_children,
get_direct_children_of_root, parts_list_cycles, separate_endpoint_types)
def get_raw_type_list():
test = Clusters.UnitTesting
struct = test.Structs.SimpleStruct()
struct_type = test.Structs.SimpleStruct
null_opt_struct = test.Structs.NullablesAndOptionalsStruct()
null_opt_struct_type = test.Structs.NullablesAndOptionalsStruct
double_nested_struct_list = test.Structs.DoubleNestedStructList()
double_nested_struct_list_type = test.Structs.DoubleNestedStructList
list_of_uints = [0, 1]
list_of_uints_type = typing.List[uint]
list_of_structs = [struct, struct]
list_of_structs_type = typing.List[struct_type]
list_of_double_nested_struct_list = [double_nested_struct_list, double_nested_struct_list]
list_of_double_nested_struct_list_type = typing.List[double_nested_struct_list_type]
# Create a list with all the types and a list of the values that should match for that type
vals = {uint: [1],
str: ["str"],
struct_type: [struct],
null_opt_struct_type: [null_opt_struct],
double_nested_struct_list_type: [double_nested_struct_list],
list_of_uints_type: [list_of_uints],
list_of_structs_type: [list_of_structs],
list_of_double_nested_struct_list_type: [list_of_double_nested_struct_list]}
return vals
def test_type_matching_for_type(test_type, test_nullable: bool = False, test_optional: bool = False):
vals = get_raw_type_list()
if test_nullable and test_optional:
match_type = typing.Union[Nullable, None, test_type]
elif test_nullable:
match_type = typing.Union[Nullable, test_type]
elif test_optional:
match_type = typing.Optional[test_type]
else:
match_type = test_type
true_list = vals[test_type]
if test_nullable:
true_list.append(NullValue)
if test_optional:
true_list.append(None)
del vals[test_type]
# true_list is all the values that should match with the test type
for i in true_list:
asserts.assert_true(type_matches(i, match_type), "{} type checking failure".format(test_type))
# try every value in every type in the remaining dict - they should all fail
for v in vals.values():
for i in v:
asserts.assert_false(type_matches(i, match_type), "{} falsely matched to type {}".format(i, match_type))
# Test the nullables or optionals that aren't supposed to work
if not test_nullable:
asserts.assert_false(type_matches(NullValue, match_type), "NullValue falsely matched to {}".format(match_type))
if not test_optional:
asserts.assert_false(type_matches(None, match_type), "None falsely matched to {}".format(match_type))
def run_all_match_tests_for_type(test_type):
test_type_matching_for_type(test_type=test_type)
test_type_matching_for_type(test_type=test_type, test_nullable=True)
test_type_matching_for_type(test_type=test_type, test_optional=True)
test_type_matching_for_type(test_type=test_type, test_nullable=True, test_optional=True)
class TestMatterTestingSupport(MatterBaseTest):
@async_test_body
async def test_matter_epoch_time(self):
# Matter epoch should return zero
ret = utc_time_in_matter_epoch(datetime(2000, 1, 1, 0, 0, 0, 0, timezone.utc))
asserts.assert_equal(ret, 0, "UTC epoch returned non-zero value")
# Jan 2 is exactly 1 day after Jan 1
ret = utc_time_in_matter_epoch(datetime(2000, 1, 2, 0, 0, 0, 0, timezone.utc))
expected_delay = timedelta(days=1)
actual_delay = timedelta(microseconds=ret)
asserts.assert_equal(expected_delay, actual_delay, "Calculation for Jan 2 date is incorrect")
# There's a catch 22 for knowing the current time, but we can check that it's
# going up, and that it's larger than when I wrote the test
# Check that the returned value is larger than the test writing date
writing_date = utc_time_in_matter_epoch(datetime(2023, 5, 5, 0, 0, 0, 0, timezone.utc))
current_date = utc_time_in_matter_epoch()
asserts.assert_greater(current_date, writing_date, "Calculation for current date is smaller than writing date")
# Check that the time is going up
last_date = current_date
current_date = utc_time_in_matter_epoch()
asserts.assert_greater(current_date, last_date, "Time does not appear to be incrementing")
@async_test_body
async def test_type_checking(self):
vals = get_raw_type_list()
for k in vals.keys():
run_all_match_tests_for_type(k)
@async_test_body
async def test_pics_support(self):
pics_list = ['TEST.S.A0000=1',
'TEST.S.A0001=0',
'lower.s.a0000=1',
'',
' ',
'# comment',
' # comment',
' SPACE.S.A0000 = 1']
pics = parse_pics(pics_list)
# force the parsed pics here to be in the config so we can check the check_pics function
self.matter_test_config.pics = pics
asserts.assert_true(self.check_pics("TEST.S.A0000"), "PICS parsed incorrectly for TEST.S.A0000")
asserts.assert_false(self.check_pics("TEST.S.A0001"), "PICS parsed incorrectly for TEST.S.A0001")
asserts.assert_true(self.check_pics("LOWER.S.A0000"), "PICS pased incorrectly for LOWER.S.A0000")
asserts.assert_true(self.check_pics("SPACE.S.A0000"), "PICS parsed incorrectly for SPACE.S.A0000")
asserts.assert_false(self.check_pics("NOT.S.A0000"), "PICS parsed incorrectly for NOT.S.A0000")
asserts.assert_true(self.check_pics(" test.s.a0000"), "PICS checker lowercase handled incorrectly")
# invalid pics file should throw a value error
pics_list.append("BAD.S.A000=5")
try:
pics = parse_pics(pics_list)
asserts.assert_false(True, "PICS parser did not throw an error as expected")
except ValueError:
pass
def test_time_compare_function(self):
# only offset, exact match
compare_time(received=1000, offset=timedelta(microseconds=1000), utc=0, tolerance=timedelta())
# only utc, exact match
compare_time(received=1000, offset=timedelta(), utc=1000, tolerance=timedelta())
# both, exact match
compare_time(received=2000, offset=timedelta(microseconds=1000), utc=1000, tolerance=timedelta())
# both, negative offset
compare_time(received=0, offset=timedelta(microseconds=-1000), utc=1000, tolerance=timedelta())
# Exact match, within delta, both
compare_time(received=2000, offset=timedelta(microseconds=1000), utc=1000, tolerance=timedelta(seconds=5))
# Just inside tolerance
compare_time(received=1001, offset=timedelta(), utc=2000, tolerance=timedelta(microseconds=1000))
# Just outside tolerance
try:
compare_time(received=999, offset=timedelta(), utc=2000, tolerance=timedelta(microseconds=1000))
asserts.fail("Expected failure case for time just outside of the tolerance failed")
except signals.TestFailure:
pass
# everything in the seconds range
compare_time(received=timedelta(seconds=3600).total_seconds() * 1000000,
offset=timedelta(seconds=3605), utc=0, tolerance=timedelta(seconds=5))
def test_get_wait_time_function(self):
th_utc = utc_time_in_matter_epoch()
secs = get_wait_seconds_from_set_time(th_utc, 5)
asserts.assert_equal(secs, 5)
# If we've pass less than a second, we still want to wait 5
time.sleep(0.5)
secs = get_wait_seconds_from_set_time(th_utc, 5)
asserts.assert_equal(secs, 5)
time.sleep(0.5)
secs = get_wait_seconds_from_set_time(th_utc, 5)
asserts.assert_equal(secs, 4)
secs = get_wait_seconds_from_set_time(th_utc, 15)
asserts.assert_equal(secs, 14)
def create_example_topology(self):
"""Creates a limited example of a wildcard read that contains only the descriptor cluster parts list and device types"""
def create_endpoint(parts_list: list[uint], device_types: list[uint]):
endpoint = {}
device_types_structs = []
for device_type in device_types:
device_types_structs.append(Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=device_type, revision=1))
endpoint[Clusters.Descriptor] = {Clusters.Descriptor.Attributes.PartsList: parts_list,
Clusters.Descriptor.Attributes.DeviceTypeList: device_types_structs,
Clusters.Descriptor.Attributes.FeatureMap: 0}
return endpoint
endpoints = {}
# Root node is 0
# We have two trees in the root node and two trees in the aggregator
# 2 - 1
# - 3 - 4
# - 5 - 9
# 6 - 7
# - 8
# 10
# 11 (aggregator - all remaining are under it)
# 13 - 12
# - 14 - 15
# - 16
# 17 - 18
# - 19
# 20
# 21
endpoints[0] = create_endpoint([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21], [22])
endpoints[1] = create_endpoint([], [1]) # Just using a random device id, as long as it's not the aggregator it's fine
endpoints[2] = create_endpoint([1, 3], [1])
endpoints[3] = create_endpoint([4, 5], [1])
endpoints[4] = create_endpoint([], [1])
endpoints[5] = create_endpoint([9], [1])
endpoints[6] = create_endpoint([7, 8], [1])
endpoints[7] = create_endpoint([], [1])
endpoints[8] = create_endpoint([], [1])
endpoints[9] = create_endpoint([], [1])
endpoints[10] = create_endpoint([], [1])
endpoints[11] = create_endpoint([12, 13, 14, 15, 16, 17, 18, 19, 20, 21], [0xe]) # aggregator device type
endpoints[12] = create_endpoint([], [1])
endpoints[13] = create_endpoint([12, 14], [1])
endpoints[14] = create_endpoint([15, 16], [1])
endpoints[15] = create_endpoint([], [1])
endpoints[16] = create_endpoint([], [1])
endpoints[17] = create_endpoint([18, 19], [1])
endpoints[18] = create_endpoint([], [1])
endpoints[19] = create_endpoint([], [1])
endpoints[20] = create_endpoint([], [1])
endpoints[21] = create_endpoint([], [1])
return endpoints
def test_cycle_detection_and_splitting(self):
# Example topology has no cycles
endpoints = self.create_example_topology()
flat, tree = separate_endpoint_types(endpoints)
asserts.assert_equal(len(flat), len(set(flat)), "Duplicate endpoints found in flat list")
asserts.assert_equal(len(tree), len(set(tree)), "Duplicate endpoints found in tree list")
asserts.assert_equal(set(flat), {11}, "Aggregator node not found in list")
asserts.assert_equal(set(tree), {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21})
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(len(cycles), 0, "Found cycles in the example tree")
# Add in several cycles and make sure we detect them all
# ep 10 refers back to itself (0 level cycle) on 10
endpoints[10][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(10)
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(cycles, [10])
endpoints[10][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(10)
print(endpoints[10])
# ep 4 refers back to 3 (1 level cycle) on 3 (will include 2, 3 and 4 in the cycles list)
endpoints[4][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(3)
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(cycles, [2, 3, 4])
endpoints[4][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(3)
# ep 16 refers back to 13 (2 level cycle) on 13 (will include 13, 14 and 16 in cycles)
endpoints[16][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(13)
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(cycles, [13, 14, 16])
endpoints[16][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(13)
# ep 9 refers back to 2 (3 level cycle) on 2 (includes 2, 3, 5, and 9)
endpoints[9][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(2)
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(cycles, [2, 3, 5, 9])
endpoints[9][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(2)
# make sure we get them all
endpoints[10][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(10)
endpoints[4][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(3)
endpoints[16][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(13)
endpoints[9][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(2)
cycles = parts_list_cycles(tree, endpoints)
asserts.assert_equal(cycles, [2, 3, 4, 5, 9, 10, 13, 14, 16])
def test_get_all_children(self):
endpoints = self.create_example_topology()
asserts.assert_equal(get_all_children(2, endpoints), {1, 3, 4, 5, 9}, "Child list for ep2 is incorrect")
asserts.assert_equal(get_all_children(6, endpoints), {7, 8}, "Child list for ep6 is incorrect")
asserts.assert_equal(get_all_children(13, endpoints), {12, 14, 15, 16}, "Child list for ep13 is incorrect")
asserts.assert_equal(get_all_children(17, endpoints), {18, 19}, "Child list for ep17 is incorrect")
def test_get_tree_roots(self):
endpoints = self.create_example_topology()
_, tree = separate_endpoint_types(endpoints)
asserts.assert_equal(find_tree_roots(tree, endpoints), {2, 6, 13, 17}, "Incorrect tree root list")
def test_tag_list_problems(self):
# Right now, the whole endpoint list uses the same device id except for ep11, which is an aggregator
# The relevant trees are
# 2 - 1
# - 3 - 4
# - 5 - 9
#
# 6 - 7
# - 8
#
# 13 - 12
# - 14 - 15
# - 16
#
# 17 - 18
# - 19
endpoints = self.create_example_topology()
# First test, everything in every tree has the same device type, so the device lists
# should contain all the device endpoints
_, tree = separate_endpoint_types(endpoints)
roots = find_tree_roots(tree, endpoints)
device_types = create_device_type_lists(roots, endpoints)
asserts.assert_equal(set(roots), set(device_types.keys()), "Device types list does not match roots list")
for root in roots:
asserts.assert_equal({1}, set(device_types[root].keys()), "Unexpected device type found in device type list")
asserts.assert_equal(device_types[2][1], {2, 1, 3, 4, 5, 9}, "device type list for ep 2 is incorrect")
asserts.assert_equal(device_types[6][1], {6, 7, 8}, "device type list for ep 6 is incorrect")
asserts.assert_equal(device_types[13][1], {13, 12, 14, 15, 16}, "device type list for ep 13 is incorrect")
asserts.assert_equal(device_types[17][1], {17, 18, 19}, "device type list for ep 17 is incorrect")
# every single one of these should have the same problem - they have no tags
problems = find_tag_list_problems(roots, device_types, endpoints)
expected_problems = {2, 1, 3, 4, 5, 9, 6, 7, 8, 13, 12, 14, 15, 16, 17, 18, 19}
asserts.assert_equal(set(problems.keys()), expected_problems, "Incorrect set of tag problems")
for root in roots:
eps = get_all_children(root, endpoints)
eps.add(root)
for ep in eps:
expected_problem = TagProblem(root=root, missing_attribute=True,
missing_feature=True, duplicates=set(eps), same_tag=set())
asserts.assert_equal(problems[ep], expected_problem, f"Incorrect problem for ep {ep}")
# Add the feature for every endpoint, but not the attribute
for ep in expected_problems:
endpoints[ep][Clusters.Descriptor][Clusters.Descriptor.Attributes.FeatureMap] = 1
problems = find_tag_list_problems(roots, device_types, endpoints)
for root in roots:
eps = get_all_children(root, endpoints)
eps.add(root)
for ep in eps:
expected_problem = TagProblem(root=root, missing_attribute=True,
missing_feature=False, duplicates=set(eps), same_tag=set())
asserts.assert_equal(problems[ep], expected_problem, f"Incorrect problem for ep {ep}")
# Add empty tag lists
for ep in expected_problems:
endpoints[ep][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = []
problems = find_tag_list_problems(roots, device_types, endpoints)
for root in roots:
eps = get_all_children(root, endpoints)
eps.add(root)
for ep in eps:
expected_problem = TagProblem(root=root, missing_attribute=True,
missing_feature=False, duplicates=set(eps), same_tag=set())
asserts.assert_equal(problems[ep], expected_problem, f"Incorrect problem for ep {ep}")
# Add a tag list on every one of these, but make it the same tag
tag = Clusters.Descriptor.Structs.SemanticTagStruct()
for ep in expected_problems:
endpoints[ep][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [tag]
problems = find_tag_list_problems(roots, device_types, endpoints)
for root in roots:
eps = get_all_children(root, endpoints)
eps.add(root)
for ep in eps:
expected_problem = TagProblem(root=root, missing_attribute=False,
missing_feature=False, duplicates=set(eps), same_tag=set(eps))
asserts.assert_equal(problems[ep], expected_problem, f"Incorrect problem for ep {ep}")
# swap out all the tags lists so they're all different - we should get no problems
for ep in expected_problems:
tag = Clusters.Descriptor.Structs.SemanticTagStruct(tag=ep)
endpoints[ep][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [tag]
problems = find_tag_list_problems(roots, device_types, endpoints)
asserts.assert_equal(len(problems), 0, "Unexpected problems found in list")
# Remove all the feature maps, we should get all errors again
for ep in expected_problems:
endpoints[ep][Clusters.Descriptor][Clusters.Descriptor.Attributes.FeatureMap] = 0
problems = find_tag_list_problems(roots, device_types, endpoints)
for root in roots:
eps = get_all_children(root, endpoints)
eps.add(root)
for ep in eps:
expected_problem = TagProblem(root=root, missing_attribute=False,
missing_feature=True, duplicates=set(eps))
asserts.assert_equal(problems[ep], expected_problem, f"Incorrect problem for ep {ep}")
# Create a simple two-tree system where everything is OK, but the tags are the same between the trees (should be ok)
# 1 (dt 1) - 2 (dt 2) - tag 2
# - 3 (dt 2) - tag 3
# 4 (dt 1) - 5 (dt 2) - tag 2
# - 6 (dt 2) - tag 3
desc_dt2_tag2 = {Clusters.Descriptor.Attributes.FeatureMap: 1,
Clusters.Descriptor.Attributes.PartsList: [],
Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(2, 1)],
Clusters.Descriptor.Attributes.TagList: [Clusters.Descriptor.Structs.SemanticTagStruct(tag=2)]
}
desc_dt2_tag3 = {Clusters.Descriptor.Attributes.FeatureMap: 1,
Clusters.Descriptor.Attributes.PartsList: [],
Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(2, 1)],
Clusters.Descriptor.Attributes.TagList: [Clusters.Descriptor.Structs.SemanticTagStruct(tag=3)]
}
desc_ep1 = {Clusters.Descriptor.Attributes.FeatureMap: 0,
Clusters.Descriptor.Attributes.PartsList: [2, 3],
Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(1, 1)],
}
desc_ep4 = {Clusters.Descriptor.Attributes.FeatureMap: 0,
Clusters.Descriptor.Attributes.PartsList: [5, 6],
Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(1, 1)],
}
new_endpoints = {}
new_endpoints[1] = {Clusters.Descriptor: desc_ep1}
new_endpoints[2] = {Clusters.Descriptor: desc_dt2_tag2}
new_endpoints[3] = {Clusters.Descriptor: desc_dt2_tag3}
new_endpoints[4] = {Clusters.Descriptor: desc_ep4}
new_endpoints[5] = {Clusters.Descriptor: desc_dt2_tag2}
new_endpoints[6] = {Clusters.Descriptor: desc_dt2_tag3}
_, tree = separate_endpoint_types(new_endpoints)
roots = find_tree_roots(tree, new_endpoints)
device_types = create_device_type_lists(roots, new_endpoints)
problems = find_tag_list_problems(roots, device_types, new_endpoints)
asserts.assert_equal(len(problems), 0, "Unexpected problems found in list")
# Create a simple tree where ONE of the tags in the set matches, but not the other - should be no problems
# 1 (dt 1) - 2 (dt 2) - tag 2,3
# - 3 (dt 2) - tag 2,4
desc_dt2_tag23 = {Clusters.Descriptor.Attributes.FeatureMap: 1,
Clusters.Descriptor.Attributes.PartsList: [],
Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(2, 1)],
Clusters.Descriptor.Attributes.TagList: [Clusters.Descriptor.Structs.SemanticTagStruct(
tag=2), Clusters.Descriptor.Structs.SemanticTagStruct(tag=3)]
}
desc_dt2_tag24 = {Clusters.Descriptor.Attributes.FeatureMap: 1,
Clusters.Descriptor.Attributes.PartsList: [],
Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(2, 1)],
Clusters.Descriptor.Attributes.TagList: [Clusters.Descriptor.Structs.SemanticTagStruct(
tag=2), Clusters.Descriptor.Structs.SemanticTagStruct(tag=4)]
}
simple = {}
simple[1] = {Clusters.Descriptor: desc_ep1}
simple[2] = {Clusters.Descriptor: desc_dt2_tag23}
simple[3] = {Clusters.Descriptor: desc_dt2_tag24}
_, tree = separate_endpoint_types(simple)
roots = find_tree_roots(tree, simple)
device_types = create_device_type_lists(roots, simple)
problems = find_tag_list_problems(roots, device_types, simple)
asserts.assert_equal(len(problems), 0, "Unexpected problems found in list")
# now both match, but the ordering is different - this SHOULD be a problem
desc_dt2_tag32 = {Clusters.Descriptor.Attributes.FeatureMap: 1,
Clusters.Descriptor.Attributes.PartsList: [],
Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(2, 1)],
Clusters.Descriptor.Attributes.TagList: [Clusters.Descriptor.Structs.SemanticTagStruct(
tag=3), Clusters.Descriptor.Structs.SemanticTagStruct(tag=2)]
}
simple[3] = {Clusters.Descriptor: desc_dt2_tag32}
problems = find_tag_list_problems(roots, device_types, simple)
# expect this problem reported on both 2 and 3 endpoints
expected_problem = TagProblem(root=1, missing_attribute=False, missing_feature=False, duplicates={2, 3}, same_tag={2, 3})
asserts.assert_true(2 in problems.keys(), "Missing problem report for ep2")
asserts.assert_true(3 in problems.keys(), "Missing problem report for ep3")
asserts.assert_equal(problems[2], expected_problem, "Problem report for simple EP2 is not as expected")
asserts.assert_equal(problems[3], expected_problem, "Problem report for simple EP3 is not as expected")
# Let's check that we're correctly checking all the pieces of the tag
# Different mfgcode
simple[2][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [
Clusters.Descriptor.Structs.SemanticTagStruct(mfgCode=1)]
simple[3][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [Clusters.Descriptor.Structs.SemanticTagStruct()]
problems = find_tag_list_problems(roots, device_types, simple)
asserts.assert_equal(len(problems), 0, "Unexpected problems found in list")
simple[3][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [
Clusters.Descriptor.Structs.SemanticTagStruct(mfgCode=2)]
problems = find_tag_list_problems(roots, device_types, simple)
asserts.assert_equal(len(problems), 0, "Unexpected problems found in list")
# Different namespace ids
simple[2][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [
Clusters.Descriptor.Structs.SemanticTagStruct(namespaceID=1)]
simple[3][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [Clusters.Descriptor.Structs.SemanticTagStruct()]
problems = find_tag_list_problems(roots, device_types, simple)
asserts.assert_equal(len(problems), 0, "Unexpected problems found in list")
# Different labels
simple[2][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [
Clusters.Descriptor.Structs.SemanticTagStruct(label="test")]
simple[3][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [Clusters.Descriptor.Structs.SemanticTagStruct()]
problems = find_tag_list_problems(roots, device_types, simple)
asserts.assert_equal(len(problems), 0, "Unexpected problems found in list")
simple[3][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [
Clusters.Descriptor.Structs.SemanticTagStruct(label="test1")]
problems = find_tag_list_problems(roots, device_types, simple)
asserts.assert_equal(len(problems), 0, "Unexpected problems found in list")
# One tag list is a subset of the other - this should pass
tag1 = Clusters.Descriptor.Structs.SemanticTagStruct(tag=1)
tag2 = Clusters.Descriptor.Structs.SemanticTagStruct(tag=2)
tag3 = Clusters.Descriptor.Structs.SemanticTagStruct(tag=3)
simple[2][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [tag1, tag2]
simple[3][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [tag1, tag2, tag3]
problems = find_tag_list_problems(roots, device_types, simple)
asserts.assert_equal(len(problems), 0, "Unexpected problems found in list")
def test_root_node_tag_list_functions(self):
# Example topology - see comment above for the layout.
# There are 4 direct children of root 0
# node 2, node 6 and node 10 all have device ID 1
# node 11 is an aggregator
endpoints = self.create_example_topology()
expected = {2, 6, 10, 11}
direct = get_direct_children_of_root(endpoints)
asserts.assert_equal(expected, direct, 'Incorrect list of direct children returned from root')
# add a new child endpoint that's an aggregator on EP 20
aggregator_desc = {Clusters.Descriptor.Attributes.FeatureMap: 1,
Clusters.Descriptor.Attributes.PartsList: [],
Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(0xe)],
}
endpoints[22] = {Clusters.Descriptor: aggregator_desc}
endpoints[0][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(22)
expected.add(22)
direct = get_direct_children_of_root(endpoints)
asserts.assert_equal(expected, direct, 'Incorrect list of direct children returned from root')
device_type_list = create_device_type_list_for_root(direct, endpoints)
asserts.assert_equal(len(device_type_list), 2, 'Incorrect number of device types returned in root device type list')
expected_device_types = {1, 0xe}
asserts.assert_equal(set(device_type_list.keys()), expected_device_types, 'Unexpected device type list returned')
expected_eps_dt1 = {2, 6, 10}
asserts.assert_equal(set(device_type_list[1]), expected_eps_dt1, 'Unexpected endpoint list for DT1')
expected_eps_dte = {11, 22}
asserts.assert_equal(set(device_type_list[0xe]), expected_eps_dte, 'Unexpected endpoint list for DT 0xe')
problems = find_tag_list_problems(roots=[0], device_types={0: device_type_list}, endpoint_dict=endpoints)
# NONE of the endpoints currently have tags, so they should ALL be reported as having problems
expected_problems = {2, 6, 10, 11, 22}
asserts.assert_equal(set(problems.keys()), expected_problems, "Unexpected problem list returned for root node")
# Let's add correct tags to everything and make sure we get no problems reported.
# the various problems are tested individually in the above test case, so the intent is to ensure this also
# works for the root
for ep in expected_problems:
endpoints[ep][Clusters.Descriptor][Clusters.Descriptor.Attributes.TagList] = [
Clusters.Descriptor.Structs.SemanticTagStruct(namespaceID=ep)]
endpoints[ep][Clusters.Descriptor][Clusters.Descriptor.Attributes.FeatureMap] = 1
problems = find_tag_list_problems(roots=[0], device_types={0: device_type_list}, endpoint_dict=endpoints)
asserts.assert_equal(len(problems.keys()), 0, 'Unexpected problems found in root endpoint')
if __name__ == "__main__":
default_matter_test_main()