blob: 660f1c85d12d49b5571ad41c8f90c9c396cae244 [file] [log] [blame]
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from binascii import unhexlify, hexlify
import logging
from chip import ChipDeviceCtrl
import chip.clusters as Clusters
from chip.ChipStack import *
from import PersistentStorage
import chip.logging
import chip.native
import chip.FabricAdmin
from chip.utils import CommissioningBuildingBlocks
import builtins
from typing import Optional
from dataclasses import dataclass, field
from dataclasses import asdict as dataclass_asdict
import re
import os
import sys
import pathlib
import json
import uuid
import asyncio
from mobly import base_test
from mobly.config_parser import TestRunConfig, ENV_MOBLY_LOGPATH
from mobly import logger
from mobly import signals
from mobly import utils
from mobly.test_runner import TestRunner
# TODO: Add utility to commission a device if needed
# TODO: Add utilities to keep track of controllers/fabrics
logger = logging.getLogger("matter.python_testing")
DiscoveryFilterType = ChipDeviceCtrl.DiscoveryFilterType
_DEFAULT_STORAGE_PATH = "admin_storage.json"
_DEFAULT_LOG_PATH = "/tmp/matter_testing/logs"
# Mobly cannot deal with user config passing of ctypes objects,
# so we use this dict of uuid -> object to recover items stashed
# by reference.
def stash_globally(o: object) -> str:
id = str(uuid.uuid1())
_GLOBAL_DATA[id] = o
return id
def unstash_globally(id: str) -> object:
return _GLOBAL_DATA.get(id)
def default_paa_rootstore_from_root(root_path: pathlib.Path) -> Optional[pathlib.Path]:
"""Attempt to find a PAA trust store following SDK convention at `root_path`
This attempts to find {root_path}/credentials/development/paa-root-certs.
Returns the fully resolved path on success or None if not found.
start_path = root_path.resolve()
cred_path = start_path.joinpath("credentials")
dev_path = cred_path.joinpath("development")
paa_path = dev_path.joinpath("paa-root-certs")
return paa_path.resolve() if all([path.exists() for path in [cred_path, dev_path, paa_path]]) else None
def get_default_paa_trust_store(root_path: pathlib.Path) -> pathlib.Path:
"""Attempt to find a PAA trust store starting at `root_path`.
This tries to find by various heuristics, and goes up one level at a time
until found. After a given number of levels, it will stop.
This returns `root_path` if not PAA store is not found.
# TODO: Add heuristics about TH default PAA location
cur_dir = pathlib.Path.cwd()
max_levels = 10
for level in range(max_levels):
paa_trust_store_path = default_paa_rootstore_from_root(cur_dir)
if paa_trust_store_path is not None:
return paa_trust_store_path
# Go back one level
cur_dir = cur_dir.joinpath("..")
# On not having found a PAA dir, just return current dir to avoid blow-ups
return pathlib.Path.cwd()
class MatterTestConfig:
storage_path: pathlib.Path = None
logs_path: pathlib.Path = None
paa_trust_store_path: pathlib.Path = None
admin_vendor_id: int = _DEFAULT_ADMIN_VENDOR_ID
global_test_params: dict = field(default_factory=dict)
# List of explicit tests to run by name. If empty, all tests will run
tests: list[str] = field(default_factory=list)
commissioning_method: str = None
discriminator: int = None
setup_passcode: int = None
qr_code_content: str = None
manual_code: str = None
wifi_ssid: str = None
wifi_passphrase: str = None
thread_operational_dataset: str = None
# Node ID for basic DUT
dut_node_id: int = _DEFAULT_DUT_NODE_ID
# Node ID to use for controller/commissioner
controller_node_id: int = _DEFAULT_CONTROLLER_NODE_ID
# Fabric ID which to use
fabric_id: int = None
# "Alpha" by default
root_of_trust_index: int = _DEFAULT_TRUST_ROOT_INDEX
# If this is set, we will reuse root of trust keys at that location
chip_tool_credentials_path: pathlib.Path = None
class MatterStackState:
def __init__(self, config: MatterTestConfig):
self._logger = logger
self._config = config
self._fabric_admins = []
if not hasattr(builtins, "chipStack"):
if config.storage_path is None:
raise ValueError("Must have configured a MatterTestConfig.storage_path")
self._init_stack(already_initialized=False, persistentStoragePath=config.storage_path)
self._we_initialized_the_stack = True
self._we_initialized_the_stack = False
def _init_stack(self, already_initialized: bool, **kwargs):
if already_initialized:
self._chip_stack = builtins.chipStack
"Re-using existing ChipStack object found in current interpreter: storage path %s will be ignored!" % (self._config.storage_path))
# TODO: Warn that storage will not follow what we set in config
self._chip_stack = ChipStack(**kwargs)
builtins.chipStack = self._chip_stack
self._storage = self._chip_stack.GetStorageManager()
admin_list = self._storage.GetReplKey('fabricAdmins')
found_admin_list = True
except KeyError:
found_admin_list = False
if not found_admin_list:
self._logger.warn("No previous fabric administrative data found in persistent data: initializing a new one")
for admin_idx in admin_list:
f"Restoring FabricAdmin from storage to manage FabricId {admin_list[admin_idx]['fabricId']}, AdminIndex {admin_idx}")
fabricId=admin_list[admin_idx]['fabricId'], adminIndex=int(admin_idx)))
# TODO: support getting access to chip-tool credentials issuer's data
def Shutdown(self):
if self._we_initialized_the_stack:
# Unfortunately, all the below are singleton and possibly
# managed elsewhere so we have to be careful not to touch unless
# we initialized ourselves.
global_chip_stack = builtins.chipStack
def fabric_admins(self):
return self._fabric_admins
def storage(self) -> PersistentStorage:
return self._storage
def stack(self) -> ChipStack:
return builtins.chipStack
def bytes_from_hex(hex: str) -> bytes:
"""Converts any `hex` string representation including `01:ab:cd` to bytes
Handles any whitespace including newlines, which are all stripped.
return unhexlify("".join(hex.replace(":", "").replace(" ", "").split()))
def hex_from_bytes(b: bytes) -> str:
"""Converts a bytes object `b` into a hex string (reverse of bytes_from_hex)"""
return hexlify(b).decode("utf-8")
class MatterBaseTest(base_test.BaseTestClass):
def __init__(self, *args):
def matter_test_config(self) -> MatterTestConfig:
return unstash_globally(self.user_params.get("matter_test_config"))
def default_controller(self) -> ChipDeviceCtrl:
return unstash_globally(self.user_params.get("default_controller"))
def matter_stack(self) -> MatterStackState:
return unstash_globally(self.user_params.get("matter_stack"))
def dut_node_id(self) -> int:
return self.matter_test_config.dut_node_id
async def read_single_attribute(self, dev_ctrl: ChipDeviceCtrl, node_id: int, endpoint: int, attribute: object) -> object:
result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)])
data = result[endpoint]
return list(data.values())[0][attribute]
def generate_mobly_test_config(matter_test_config: MatterTestConfig):
test_run_config = TestRunConfig()
# We use a default name. We don't use Mobly YAML configs, so that we can be
# freestanding without relying
test_run_config.testbed_name = "MatterTest"
log_path = matter_test_config.logs_path
log_path = _DEFAULT_LOG_PATH if log_path is None else log_path
if ENV_MOBLY_LOGPATH in os.environ:
log_path = os.environ[ENV_MOBLY_LOGPATH]
test_run_config.log_path = log_path
# TODO: For later, configure controllers
test_run_config.controller_configs = {}
test_run_config.user_params = matter_test_config.global_test_params
return test_run_config
def _find_test_class():
"""Finds the test class in a test script.
Walk through module members and find the subclass of MatterBaseTest. Only
one subclass is allowed in a test script.
The test class in the test module.
SystemExit: Raised if the number of test classes is not exactly one.
subclasses = utils.find_subclasses_in_module([MatterBaseTest], sys.modules['__main__'])
subclasses = [c for c in subclasses if c.__name__ != "MatterBaseTest"]
if len(subclasses) != 1:
'Exactly one subclass of `MatterBaseTest` should be in the main file. Found %s.' %
str([subclass.__name__ for subclass in subclasses]))
return subclasses[0]
def int_decimal_or_hex(s: str) -> int:
val = int(s, 0)
if val < 0:
raise ValueError("Negative values not supported")
return val
def byte_string_from_hex(s: str) -> bytes:
return unhexlify(s.replace(":", "").replace(" ", "").replace("0x", ""))
def int_from_manual_code(s: str) -> int:
regex = r"^([0-9]{11}|[0-9]{21})$"
match = re.match(regex, s)
if not match:
raise ValueError("Invalid manual code format, does not match %s" % regex)
return int(s, 10)
def int_named_arg(s: str) -> tuple[str, int]:
regex = r"^(?P<name>[a-zA-Z_0-9.]+):((?P<hex_value>0x[0-9a-fA-F_]+)|(?P<decimal_value>-?\d+))$"
match = re.match(regex, s)
if not match:
raise ValueError("Invalid int argument format, does not match %s" % regex)
name ="name")
value = int("hex_value"), 0)
value = int("decimal_value"), 10)
return (name, value)
def str_named_arg(s: str) -> tuple[str, str]:
regex = r"^(?P<name>[a-zA-Z_0-9.]+):(?P<value>.*)$"
match = re.match(regex, s)
if not match:
raise ValueError("Invalid string argument format, does not match %s" % regex)
return ("name"),"value"))
def float_named_arg(s: str) -> tuple[str, float]:
regex = r"^(?P<name>[a-zA-Z_0-9.]+):(?P<value>.*)$"
match = re.match(regex, s)
if not match:
raise ValueError("Invalid float argument format, does not match %s" % regex)
name ="name")
value = float("value"))
return (name, value)
def json_named_arg(s: str) -> tuple[str, object]:
regex = r"^(?P<name>[a-zA-Z_0-9.]+):(?P<value>.*)$"
match = re.match(regex, s)
if not match:
raise ValueError("Invalid JSON argument format, does not match %s" % regex)
name ="name")
value = json.loads("value"))
return (name, value)
def bool_named_arg(s: str) -> tuple[str, bool]:
regex = r"^(?P<name>[a-zA-Z_0-9.]+):((?P<truth_value>true|false)|(?P<decimal_value>[01]))$"
match = re.match(regex, s.lower())
if not match:
raise ValueError("Invalid bool argument format, does not match %s" % regex)
name ="name")
value = True if"truth_value") == "true" else False
value = int("decimal_value")) != 0
return (name, value)
def bytes_as_hex_named_arg(s: str) -> tuple[str, bytes]:
regex = r"^(?P<name>[a-zA-Z_0-9.]+):(?P<value>[0-9a-fA-F:]+)$"
match = re.match(regex, s)
if not match:
raise ValueError("Invalid bytes as hex argument format, does not match %s" % regex)
name ="name")
value_str ="value")
value_str = value_str.replace(":", "")
if len(value_str) % 2 != 0:
raise ValueError("Byte string argument value needs to be event number of hex chars")
value = unhexlify(value_str)
return (name, value)
def root_index(s: str) -> int:
"alpha": 1,
"beta": 2,
"gamma": 3
for name, id in CHIP_TOOL_COMPATIBILITY.items():
if s.lower() == name:
return id
root_index = int(s)
if root_index == 0:
raise ValueError("Only support root index >= 1")
return root_index
def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConfig) -> bool:
if args.commissioning_method is None:
return True
config.commissioning_method = args.commissioning_method
if args.dut_node_id is None:
print("error: When --commissioning-method present, --dut-node-id is mandatory!")
return False
config.dut_node_id = args.dut_node_id
if args.discriminator is None and (args.qr_code is None and args.manual_code is None):
print("error: Missing --discriminator when no --qr-code/--manual-code present!")
return False
config.discriminator = args.discriminator
if args.passcode is None and (args.qr_code is None and args.manual_code is None):
print("error: Missing --passcode when no --qr-code/--manual-code present!")
return False
config.setup_passcode = args.passcode
if args.qr_code is not None and args.manual_code is not None:
print("error: Cannot have both --qr-code and --manual-code present!")
return False
config.qr_code_content = args.qr_code
config.manual_code = args.manual_code
config.root_of_trust_index = args.root_index
# Follow root of trust index if ID not provided to have same behavior as legacy
# chip-tool that fabricID == commissioner_name == root of trust index
config.fabric_id = args.fabric_id if args.fabric_id is not None else config.root_of_trust_index
if args.chip_tool_credentials_path is not None and not args.chip_tool_credentials_path.exists():
print("error: chip-tool credentials path %s doesn't exist!" % args.chip_tool_credentials_path)
return False
config.chip_tool_credentials_path = args.chip_tool_credentials_path
if config.commissioning_method == "ble-wifi":
if args.wifi_ssid is None:
print("error: missing --wifi-ssid <SSID> for --commissioning-method ble-wifi!")
return False
if args.wifi_passphrase is None:
print("error: missing --wifi-passphrase <passphrasse> for --commissioning-method ble-wifi!")
return False
config.wifi_ssid = args.wifi_ssid
config.wifi_passphrase = args.wifi_passphrase
elif config.commissioning_method == "ble-thread":
if args.thread_dataset_hex is None:
print("error: missing --thread-dataset-hex <DATASET_HEX> for --commissioning-method ble-thread!")
return False
config.thread_operational_dataset = args.thread_dataset_hex
return True
def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig:
config = MatterTestConfig()
# Populate commissioning config if present, exiting on error
if not populate_commissioning_args(args, config):
config.storage_path = pathlib.Path(_DEFAULT_STORAGE_PATH) if args.storage_path is None else args.storage_path
config.logs_path = pathlib.Path(_DEFAULT_LOG_PATH) if args.logs_path is None else args.logs_path
config.paa_trust_store_path = args.paa_trust_store_path
config.controller_node_id = args.controller_node_id
# Accumulate all command-line-passed named args
all_global_args = []
argsets = [item for item in (args.int_arg, args.float_arg, args.string_arg, args.json_arg,
args.hex_arg, args.bool_arg) if item is not None]
for argset in argsets:
config.global_test_params = {}
for name, value in all_global_args:
config.global_test_params[name] = value
# Embed the rest of the config in the global test params dict which will be passed to Mobly tests
config.global_test_params["meta_config"] = {k: v for k, v in dataclass_asdict(config).items() if k != "global_test_params"}
return config
def parse_matter_test_args(argv: list[str]) -> MatterTestConfig:
parser = argparse.ArgumentParser(description='Matter standalone Python test')
basic_group = parser.add_argument_group(title="Basic arguments", description="Overall test execution arguments")
metavar='test_a test_b...',
help='A list of tests in the test class to execute.')
basic_group.add_argument('--storage-path', action="store", type=pathlib.Path,
metavar="PATH", help="Location for persisted storage of instance")
basic_group.add_argument('--logs-path', action="store", type=pathlib.Path, metavar="PATH", help="Location for test logs")
paa_path_default = get_default_paa_trust_store(pathlib.Path.cwd())
basic_group.add_argument('--paa-trust-store-path', action="store", type=pathlib.Path, metavar="PATH", default=paa_path_default,
help="PAA trust store path (default: %s)" % str(paa_path_default))
basic_group.add_argument('-N', '--controller-node-id', type=int_decimal_or_hex,
help='NodeID to use for initial/default controller (default: %d)' % _DEFAULT_CONTROLLER_NODE_ID)
basic_group.add_argument('-n', '--dut-node-id', type=int_decimal_or_hex,
metavar='NODE_ID', default=_DEFAULT_DUT_NODE_ID,
help='Node ID for primary DUT communication, and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID)
commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node")
commission_group.add_argument('-m', '--commissioning-method', type=str,
choices=["on-network", "ble-wifi", "ble-thread"],
help='Name of commissioning method to use')
commission_group.add_argument('-d', '--discriminator', type=int_decimal_or_hex,
help='Discriminator to use for commissioning')
commission_group.add_argument('-p', '--passcode', type=int_decimal_or_hex,
help='PAKE passcode to use')
commission_group.add_argument('--wifi-ssid', type=str,
help='Wi-Fi SSID for ble-wifi commissioning')
commission_group.add_argument('--wifi-passphrase', type=str,
help='Wi-Fi passphrase for ble-wifi commissioning')
commission_group.add_argument('--thread-dataset-hex', type=byte_string_from_hex,
help='Thread operational dataset as a hex string for ble-thread commissioning')
commission_group.add_argument('--admin-vendor-id', action="store", type=int_decimal_or_hex, default=_DEFAULT_ADMIN_VENDOR_ID,
metavar="VENDOR_ID", help="VendorID to use during commissioning (default 0x%04X)" % _DEFAULT_ADMIN_VENDOR_ID)
code_group = parser.add_mutually_exclusive_group(required=False)
code_group.add_argument('-q', '--qr-code', type=str,
metavar="QR_CODE", help="QR setup code content (overrides passcode and discriminator)")
code_group.add_argument('--manual-code', type=int_from_manual_code,
metavar="MANUAL_CODE", help="Manual setup code content (overrides passcode and discriminator)")
fabric_group = parser.add_argument_group(
title="Fabric selection", description="Fabric selection for single-fabric basic usage, and commissioning")
fabric_group.add_argument('-f', '--fabric-id', type=int_decimal_or_hex,
help='Fabric ID on which to operate under the root of trust')
fabric_group.add_argument('-r', '--root-index', type=root_index,
help='Root of trust under which to operate/commission for single-fabric basic usage. alpha/beta/gamma are aliases for 1/2/3. Default (%d)' % _DEFAULT_TRUST_ROOT_INDEX)
fabric_group.add_argument('-c', '--chip-tool-credentials-path', type=pathlib.Path,
help='Path to chip-tool credentials file root')
args_group = parser.add_argument_group(title="Config arguments", description="Test configuration global arguments set")
args_group.add_argument('--int-arg', nargs='*', type=int_named_arg, metavar="NAME:VALUE",
help="Add a named test argument for an integer as hex or decimal (e.g. -2 or 0xFFFF_1234)")
args_group.add_argument('--bool-arg', nargs='*', type=bool_named_arg, metavar="NAME:VALUE",
help="Add a named test argument for an boolean value (e.g. true/false or 0/1)")
args_group.add_argument('--float-arg', nargs='*', type=float_named_arg, metavar="NAME:VALUE",
help="Add a named test argument for a floating point value (e.g. -2.1 or 6.022e23)")
args_group.add_argument('--string-arg', nargs='*', type=str_named_arg, metavar="NAME:VALUE",
help="Add a named test argument for a string value")
args_group.add_argument('--json-arg', nargs='*', type=json_named_arg, metavar="NAME:VALUE",
help="Add a named test argument for JSON stored as a list or dict")
args_group.add_argument('--hex-arg', nargs='*', type=bytes_as_hex_named_arg, metavar="NAME:VALUE",
help="Add a named test argument for an octet string in hex (e.g. 0011cafe or 00:11:CA:FE)")
if not argv:
argv = sys.argv[1:]
return convert_args_to_matter_config(parser.parse_known_args(argv)[0])
def async_test_body(body):
"""Decorator required to be applied whenever a `test_*` method is `async def`.
Since Mobly doesn't support asyncio directly, and the test methods are called
synchronously, we need a mechanism to allow an `async def` to be converted to
a asyncio-run synchronous method. This decorator does the wrapping.
def async_runner(*args, **kwargs):
return*args, **kwargs))
return async_runner
class CommissionDeviceTest(MatterBaseTest):
"""Test class auto-injected at the start of test list to commission a device when requested"""
def test_run_commissioning(self):
conf = self.matter_test_config"Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" %
(conf.root_of_trust_index, conf.fabric_id, conf.dut_node_id))"Commissioning method: %s" % conf.commissioning_method)
if not self._commission_device():
raise signals.TestAbortAll("Failed to commission node")
def _commission_device(self) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config
# TODO: support by manual code and QR
if conf.commissioning_method == "on-network":
return dev_ctrl.CommissionOnNetwork(nodeId=conf.dut_node_id, setupPinCode=conf.setup_passcode, filterType=DiscoveryFilterType.LONG_DISCRIMINATOR, filter=conf.discriminator)
elif conf.commissioning_method == "ble-wifi":
return dev_ctrl.CommissionWiFi(conf.discriminator, conf.setup_passcode, conf.dut_node_id, conf.wifi_ssid, conf.wifi_passphrase)
elif conf.commissioning_method == "ble-thread":
return dev_ctrl.CommissionThread(conf.discriminator, conf.setup_passcode, conf.dut_node_id, conf.thread_operational_dataset)
raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)
def default_matter_test_main(argv=None):
"""Execute the test class in a test module.
This is the default entry point for running a test script file directly.
In this case, only one test class in a test script is allowed.
To make your test script executable, add the following to your file:
.. code-block:: python
from import default_matter_test_main
if __name__ == '__main__':
argv: A list that is then parsed as command line args. If None, defaults to sys.argv
matter_test_config = parse_matter_test_args(argv)
# Find the test class in the test script.
test_class = _find_test_class()
# Load test config file.
test_config = generate_mobly_test_config(matter_test_config)
# Parse test specifiers if exist.
tests = None
if len(matter_test_config.tests) > 0:
tests = matter_test_config.tests
stack = MatterStackState(matter_test_config)
test_config.user_params["matter_stack"] = stash_globally(stack)
# TODO: Steer to right FabricAdmin!
default_controller = stack.fabric_admins[0].NewController(nodeId=matter_test_config.controller_node_id,
test_config.user_params["default_controller"] = stash_globally(default_controller)
test_config.user_params["matter_test_config"] = stash_globally(matter_test_config)
# Execute the test class with the config
ok = True
runner = TestRunner(log_dir=test_config.log_path,
with runner.mobly_logger():
if matter_test_config.commissioning_method is not None:
runner.add_test_class(test_config, CommissionDeviceTest, None)
runner.add_test_class(test_config, test_class, tests)
ok = runner.results.is_all_pass and ok
except signals.TestAbortAll:
ok = False
except Exception:
logging.exception('Exception when executing %s.', test_config.testbed_name)
ok = False
# Shutdown the stack when all done
if ok:"Final result: PASS !")
logging.error("Final result: FAIL !")