# blob: 16a2e6dbb0cd5bcbb6d3da0e5d9b9431cb45423d [file] [log] [blame]
#
# Copyright (c) 2023 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import pprint
import sys
import chip.clusters as Clusters
import graphviz
from rich.console import Console
# Add the path to python_testing folder, in order to be able to import from matter_testing_support
sys.path.append(os.path.abspath(sys.path[0] + "/../../python_testing"))
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main # noqa: E402
# Module-wide rich Console handle; stays None until test_matter_device_graph
# creates the Console and rebinds this global.
console = None
# Cluster names longer than this many characters are cut off and suffixed
# with "..." when used as node labels in AddServerOrClientNode.
maxClusterNameLength = 30
# Given there is currently no translation from DeviceTypeID to the device type name,
# this dict is created for now. When something more general is available, it should be updated to use that.
deviceTypeDict = {
    # Keys are device type IDs written in hexadecimal; values are display names.
    # Utility device types
    0x0016: "Root Node",
    0x0011: "Power Source",
    0x0012: "OTA Requestor",
    0x0014: "OTA Provider",
    0x000E: "Aggregator",
    0x0013: "Bridged Node",
    # Lighting, plug-in units and pump
    0x0100: "On/Off Light",
    0x0101: "Dimmable Light",
    0x010C: "Color Temperature Light",
    0x010D: "Extended Color Light",
    0x010A: "On/Off Plug-in Unit",
    0x010B: "Dimmable Plug-In Unit",
    0x0303: "Pump",
    # Switches and controllers
    0x0103: "On/Off Light Switch",
    0x0104: "Dimmer Switch",
    0x0105: "Color Dimmer Switch",
    0x0840: "Control Bridge",
    0x0304: "Pump Controller",
    0x000F: "Generic Switch",
    # Sensors
    0x0015: "Contact Sensor",
    0x0106: "Light Sensor",
    0x0107: "Occupancy Sensor",
    0x0302: "Temperature Sensor",
    0x0305: "Pressure Sensor",
    0x0306: "Flow Sensor",
    0x0307: "Humidity Sensor",
    0x0850: "On/Off Sensor",
    # Closures
    0x000A: "Door Lock",
    0x000B: "Door Lock Controller",
    0x0202: "Window Covering",
    0x0203: "Window Covering Controller",
    # HVAC
    0x0300: "Heating/Cooling Unit",
    0x0301: "Thermostat",
    0x002B: "Fan",
    # Media
    0x0023: "Casting Video Player",
    0x0022: "Speaker",
    0x0024: "Content App",
    0x0028: "Basic Video Player",
    0x0029: "Casting Video Client",
    0x002A: "Video Remote Control",
    # Generic and appliances
    0x0027: "Mode Select",
    0x002D: "Air Purifier",
    0x002C: "Air Quality Sensor",
    0x0070: "Refrigerator",
    0x0071: "Temperature Controlled Cabinet",
    0x0072: "Room Air Conditioner",
    0x0073: "Laundry Washer",
    0x0074: "Robotic Vacuum Cleaner",
    0x0075: "Dishwasher",
    0x0076: "Smoke CO Alarm",
}
def AddServerOrClientNode(graphSection, endpoint, clusterName, color, nodeRef):
    """Add one cluster box to ``graphSection`` and tie it to ``nodeRef``.

    The node id is ``ep<endpoint>_<clusterName>`` and always uses the full
    cluster name. Only the visible label is shortened: names longer than
    ``maxClusterNameLength`` are cut to that length and suffixed with "...".
    An invisible edge from ``nodeRef`` to the new node is added as well.

    Args:
        graphSection: Graph object exposing ``node()`` and ``edge()``.
        endpoint: Endpoint identifier used as the node id prefix.
        clusterName: Cluster name used for the node id and the label.
        color: Fill color passed through to the node.
        nodeRef: Id of the node the invisible edge starts from.
    """
    isTooLong = len(clusterName) > maxClusterNameLength
    displayName = clusterName[:maxClusterNameLength] + '...' if isTooLong else clusterName

    nodeId = f"ep{endpoint}_{clusterName}"
    graphSection.node(
        nodeId,
        label=f"{displayName}",
        style="filled,rounded",
        color=color,
        shape="box",
        fixedsize="true",
        width="3",
        height="0.5",
    )
    # Invisible edge: affects placement only, nothing is drawn for it.
    graphSection.edge(nodeRef, nodeId, style="invis")
