blob: 1865cf6274f6ed85c05bb7e55d4b0b113bf15e4f [file] [log] [blame]
#
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import logging
import random
import re
from chip import discovery, exceptions
from chip.clusters import Attribute as ClusterAttribute
from chip.clusters import Objects as GeneratedObjects
from chip.interaction_model import delegate as IM
from chip.setup_payload import SetupPayload
log = logging.getLogger(__name__)
IP_ADDRESS_BUFFER_LEN = 100
def get_setup_payload(device):
"""
Get device setup payload from logs
:param device: serial device instance
:return: setup payload or None
"""
ret = device.wait_for_output("SetupQRCode")
if ret is None or len(ret) < 2:
return None
qr_code = re.sub(
r"[\[\]]", "", ret[-1].partition("SetupQRCode:")[2]).strip()
try:
setup_payload = SetupPayload().ParseQrCode(
"VP:vendorpayload%{}".format(qr_code))
except exceptions.ChipStackError as ex:
log.error("SetupPayload failed {}".format(str(ex)))
return None
return setup_payload
def discover_device(devCtrl, setupPayload):
"""
Discover specific device in network.
Search by device discriminator from setup payload
:param devCtrl: device controller instance
:param setupPayload: device setup payload
:return: CommissionableNode object if node device discovered or None if failed
"""
log.info("Attempting to find device on network")
longDiscriminator = int(setupPayload.attributes['Long discriminator'])
try:
res = devCtrl.DiscoverCommissionableNodes(
discovery.FilterType.LONG_DISCRIMINATOR, longDiscriminator, stopOnFirst=True, timeoutSecond=5)
except exceptions.ChipStackError as ex:
log.error("DiscoverCommissionableNodes failed {}".format(str(ex)))
return None
if not res:
log.info("Device not found")
return None
log.info("Device found at %r" % res[0])
return res[0]
def connect_device(devCtrl, setupPayload, commissionableDevice, nodeId=None):
"""
Connect to Matter discovered device on network
:param devCtrl: device controller instance
:param setupPayload: device setup payload
:param commissionableDevice: CommissionableNode object with discovered device
:param nodeId: device node ID
:return: node ID if connection successful or None if failed
"""
if nodeId is None:
nodeId = random.randint(1, 1000000)
log.info("Connecting to device %d" % nodeId)
pincode = int(setupPayload.attributes['SetUpPINCode'])
try:
res = devCtrl.CommissionOnNetwork(
nodeId, pincode, filterType=discovery.FilterType.INSTANCE_NAME, filter=commissionableDevice.instanceName)
except exceptions.ChipStackError as ex:
log.error("Commission discovered device failed {}".format(str(ex)))
return None
if not res:
log.info("Commission discovered device failed: %r" % res)
return None
return nodeId
def disconnect_device(devCtrl, nodeId):
"""
Disconnect Matter device
:param devCtrl: device controller instance
:param nodeId: device node ID
:return: node ID if connection successful or None if failed
"""
try:
devCtrl.CloseSession(nodeId)
except exceptions.ChipStackException as ex:
log.error("CloseSession failed {}".format(str(ex)))
return False
return True
def send_zcl_command(devCtrl, cluster: str, command: str, nodeId: int, endpoint: int, args, requestTimeoutMs: int = None):
"""
Send ZCL command to device.
:param devCtrl: device controller instance
:param cluster: cluster name
:param command: command name
:param nodeId: device node ID
:param endpoint: device endpoint
:parma args: command argument in dictionary format
:param requestTimeoutMs: command request timeout in ms
:return: error code and command response
"""
res = None
err = 0
try:
allCommands = devCtrl.ZCLCommandList()
if cluster not in allCommands:
raise exceptions.UnknownCluster(cluster)
cmd = allCommands.get(cluster).get(command)
if cmd is None:
raise exceptions.UnknownCommand(cluster, command)
clusterObj = getattr(GeneratedObjects, cluster)
commandObj = getattr(clusterObj.Commands, command)
if args is not None:
req = commandObj(**args)
else:
req = commandObj()
res = asyncio.run(devCtrl.SendCommand(int(nodeId), int(endpoint), req, timedRequestTimeoutMs=requestTimeoutMs))
except exceptions.ChipStackException as ex:
log.error("An exception occurred during processing ZCL command: {}".format(str(ex)))
err = -1
except Exception as ex:
log.error("An exception occurred during processing input: {}".format(str(ex)))
err = -1
return (err, res)
def write_zcl_attribute(devCtrl, cluster: str, attribute: str, nodeId: int, endpoint: int, value):
"""
Write ZCL attribute to device.
:param devCtrl: device controller instance
:param cluster: cluster name
:param attribute: attribute name
:param nodeId: device node ID
:param endpoint: device endpoint
:parma value: attribute value to write
:return: error code and attribute response
"""
res = None
err = 0
try:
allAttrs = devCtrl.ZCLAttributeList()
if cluster not in allAttrs:
raise exceptions.UnknownCluster(cluster)
attrDetails = allAttrs.get(cluster).get(attribute)
if attrDetails is None:
raise exceptions.UnknownAttribute(cluster, attribute)
clusterObj = getattr(GeneratedObjects, cluster)
attributeObj = getattr(clusterObj.Attributes, attribute)
req = attributeObj(value)
res = asyncio.run(devCtrl.WriteAttribute(nodeId, [(endpoint, req)]))
except exceptions.ChipStackException as ex:
log.error("An exception occurred during processing ZCL attribute: {}".format(str(ex)))
err = -1
except Exception as ex:
log.error("An exception occurred during processing input: {}".format(str(ex)))
err = -1
return (err, res)
def read_zcl_attribute(devCtrl, cluster: str, attribute: str, nodeId: int, endpoint: int):
"""
Read ZCL attribute from device.
:param devCtrl: device controller instance
:param cluster: cluster name
:param attribute: attribute name
:param nodeId: device node ID
:param endpoint: device endpoint
:return: error code and attribute response
"""
res = None
err = 0
try:
allAttrs = devCtrl.ZCLAttributeList()
if cluster not in allAttrs:
raise exceptions.UnknownCluster(cluster)
attrDetails = allAttrs.get(cluster).get(attribute)
if attrDetails is None:
raise exceptions.UnknownAttribute(cluster, attribute)
clusterObj = getattr(GeneratedObjects, cluster)
attributeObj = getattr(clusterObj.Attributes, attribute)
result = asyncio.run(devCtrl.ReadAttribute(nodeId, [(endpoint, attributeObj)]))
path = ClusterAttribute.AttributePath(
EndpointId=endpoint, Attribute=attributeObj)
res = IM.AttributeReadResult(path=IM.AttributePath(nodeId=nodeId, endpointId=path.EndpointId, clusterId=path.ClusterId,
attributeId=path.AttributeId), status=0, value=result[endpoint][clusterObj][attributeObj],
dataVersion=result[endpoint][clusterObj][ClusterAttribute.DataVersion])
except exceptions.ChipStackException as ex:
log.error("An exception occurred during processing ZCL attribute: {}".format(str(ex)))
err = -1
except Exception as ex:
log.error("An exception occurred during processing input: {}".format(str(ex)))
err = -1
return (err, res)
def get_shell_commands_from_help_response(response):
"""
Parse shell help command response to get the list of supported commands
:param response: help command response
:return: list of supported commands
"""
return [line.split()[0].strip() for line in response]
def get_log_messages_from_response(response):
"""
Parse shell help command response to get the list of supported commands
:param response: device response
:return: raw log messages
"""
return [' '.join(line.split()[2:]) for line in response]