blob: 438ab89c55912b84667592b98330bf4a2c2cd76f [file] [log] [blame]
# Copyright (c) 2009-2021 Arm Limited
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import platform
import random
import shlex
import re
from chip.setup_payload import SetupPayload
from chip import exceptions
if platform.system() == 'Darwin':
from chip.ChipCoreBluetoothMgr import CoreBluetoothManager as BleManager
elif sys.platform.startswith('linux'):
from chip.ChipBluezMgr import BluezManager as BleManager
import logging
log = logging.getLogger(__name__)
class ParsingError(exceptions.ChipStackException):
def __init__(self, msg=None):
self.msg = "Parsing Error: " + msg
def __str__(self):
return self.msg
def get_device_details(device):
"""
Get device details from logs
:param device: serial device instance
:return: device details dictionary or None
"""
ret = device.wait_for_output("SetupQRCode")
if ret == None or len(ret) < 2:
return None
qr_code = re.sub(
r"[\[\]]", "", ret[-1].partition("SetupQRCode:")[2]).strip()
try:
device_details = dict(SetupPayload().ParseQrCode(
"VP:vendorpayload%{}".format(qr_code)).attributes)
except exceptions.ChipStackError as ex:
log.error(ex.msg)
return None
return device_details
def ParseEncodedString(value):
if value.find(":") < 0:
raise ParsingError(
"Value should be encoded in encoding:encodedvalue format")
enc, encValue = value.split(":", 1)
if enc == "str":
return encValue.encode("utf-8") + b'\x00'
elif enc == "hex":
return bytes.fromhex(encValue)
raise ParsingError("Only str and hex encoding is supported")
def ParseValueWithType(value, type):
if type == 'int':
return int(value)
elif type == 'str':
return value
elif type == 'bytes':
return ParseEncodedString(value)
elif type == 'bool':
return (value.upper() not in ['F', 'FALSE', '0'])
else:
raise ParsingError('Cannot recognize type: {}'.format(type))
def FormatZCLArguments(args, command):
commandArgs = {}
for kvPair in args:
if kvPair.find("=") < 0:
raise ParsingError("Argument should in key=value format")
key, value = kvPair.split("=", 1)
valueType = command.get(key, None)
commandArgs[key] = ParseValueWithType(value, valueType)
return commandArgs
def send_zcl_command(devCtrl, line):
"""
Send ZCL message to device:
<cluster> <command> <nodeid> <endpoint> <groupid> [key=value]...
:param devCtrl: device controller instance
:param line: command line
:return: error code and command responde
"""
res = None
err = 0
try:
args = shlex.split(line)
all_commands = devCtrl.ZCLCommandList()
if len(args) < 5:
raise exceptions.InvalidArgumentCount(5, len(args))
if args[0] not in all_commands:
raise exceptions.UnknownCluster(args[0])
command = all_commands.get(args[0]).get(args[1], None)
# When command takes no arguments, (not command) is True
if command == None:
raise exceptions.UnknownCommand(args[0], args[1])
err, res = devCtrl.ZCLSend(args[0], args[1], int(
args[2]), int(args[3]), int(args[4]), FormatZCLArguments(args[5:], command), blocking=True)
if err != 0:
log.error("Failed to send ZCL command [{}] {}.".format(err, res))
elif res != None:
log.info("Success, received command response:")
log.info(res)
else:
log.info("Success, no command response.")
except exceptions.ChipStackException as ex:
log.error("An exception occurred during processing ZCL command:")
log.error(str(ex))
err = -1
except Exception as ex:
log.error("An exception occurred during processing input:")
log.error(str(ex))
err = -1
return (err, res)
def scan_chip_ble_devices(devCtrl):
"""
BLE scan CHIP device
BLE scanning for 10 seconds and collect the results
:param devCtrl: device controller instance
:return: List of visible BLE devices
"""
devices = []
bleMgr = BleManager(devCtrl)
bleMgr.scan("-t 10")
for device in bleMgr.peripheral_list:
devIdInfo = bleMgr.get_peripheral_devIdInfo(device)
if devIdInfo:
devInfo = devIdInfo.__dict__
devInfo["name"] = device.Name
devices.append(devInfo)
return devices
def check_chip_ble_devices_advertising(devCtrl, name, deviceDetails=None):
"""
Check if CHIP device advertise
BLE scanning for 10 seconds and compare with device details
:param devCtrl: device controller instance
:param name: device advertising name
:param name: device details
:return: True if device advertise else False
"""
ble_chip_device = scan_chip_ble_devices(devCtrl)
if ble_chip_device == None or len(ble_chip_device) == 0:
log.info("No BLE CHIP device found")
return False
chip_device_found = False
for ble_device in ble_chip_device:
if deviceDetails != None:
if (ble_device["name"] == name and
int(ble_device["discriminator"]) == int(deviceDetails["Discriminator"]) and
int(ble_device["vendorId"]) == int(deviceDetails["VendorID"]) and
int(ble_device["productId"]) == int(deviceDetails["ProductID"])):
chip_device_found = True
break
else:
if (ble_device["name"] == name):
chip_device_found = True
break
return chip_device_found
def connect_device_over_ble(devCtrl, discriminator, pinCode, nodeId=None):
"""
Connect to Matter accessory device over BLE
:param devCtrl: device controller instance
:param discriminator: CHIP device discriminator
:param pinCode: CHIP device pin code
:param nodeId: default value of node ID
:return: node ID is provisioning successful, otherwise None
"""
if nodeId == None:
nodeId = random.randint(1, 1000000)
try:
devCtrl.ConnectBLE(int(discriminator), int(pinCode), int(nodeId))
except exceptions.ChipStackException as ex:
log.error("Connect device over BLE failed: {}".format(str(ex)))
return None
return nodeId
def close_connection(devCtrl, nodeId):
"""
Close the BLE connection
:param devCtrl: device controller instance
:return: true if successful, otherwise false
"""
try:
devCtrl.CloseSession(nodeId)
except exceptions.ChipStackException as ex:
log.error("Close session failed: {}".format(str(ex)))
return False
return True
def close_ble(devCtrl):
"""
Close the BLE connection
:param devCtrl: device controller instance
:return: true if successful, otherwise false
"""
try:
devCtrl.CloseBLEConnection()
except exceptions.ChipStackException as ex:
log.error("Close BLE connection failed: {}".format(str(ex)))
return False
return True
def commissioning_wifi(devCtrl, ssid, password, nodeId):
"""
Commissioning a Wi-Fi device
:param devCtrl: device controller instance
:param ssid: network ssid
:param password: network password
:param nodeId: value of node ID
:return: error code
"""
# Inject the credentials to the device
err, res = send_zcl_command(
devCtrl, "NetworkCommissioning AddOrUpdateWiFiNetwork {} 0 0 ssid=str:{} credentials=str:{} breadcrumb=0 timeoutMs=1000".format(nodeId, ssid, password))
if err != 0 and res["Status"] != 0:
log.error("Set Wi-Fi credentials failed [{}]".format(err))
return err
# Enable the Wi-Fi interface
err, res = send_zcl_command(
devCtrl, "NetworkCommissioning ConnectNetwork {} 0 0 networkID=str:{} breadcrumb=0 timeoutMs=1000".format(nodeId, ssid))
if err != 0 and res["Status"] != 0:
log.error("Enable Wi-Fi failed [{}]".format(err))
return err
return err
def resolve_device(devCtrl, nodeId):
"""
Discover IP address and port of the device.
:param devCtrl: device controller instance
:param nodeId: value of node ID
:return: device IP address and port if successful, otherwise None
"""
ret = None
try:
devCtrl.ResolveNode(int(nodeId))
ret = devCtrl.GetAddressAndPort(int(nodeId))
if ret == None:
log.error("Get address and port failed")
except exceptions.ChipStackException as ex:
log.error("Resolve node failed {}".format(str(ex)))
return ret