def CreateEndpointGraph(graph, graphSection, endpoint, wildcardResponse):
    """Draw one endpoint, its server/client clusters and its parts-list links.

    An endpoint node labelled with the endpoint id, device type list and parts
    list is added to ``graphSection``, followed by one node per entry in the
    Descriptor cluster's ServerList and then ClientList. For every endpoint
    other than 0, an edge to each endpoint in its PartsList is added to
    ``graph``.

    Args:
        graph: Top-level graph; only used for the edges between endpoints.
        graphSection: Subgraph that receives the endpoint and cluster nodes.
        endpoint: Endpoint id; used as key into ``wildcardResponse`` and as
            part of the node ids.
        wildcardResponse: Read result indexed as
            ``wildcardResponse[endpoint][cluster][attribute]``.

    Raises:
        KeyError: Propagated from the lookups above when the endpoint or any
            of the Descriptor attributes is missing (assuming dict-like
            containers).
    """
    descriptor = Clusters.Objects.Descriptor
    descriptorData = wildcardResponse[endpoint][descriptor]

    # Starts at 2 and grows by one for every completed row of three cluster
    # nodes; later used as ``minlen`` for the edges to child endpoints.
    numberOfRowsInEndpoint = 2

    partsListFromWildcardRead = descriptorData[descriptor.Attributes.PartsList]

    # Show the device type name when known, otherwise fall back to the raw id.
    listOfDeviceTypes = [
        deviceTypeDict.get(deviceTypeStruct.deviceType, deviceTypeStruct.deviceType)
        for deviceTypeStruct in descriptorData[descriptor.Attributes.DeviceTypeList]
    ]

    # console.print(f"Endpoint: {endpoint}")
    # console.print(f"DeviceTypeList: {listOfDeviceTypes}")
    # console.print(f"PartsList: {partsListFromWildcardRead}")

    # "\l" must reach Graphviz literally (left-justified line break), so the
    # backslash is escaped instead of relying on an invalid Python escape.
    endpointLabel = (
        f"Endpoint: {endpoint}\\l"
        f"DeviceTypeList: {listOfDeviceTypes}\\l"
        f"PartsList: {partsListFromWildcardRead}\\l"
    )

    nextNodeRef = ""
    nodeRef = f"ep{endpoint}"
    clusterColumnCount = 0

    graphSection.node(f"ep{endpoint}", label=endpointLabel, style="filled,rounded",
                      color="dodgerblue", shape="box", fixedsize="true", width="4", height="1")

    # Server clusters first, then client clusters. The layout state (nodeRef,
    # nextNodeRef, clusterColumnCount) deliberately carries over from the
    # server list into the client list.
    # NOTE(review): a cluster id present in both lists yields the same node id
    # twice (ep<endpoint>_<name>) - confirm whether that can happen in practice.
    clusterGroups = (
        (descriptor.Attributes.ServerList, "server", "olivedrab"),
        (descriptor.Attributes.ClientList, "client", "orange"),
    )
    for clusterListAttribute, role, color in clusterGroups:
        for clusterId in descriptorData[clusterListAttribute]:
            clusterColumnCount += 1

            try:
                clusterName = Clusters.ClusterObjects.ALL_CLUSTERS[clusterId].__name__
            except KeyError:
                # Cluster id unknown to ALL_CLUSTERS: label it with its hex id.
                clusterName = f"Custom {role}\\l0x{clusterId:08X}"

            AddServerOrClientNode(graphSection, endpoint, clusterName, color, nodeRef)

            # All nodes of a row hang off the same anchor (nodeRef). After
            # three nodes, the second node of that row becomes the anchor for
            # the next row - apparently to lay clusters out three per row.
            if clusterColumnCount == 2:
                nextNodeRef = f"ep{endpoint}_{clusterName}"
            elif clusterColumnCount == 3:
                nodeRef = nextNodeRef
                clusterColumnCount = 0
                numberOfRowsInEndpoint += 1

    if endpoint != 0:
        # Create link to endpoints in the parts list
        for part in partsListFromWildcardRead:
            graph.edge(f"ep{endpoint}", f"ep{part}", ltail=f"cluster_{endpoint}", minlen=f"{numberOfRowsInEndpoint}")
class TC_MatterDeviceGraph(MatterBaseTest):
    """Test case that reads a device's attributes and renders them as a graph."""

    @async_test_body
    async def test_matter_device_graph(self):
        """Read all attributes from the DUT and write a graph plus a data dump.

        Two files are written next to the script (``sys.path[0]``):
        ``matter-device-graph.dot`` with the Graphviz source and
        ``matter-device-data.txt`` with the pretty-printed read response.
        """
        # Create console to print
        global console
        console = Console()

        # Controller used to talk to the device under test
        dev_ctrl = self.default_controller

        # Perform wildcard read to get all attributes from device
        console.print("[blue]Capturing data from device")
        wildcardResponse = await dev_ctrl.ReadAttribute(self.dut_node_id, [('*')])
        # console.print(wildcardResponse)

        # Creating graph object
        deviceGraph = graphviz.Digraph()
        deviceGraph.attr(style="rounded", splines="line", compound="true")

        console.print("[blue]Generating graph")
        # Loop through each endpoint in the response from the wildcard read
        for endpoint in wildcardResponse:
            if endpoint == 0:
                # Endpoint 0 gets its own top-level cluster.
                with deviceGraph.subgraph(name='cluster_rootnode') as rootNodeSection:
                    CreateEndpointGraph(deviceGraph, rootNodeSection, endpoint, wildcardResponse)
            else:
                # Every other endpoint gets a cluster nested in 'cluster_endpoints'.
                with deviceGraph.subgraph(name='cluster_endpoints') as endpointsSection:
                    with endpointsSection.subgraph(name=f'cluster_{endpoint}') as endpointSection:
                        CreateEndpointGraph(deviceGraph, endpointSection, endpoint, wildcardResponse)

        deviceGraph.save(f'{sys.path[0]}/matter-device-graph.dot')

        # Context manager closes the file even if formatting or writing fails;
        # explicit UTF-8 avoids encode errors under a narrower locale default.
        with open(f'{sys.path[0]}/matter-device-data.txt', 'w', encoding='utf-8') as deviceDataFile:
            deviceDataFile.write(pprint.pformat(wildcardResponse))
# Script entry point: when executed directly, hand control to the
# default_matter_test_main runner imported from matter_testing_support.
if __name__ == "__main__":
    default_matter_test_main()