# Source-viewer residue (not part of the module): blob 89e52736e86556b614169cc900317d1c0e320824 [file] [log] [blame]
# Copyright (c) 2022 Project CHIP Authors
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import asyncio
import builtins
import json
import logging
import os
import pathlib
import re
import sys
import uuid
from binascii import hexlify, unhexlify
from dataclasses import asdict as dataclass_asdict
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
# isort: off
from chip import ChipDeviceCtrl # Needed before chip.FabricAdmin
import chip.FabricAdmin # Needed before chip.CertificateAuthority
import chip.CertificateAuthority
# isort: on
import chip.clusters as Clusters
import chip.logging
import chip.native
from chip.ChipStack import *
from chip.interaction_model import InteractionModelError, Status
# The module path was lost from this import, leaving a SyntaxError.
# NOTE(review): `chip.storage` is where the SDK's Python controller defines
# PersistentStorage -- confirm against the installed `chip` package.
from chip.storage import PersistentStorage
from chip.utils import CommissioningBuildingBlocks
from mobly import asserts, base_test, logger, signals, utils
from mobly.config_parser import ENV_MOBLY_LOGPATH, TestRunConfig
from mobly.test_runner import TestRunner
# TODO: Add utility to commission a device if needed
# TODO: Add utilities to keep track of controllers/fabrics
# NOTE: this deliberately rebinds `logger`, shadowing the `logger` module
# imported from mobly above; the rest of the file uses this stdlib logger.
logger = logging.getLogger("matter.python_testing")

DiscoveryFilterType = ChipDeviceCtrl.DiscoveryFilterType

# Test vendor ID; the same value MatterStackState passes to NewFabricAdmin.
_DEFAULT_ADMIN_VENDOR_ID = 0xFFF1
_DEFAULT_STORAGE_PATH = "admin_storage.json"
_DEFAULT_LOG_PATH = "/tmp/matter_testing/logs"
# NOTE(review): this constant is referenced throughout the file but its
# definition was missing; 112233 is the value the upstream SDK uses -- confirm.
_DEFAULT_CONTROLLER_NODE_ID = 112233
_DEFAULT_DUT_NODE_ID = 0x12344321
# Root of trust index 1 is the "alpha" alias accepted by `root_index()`.
_DEFAULT_TRUST_ROOT_INDEX = 1

# Mobly cannot deal with user config passing of ctypes objects,
# so we use this dict of uuid -> object to recover items stashed
# by reference.
_GLOBAL_DATA = {}
def stash_globally(o: object) -> str:
    """Park `o` in the module-level stash and return the key it was filed under.

    The key is the string form of a freshly generated `uuid.uuid1()`; pass it
    to `unstash_globally` to get the object back.
    """
    handle = str(uuid.uuid1())
    _GLOBAL_DATA[handle] = o
    return handle
def unstash_globally(id: str) -> object:
    """Fetch the object stashed under key `id`, or None when the key is unknown."""
    stashed = _GLOBAL_DATA.get(id)
    return stashed
def default_paa_rootstore_from_root(root_path: pathlib.Path) -> Optional[pathlib.Path]:
    """Attempt to find a PAA trust store following SDK convention at `root_path`.

    This attempts to find {root_path}/credentials/development/paa-root-certs.

    Args:
        root_path: Directory to probe; it is resolved before use.

    Returns:
        The fully resolved path on success or None if not found.
    """
    start_path = root_path.resolve()
    cred_path = start_path.joinpath("credentials")
    dev_path = cred_path.joinpath("development")
    paa_path = dev_path.joinpath("paa-root-certs")

    # Every level of the conventional layout must exist.
    if all(path.exists() for path in (cred_path, dev_path, paa_path)):
        return paa_path.resolve()
    return None
def get_default_paa_trust_store(root_path: pathlib.Path) -> pathlib.Path:
    """Attempt to find a PAA trust store starting at `root_path`.

    The search probes `root_path` and then goes up one directory level at a
    time until a store is found, giving up after a fixed number of levels.

    Args:
        root_path: Directory where the search starts. Previously this argument
            was ignored and the current working directory was always used;
            callers passing `pathlib.Path.cwd()` see no change.

    Returns:
        The resolved trust store path, or `root_path` if no PAA store is found
        (so callers always get a usable path rather than None).
    """
    # TODO: Add heuristics about TH default PAA location
    cur_dir = root_path
    max_levels = 10

    for _ in range(max_levels):
        paa_trust_store_path = default_paa_rootstore_from_root(cur_dir)
        if paa_trust_store_path is not None:
            return paa_trust_store_path

        # Go back one level
        cur_dir = cur_dir.joinpath("..")

    # On not having found a PAA dir, just return the start dir to avoid blow-ups
    return root_path
# The decorator is required: the fields below use `field(default_factory=...)`
# and instances are passed to `dataclasses.asdict` elsewhere in this file.
@dataclass
class MatterTestConfig:
    """Plain configuration record for one Matter test run.

    Populated from command-line arguments and handed to the stack setup and
    (via the Mobly user params) to the test classes.
    """
    storage_path: Optional[pathlib.Path] = None
    logs_path: Optional[pathlib.Path] = None
    paa_trust_store_path: Optional[pathlib.Path] = None
    ble_interface_id: Optional[int] = None
    commission_only: bool = False

    admin_vendor_id: int = _DEFAULT_ADMIN_VENDOR_ID
    case_admin_subject: Optional[int] = None
    global_test_params: dict = field(default_factory=dict)
    # List of explicit tests to run by name. If empty, all tests will run
    tests: List[str] = field(default_factory=list)

    commissioning_method: Optional[str] = None
    discriminator: Optional[int] = None
    setup_passcode: Optional[int] = None
    commissionee_ip_address_just_for_testing: Optional[str] = None
    maximize_cert_chains: bool = False

    qr_code_content: Optional[str] = None
    manual_code: Optional[str] = None

    wifi_ssid: Optional[str] = None
    wifi_passphrase: Optional[str] = None
    thread_operational_dataset: Optional[str] = None

    # Node ID for basic DUT
    dut_node_id: int = _DEFAULT_DUT_NODE_ID
    # Node ID to use for controller/commissioner
    controller_node_id: int = _DEFAULT_CONTROLLER_NODE_ID
    # CAT Tags for default controller/commissioner
    controller_cat_tags: List[int] = field(default_factory=list)

    # Fabric ID which to use
    fabric_id: Optional[int] = None
    # "Alpha" by default
    root_of_trust_index: int = _DEFAULT_TRUST_ROOT_INDEX

    # If this is set, we will reuse root of trust keys at that location
    chip_tool_credentials_path: Optional[pathlib.Path] = None
class MatterStackState:
    """Owns (or re-uses) the process-wide ChipStack plus its certificate authorities.

    The stack is published as `builtins.chipStack`. If one already exists when
    this object is constructed it is re-used and left alone at `Shutdown()`.
    """

    def __init__(self, config: MatterTestConfig):
        """Attach to an existing stack or create one from `config`.

        Raises:
            ValueError: A new stack is needed but `config.storage_path` is None.
        """
        self._logger = logger
        self._config = config

        if not hasattr(builtins, "chipStack"):
            # NOTE(review): `config.ble_interface_id` and the `chip.native`
            # import are not consumed anywhere in this class; upstream may
            # initialize the native layer here -- confirm.
            if config.storage_path is None:
                raise ValueError("Must have configured a MatterTestConfig.storage_path")
            self._init_stack(already_initialized=False, persistentStoragePath=config.storage_path)
            self._we_initialized_the_stack = True
        else:
            # The branch was missing, so the flag was unconditionally reset to
            # False and a pre-existing stack was never attached.
            self._init_stack(already_initialized=True)
            self._we_initialized_the_stack = False

    def _init_stack(self, already_initialized: bool, **kwargs):
        """Bind the chip stack and make sure one CA with one FabricAdmin exists.

        Args:
            already_initialized: Re-use `builtins.chipStack` instead of creating one.
            **kwargs: Forwarded to the `ChipStack` constructor when creating.
        """
        if already_initialized:
            self._chip_stack = builtins.chipStack
            self._logger.warning(
                "Re-using existing ChipStack object found in current interpreter: storage path %s will be ignored!" % (self._config.storage_path))
            # TODO: Warn that storage will not follow what we set in config
        else:
            self._chip_stack = ChipStack(**kwargs)
            builtins.chipStack = self._chip_stack

        self._storage = self._chip_stack.GetStorageManager()
        self._certificate_authority_manager = chip.CertificateAuthority.CertificateAuthorityManager(chipStack=self._chip_stack)
        # NOTE(review): restored so the "in storage" checks below can ever see
        # persisted authorities -- confirm against the CertificateAuthorityManager API.
        self._certificate_authority_manager.LoadAuthoritiesFromStorage()

        if (len(self._certificate_authority_manager.activeCaList) == 0):
            self._logger.warning(
                "Didn't find any CertificateAuthorities in storage -- creating a new CertificateAuthority + FabricAdmin...")
            ca = self._certificate_authority_manager.NewCertificateAuthority(caIndex=self._config.root_of_trust_index)
            ca.maximizeCertChains = self._config.maximize_cert_chains
            ca.NewFabricAdmin(vendorId=0xFFF1, fabricId=self._config.fabric_id)
        elif (len(self._certificate_authority_manager.activeCaList[0].adminList) == 0):
            self._logger.warning("Didn't find any FabricAdmins in storage -- creating a new one...")
            self._certificate_authority_manager.activeCaList[0].NewFabricAdmin(vendorId=0xFFF1, fabricId=self._config.fabric_id)

    # TODO: support getting access to chip-tool credentials issuer's data

    def Shutdown(self):
        """Tear down the stack, but only if this object created it."""
        if self._we_initialized_the_stack:
            # Unfortunately, all the below are singleton and possibly
            # managed elsewhere so we have to be careful not to touch unless
            # we initialized ourselves.
            # NOTE(review): the actual shutdown calls were missing (the stack
            # local was bound and never used); restored -- confirm call order.
            self._certificate_authority_manager.Shutdown()
            global_chip_stack = builtins.chipStack
            global_chip_stack.Shutdown()

    # These accessors are read as attributes elsewhere in the file
    # (e.g. `stack.certificate_authorities[0]`), hence properties.
    @property
    def certificate_authorities(self):
        """Active certificate authority list of the CA manager."""
        return self._certificate_authority_manager.activeCaList

    @property
    def certificate_authority_manager(self):
        """The CertificateAuthorityManager bound to this stack."""
        return self._certificate_authority_manager

    @property
    def storage(self) -> PersistentStorage:
        """Storage manager obtained from the chip stack."""
        return self._storage

    @property
    def stack(self) -> ChipStack:
        """The process-wide chip stack (`builtins.chipStack`)."""
        return builtins.chipStack
def bytes_from_hex(hex: str) -> bytes:
    """Converts any `hex` string representation including `01:ab:cd` to bytes.

    Handles any whitespace including newlines, which are all stripped.

    Raises:
        binascii.Error: The cleaned string is not an even number of hex digits.
    """
    # `split()` with no argument already drops every whitespace run
    # (spaces included), so only the colons need explicit removal.
    return unhexlify("".join(hex.replace(":", "").split()))
def hex_from_bytes(b: bytes) -> str:
    """Render `b` as a lowercase hex string; the inverse of `bytes_from_hex`."""
    encoded = hexlify(b)
    return encoded.decode("utf-8")
class MatterBaseTest(base_test.BaseTestClass):
    """Mobly base class giving tests access to the shared Matter test objects.

    The shared objects are stashed by reference and their keys travel through
    Mobly's `user_params`; the properties below resolve them.
    """

    def __init__(self, *args):
        super().__init__(*args)

    # The accessors are read as attributes throughout this class
    # (e.g. `self.matter_test_config.dut_node_id`), hence properties.
    @property
    def matter_test_config(self) -> MatterTestConfig:
        """The MatterTestConfig stashed for this run."""
        return unstash_globally(self.user_params.get("matter_test_config"))

    @property
    def default_controller(self) -> ChipDeviceCtrl:
        """The default controller stashed for this run."""
        return unstash_globally(self.user_params.get("default_controller"))

    @property
    def matter_stack(self) -> MatterStackState:
        """The MatterStackState stashed for this run."""
        return unstash_globally(self.user_params.get("matter_stack"))

    @property
    def certificate_authority_manager(self) -> chip.CertificateAuthority.CertificateAuthorityManager:
        """The certificate authority manager stashed for this run."""
        return unstash_globally(self.user_params.get("certificate_authority_manager"))

    @property
    def dut_node_id(self) -> int:
        """Node ID of the device under test, taken from the test config."""
        return self.matter_test_config.dut_node_id

    async def read_single_attribute(self, dev_ctrl: ChipDeviceCtrl, node_id: int, endpoint: int, attribute: object) -> object:
        """Read one attribute and return its value without any checking."""
        result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)])
        data = result[endpoint]
        # Only one cluster was requested, so take the first (only) entry.
        return list(data.values())[0][attribute]

    async def read_single_attribute_check_success(self, cluster: object, attribute: object, dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0) -> object:
        """Read one attribute and assert that a decodable value came back.

        `dev_ctrl` and `node_id` default to the run's default controller and DUT.
        """
        if dev_ctrl is None:
            dev_ctrl = self.default_controller
        if node_id is None:
            node_id = self.dut_node_id

        result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)])
        attr_ret = result[endpoint][cluster][attribute]
        err_msg = "Error reading {}:{}".format(str(cluster), str(attribute))
        asserts.assert_true(attr_ret is not None, err_msg)
        asserts.assert_false(isinstance(attr_ret, Clusters.Attribute.ValueDecodeFailure), err_msg)
        return attr_ret

    async def read_single_attribute_expect_error(self, cluster: object, attribute: object, error: Status, dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = 0) -> object:
        """Read one attribute and assert the read failed with status `error`.

        `dev_ctrl` and `node_id` default to the run's default controller and DUT.
        """
        if dev_ctrl is None:
            dev_ctrl = self.default_controller
        if node_id is None:
            node_id = self.dut_node_id

        result = await dev_ctrl.ReadAttribute(node_id, [(endpoint, attribute)])
        attr_ret = result[endpoint][cluster][attribute]
        err_msg = "Did not see expected error when reading {}:{}".format(str(cluster), str(attribute))
        asserts.assert_true(attr_ret is not None, err_msg)
        asserts.assert_true(isinstance(attr_ret, Clusters.Attribute.ValueDecodeFailure), err_msg)
        asserts.assert_true(isinstance(attr_ret.Reason, InteractionModelError), err_msg)
        asserts.assert_equal(attr_ret.Reason.status, error, err_msg)
        return attr_ret

    def print_step(self, stepnum: int, title: str) -> None:
        """Log a banner line marking the start of a test step."""
        logging.info('***** Test Step %d : %s', stepnum, title)
def generate_mobly_test_config(matter_test_config: MatterTestConfig):
    """Build the Mobly `TestRunConfig` for a Matter test run.

    No Mobly YAML config is involved: the testbed name is fixed and the user
    params are the config's `global_test_params`.
    """
    # Log path precedence: Mobly's environment variable, then the configured
    # path, then the module default.
    if ENV_MOBLY_LOGPATH in os.environ:
        chosen_log_path = os.environ[ENV_MOBLY_LOGPATH]
    elif matter_test_config.logs_path is not None:
        chosen_log_path = matter_test_config.logs_path
    else:
        chosen_log_path = _DEFAULT_LOG_PATH

    mobly_config = TestRunConfig()
    mobly_config.testbed_name = "MatterTest"
    mobly_config.log_path = chosen_log_path
    # TODO: For later, configure controllers
    mobly_config.controller_configs = {}
    mobly_config.user_params = matter_test_config.global_test_params
    return mobly_config
def _find_test_class():
    """Finds the test class in a test script.

    Walk through module members and find the subclass of MatterBaseTest. Only
    one subclass is allowed in a test script.

    Returns:
      The test class in the test module.

    Raises:
      SystemExit: Raised if the number of test classes is not exactly one.
    """
    subclasses = utils.find_subclasses_in_module([MatterBaseTest], sys.modules['__main__'])
    # The base class itself may be visible in __main__; it is not a test class.
    subclasses = [c for c in subclasses if c.__name__ != "MatterBaseTest"]
    if len(subclasses) != 1:
        print(
            'Exactly one subclass of `MatterBaseTest` should be in the main file. Found %s.' %
            str([subclass.__name__ for subclass in subclasses]))
        sys.exit(1)

    return subclasses[0]
def int_decimal_or_hex(s: str) -> int:
    """Parse a non-negative integer, letting `int(s, 0)` pick the base from the prefix.

    Raises:
        ValueError: `s` is not an integer literal, or it is negative.
    """
    parsed = int(s, 0)
    if parsed >= 0:
        return parsed
    raise ValueError("Negative values not supported")
def byte_string_from_hex(s: str) -> bytes:
    """Decode a hex string after dropping `:` separators, spaces and `0x` markers."""
    cleaned = s
    # Same removal order as before: colons, then spaces, then "0x".
    for noise in (":", " ", "0x"):
        cleaned = cleaned.replace(noise, "")
    return unhexlify(cleaned)
def int_from_manual_code(s: str) -> int:
    """Validate a manual setup code (11 or 21 decimal digits) and return it as an int.

    Raises:
        ValueError: `s` does not have the expected shape.
    """
    regex = r"^([0-9]{11}|[0-9]{21})$"
    if re.match(regex, s) is None:
        raise ValueError("Invalid manual code format, does not match %s" % regex)
    return int(s, 10)
def int_named_arg(s: str) -> Tuple[str, int]:
    """Parse a `NAME:VALUE` argument whose value is a hex or decimal integer.

    Hex values need a `0x` prefix and may contain `_` separators; decimal
    values may be negative.

    Returns:
        A `(name, value)` tuple.

    Raises:
        ValueError: `s` does not match the expected format.
    """
    regex = r"^(?P<name>[a-zA-Z_0-9.]+):((?P<hex_value>0x[0-9a-fA-F_]+)|(?P<decimal_value>-?\d+))$"
    match = re.match(regex, s)
    if not match:
        raise ValueError("Invalid int argument format, does not match %s" % regex)

    name = match.group("name")
    # Exactly one of the two alternatives matched.
    if match.group("hex_value"):
        value = int(match.group("hex_value"), 0)
    else:
        value = int(match.group("decimal_value"), 10)
    return (name, value)
def str_named_arg(s: str) -> Tuple[str, str]:
    """Parse a `NAME:VALUE` argument, keeping the value as a raw string.

    Everything after the first colon (possibly empty, possibly containing
    more colons) is the value.

    Raises:
        ValueError: `s` does not match the expected format.
    """
    regex = r"^(?P<name>[a-zA-Z_0-9.]+):(?P<value>.*)$"
    match = re.match(regex, s)
    if not match:
        raise ValueError("Invalid string argument format, does not match %s" % regex)

    return (match.group("name"), match.group("value"))
def float_named_arg(s: str) -> Tuple[str, float]:
    """Parse a `NAME:VALUE` argument whose value is a floating point number.

    Raises:
        ValueError: `s` does not match `NAME:VALUE`, or VALUE is not a float.
    """
    regex = r"^(?P<name>[a-zA-Z_0-9.]+):(?P<value>.*)$"
    match = re.match(regex, s)
    if not match:
        raise ValueError("Invalid float argument format, does not match %s" % regex)

    name = match.group("name")
    value = float(match.group("value"))
    return (name, value)
def json_named_arg(s: str) -> Tuple[str, object]:
    """Parse a `NAME:VALUE` argument whose value is a JSON document.

    Raises:
        ValueError: `s` does not match `NAME:VALUE`, or VALUE is not valid JSON
            (`json.JSONDecodeError` is a `ValueError` subclass).
    """
    regex = r"^(?P<name>[a-zA-Z_0-9.]+):(?P<value>.*)$"
    match = re.match(regex, s)
    if not match:
        raise ValueError("Invalid JSON argument format, does not match %s" % regex)

    name = match.group("name")
    value = json.loads(match.group("value"))
    return (name, value)
def bool_named_arg(s: str) -> Tuple[str, bool]:
    """Parse a `NAME:VALUE` argument whose value is true/false or 1/0.

    The whole argument is lower-cased before matching, so the value is
    case-insensitive and the returned name is always lower-case.

    Raises:
        ValueError: `s` does not match the expected format.
    """
    regex = r"^(?P<name>[a-zA-Z_0-9.]+):((?P<truth_value>true|false)|(?P<decimal_value>[01]))$"
    match = re.match(regex, s.lower())
    if not match:
        raise ValueError("Invalid bool argument format, does not match %s" % regex)

    name = match.group("name")
    # Exactly one of the two alternatives matched.
    if match.group("truth_value"):
        value = match.group("truth_value") == "true"
    else:
        value = int(match.group("decimal_value")) != 0
    return (name, value)
def bytes_as_hex_named_arg(s: str) -> Tuple[str, bytes]:
    """Parse a `NAME:VALUE` argument whose value is an octet string in hex.

    Colons inside the value (e.g. `00:11:CA:FE`) are ignored.

    Raises:
        ValueError: `s` does not match the expected format, or the value does
            not contain an even number of hex characters.
    """
    regex = r"^(?P<name>[a-zA-Z_0-9.]+):(?P<value>[0-9a-fA-F:]+)$"
    match = re.match(regex, s)
    if not match:
        raise ValueError("Invalid bytes as hex argument format, does not match %s" % regex)

    name = match.group("name")
    value_str = match.group("value")
    value_str = value_str.replace(":", "")
    if len(value_str) % 2 != 0:
        raise ValueError("Byte string argument value needs to be even number of hex chars")
    value = unhexlify(value_str)

    return (name, value)
def root_index(s: str) -> int:
    """Parse a root of trust index given as a number or a chip-tool style alias.

    `alpha`, `beta` and `gamma` (any case) map to 1, 2 and 3.

    Raises:
        ValueError: `s` is neither an alias nor an integer, or is below 1.
    """
    CHIP_TOOL_COMPATIBILITY = {
        "alpha": 1,
        "beta": 2,
        "gamma": 3
    }

    alias_index = CHIP_TOOL_COMPATIBILITY.get(s.lower())
    if alias_index is not None:
        return alias_index

    index = int(s)
    # Reject everything below 1, matching the message; the previous `== 0`
    # check let negative indices through.
    if index < 1:
        raise ValueError("Only support root index >= 1")

    return index
def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConfig) -> bool:
    """Copy the commissioning-related command line arguments into `config`.

    Nothing is copied when no commissioning method was requested. Problems are
    reported with `print` and signalled through the return value.

    Returns:
        True on success (or nothing to do), False if the arguments are
        inconsistent or incomplete.
    """
    if args.commissioning_method is None:
        return True

    config.commissioning_method = args.commissioning_method
    config.commission_only = args.commission_only

    if args.dut_node_id is None:
        print("error: When --commissioning-method present, --dut-node-id is mandatory!")
        return False
    config.dut_node_id = args.dut_node_id

    if args.discriminator is None and (args.qr_code is None and args.manual_code is None):
        print("error: Missing --discriminator when no --qr-code/--manual-code present!")
        return False
    config.discriminator = args.discriminator

    if args.passcode is None and (args.qr_code is None and args.manual_code is None):
        print("error: Missing --passcode when no --qr-code/--manual-code present!")
        return False
    config.setup_passcode = args.passcode

    if args.qr_code is not None and args.manual_code is not None:
        print("error: Cannot have both --qr-code and --manual-code present!")
        return False

    config.qr_code_content = args.qr_code
    config.manual_code = args.manual_code

    config.root_of_trust_index = args.root_index
    # Follow root of trust index if ID not provided to have same behavior as legacy
    # chip-tool that fabricID == commissioner_name == root of trust index
    config.fabric_id = args.fabric_id if args.fabric_id is not None else config.root_of_trust_index

    if args.chip_tool_credentials_path is not None and not args.chip_tool_credentials_path.exists():
        print("error: chip-tool credentials path %s doesn't exist!" % args.chip_tool_credentials_path)
        return False
    config.chip_tool_credentials_path = args.chip_tool_credentials_path

    if config.commissioning_method == "ble-wifi":
        if args.wifi_ssid is None:
            print("error: missing --wifi-ssid <SSID> for --commissioning-method ble-wifi!")
            return False

        if args.wifi_passphrase is None:
            print("error: missing --wifi-passphrase <passphrasse> for --commissioning-method ble-wifi!")
            return False

        config.wifi_ssid = args.wifi_ssid
        config.wifi_passphrase = args.wifi_passphrase
    elif config.commissioning_method == "ble-thread":
        if args.thread_dataset_hex is None:
            print("error: missing --thread-dataset-hex <DATASET_HEX> for --commissioning-method ble-thread!")
            return False
        config.thread_operational_dataset = args.thread_dataset_hex
    elif config.commissioning_method == "on-network-ip":
        if args.ip_addr is None:
            print("error: missing --ip-addr <IP_ADDRESS> for --commissioning-method on-network-ip")
            return False
        config.commissionee_ip_address_just_for_testing = args.ip_addr

    if args.case_admin_subject is None:
        # Use controller node ID as CASE admin subject during commissioning if nothing provided
        config.case_admin_subject = config.controller_node_id
    else:
        # If a CASE admin subject is provided, then use that. Without this
        # `else`, the fallback above was always overwritten with None.
        config.case_admin_subject = args.case_admin_subject

    return True
def convert_args_to_matter_config(args: argparse.Namespace) -> MatterTestConfig:
    """Build a MatterTestConfig from parsed command line arguments.

    Exits the process with status 1 if the commissioning arguments are invalid.
    All `NAME:VALUE` test arguments are merged into `global_test_params`, and
    the remaining config is embedded there under the `meta_config` key.
    """
    config = MatterTestConfig()

    # Populate commissioning config if present, exiting on error
    if not populate_commissioning_args(args, config):
        sys.exit(1)

    config.storage_path = pathlib.Path(_DEFAULT_STORAGE_PATH) if args.storage_path is None else args.storage_path
    config.logs_path = pathlib.Path(_DEFAULT_LOG_PATH) if args.logs_path is None else args.logs_path
    config.paa_trust_store_path = args.paa_trust_store_path
    config.ble_interface_id = args.ble_interface_id
    # NOTE(review): restored so that --tests reaches `config.tests`, which the
    # main entry point consults. `getattr` keeps this safe if the parser does
    # not define the option -- confirm against the argument parser.
    config.tests = getattr(args, "tests", None) or []

    config.controller_node_id = args.controller_node_id

    # Accumulate all command-line-passed named args
    all_global_args = []
    argsets = [item for item in (args.int_arg, args.float_arg, args.string_arg, args.json_arg,
                                 args.hex_arg, args.bool_arg) if item is not None]
    for argset in argsets:
        all_global_args.extend(argset)

    # Later entries win if the same name is given more than once.
    config.global_test_params = {}
    for name, value in all_global_args:
        config.global_test_params[name] = value

    # Embed the rest of the config in the global test params dict which will be passed to Mobly tests
    config.global_test_params["meta_config"] = {k: v for k, v in dataclass_asdict(config).items() if k != "global_test_params"}

    return config
def parse_matter_test_args(argv: List[str]) -> MatterTestConfig:
    """Parse the Matter test command line into a MatterTestConfig.

    Args:
        argv: Arguments to parse; when empty or None, `sys.argv[1:]` is used.
            Unknown arguments are ignored.
    """
    parser = argparse.ArgumentParser(description='Matter standalone Python test')

    basic_group = parser.add_argument_group(title="Basic arguments", description="Overall test execution arguments")

    # NOTE(review): the head of this call was lost; option names and `nargs`
    # are reconstructed from the surviving metavar/help -- confirm.
    basic_group.add_argument('--tests', '--test_case', action="store", nargs='+', type=str,
                             metavar='test_a test_b...',
                             help='A list of tests in the test class to execute.')
    basic_group.add_argument('--storage-path', action="store", type=pathlib.Path,
                             metavar="PATH", help="Location for persisted storage of instance")
    basic_group.add_argument('--logs-path', action="store", type=pathlib.Path, metavar="PATH", help="Location for test logs")
    paa_path_default = get_default_paa_trust_store(pathlib.Path.cwd())
    basic_group.add_argument('--paa-trust-store-path', action="store", type=pathlib.Path, metavar="PATH", default=paa_path_default,
                             help="PAA trust store path (default: %s)" % str(paa_path_default))
    basic_group.add_argument('--ble-interface-id', action="store", type=int,
                             metavar="INTERFACE_ID", help="ID of BLE adapter (from hciconfig)")
    # The help text advertises a default, so one must actually be set;
    # otherwise the controller node ID ends up as None.
    basic_group.add_argument('-N', '--controller-node-id', type=int_decimal_or_hex,
                             metavar='NODE_ID', default=_DEFAULT_CONTROLLER_NODE_ID,
                             help='NodeID to use for initial/default controller (default: %d)' % _DEFAULT_CONTROLLER_NODE_ID)
    basic_group.add_argument('-n', '--dut-node-id', type=int_decimal_or_hex,
                             metavar='NODE_ID', default=_DEFAULT_DUT_NODE_ID,
                             help='Node ID for primary DUT communication, and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID)

    commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node")

    commission_group.add_argument('-m', '--commissioning-method', type=str,
                                  choices=["on-network", "ble-wifi", "ble-thread", "on-network-ip"],
                                  help='Name of commissioning method to use')
    commission_group.add_argument('-d', '--discriminator', type=int_decimal_or_hex,
                                  help='Discriminator to use for commissioning')
    commission_group.add_argument('-p', '--passcode', type=int_decimal_or_hex,
                                  help='PAKE passcode to use')
    commission_group.add_argument('-i', '--ip-addr', type=str,
                                  help='IP address to use (only for method "on-network-ip". ONLY FOR LOCAL TESTING!')

    commission_group.add_argument('--wifi-ssid', type=str,
                                  help='Wi-Fi SSID for ble-wifi commissioning')
    commission_group.add_argument('--wifi-passphrase', type=str,
                                  help='Wi-Fi passphrase for ble-wifi commissioning')

    commission_group.add_argument('--thread-dataset-hex', type=byte_string_from_hex,
                                  help='Thread operational dataset as a hex string for ble-thread commissioning')

    commission_group.add_argument('--admin-vendor-id', action="store", type=int_decimal_or_hex, default=_DEFAULT_ADMIN_VENDOR_ID,
                                  metavar="VENDOR_ID", help="VendorID to use during commissioning (default 0x%04X)" % _DEFAULT_ADMIN_VENDOR_ID)
    commission_group.add_argument('--case-admin-subject', action="store", type=int_decimal_or_hex,
                                  metavar="CASE_ADMIN_SUBJECT", help="Set the CASE admin subject to an explicit value (default to commissioner Node ID)")

    commission_group.add_argument('--commission-only', action="store_true", default=False,
                                  help="If true, test exits after commissioning without running subsequent tests")

    code_group = parser.add_mutually_exclusive_group(required=False)

    code_group.add_argument('-q', '--qr-code', type=str,
                            metavar="QR_CODE", help="QR setup code content (overrides passcode and discriminator)")
    code_group.add_argument('--manual-code', type=int_from_manual_code,
                            metavar="MANUAL_CODE", help="Manual setup code content (overrides passcode and discriminator)")

    fabric_group = parser.add_argument_group(
        title="Fabric selection", description="Fabric selection for single-fabric basic usage, and commissioning")
    fabric_group.add_argument('-f', '--fabric-id', type=int_decimal_or_hex,
                              help='Fabric ID on which to operate under the root of trust')

    # As with -N, the advertised default must actually be configured.
    fabric_group.add_argument('-r', '--root-index', type=root_index,
                              metavar='ROOT_INDEX_OR_NAME', default=_DEFAULT_TRUST_ROOT_INDEX,
                              help='Root of trust under which to operate/commission for single-fabric basic usage. alpha/beta/gamma are aliases for 1/2/3. Default (%d)' % _DEFAULT_TRUST_ROOT_INDEX)

    fabric_group.add_argument('-c', '--chip-tool-credentials-path', type=pathlib.Path,
                              help='Path to chip-tool credentials file root')

    args_group = parser.add_argument_group(title="Config arguments", description="Test configuration global arguments set")
    args_group.add_argument('--int-arg', nargs='*', type=int_named_arg, metavar="NAME:VALUE",
                            help="Add a named test argument for an integer as hex or decimal (e.g. -2 or 0xFFFF_1234)")
    args_group.add_argument('--bool-arg', nargs='*', type=bool_named_arg, metavar="NAME:VALUE",
                            help="Add a named test argument for an boolean value (e.g. true/false or 0/1)")
    args_group.add_argument('--float-arg', nargs='*', type=float_named_arg, metavar="NAME:VALUE",
                            help="Add a named test argument for a floating point value (e.g. -2.1 or 6.022e23)")
    args_group.add_argument('--string-arg', nargs='*', type=str_named_arg, metavar="NAME:VALUE",
                            help="Add a named test argument for a string value")
    args_group.add_argument('--json-arg', nargs='*', type=json_named_arg, metavar="NAME:VALUE",
                            help="Add a named test argument for JSON stored as a list or dict")
    args_group.add_argument('--hex-arg', nargs='*', type=bytes_as_hex_named_arg, metavar="NAME:VALUE",
                            help="Add a named test argument for an octet string in hex (e.g. 0011cafe or 00:11:CA:FE)")

    if not argv:
        argv = sys.argv[1:]

    # parse_known_args: arguments meant for other consumers are tolerated.
    return convert_args_to_matter_config(parser.parse_known_args(argv)[0])
def async_test_body(body):
    """Decorator required to be applied whenever a `test_*` method is `async def`.

    Since Mobly doesn't support asyncio directly, and the test methods are called
    synchronously, we need a mechanism to allow an `async def` to be converted to
    a asyncio-run synchronous method. This decorator does the wrapping.

    Args:
        body: The coroutine function to wrap.

    Returns:
        A synchronous callable that runs `body(*args, **kwargs)` to completion
        with `asyncio.run` and returns its result.
    """

    def async_runner(*args, **kwargs):
        return asyncio.run(body(*args, **kwargs))

    return async_runner
class CommissionDeviceTest(MatterBaseTest):
    """Test class auto-injected at the start of test list to commission a device when requested"""

    def test_run_commissioning(self):
        """Commission the DUT; abort the whole run if commissioning fails."""
        conf = self.matter_test_config
        logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" %
                     (conf.root_of_trust_index, conf.fabric_id, conf.dut_node_id))
        logging.info("Commissioning method: %s" % conf.commissioning_method)

        if not self._commission_device():
            raise signals.TestAbortAll("Failed to commission node")

    def _commission_device(self) -> bool:
        """Dispatch to the controller call matching the configured commissioning method.

        Raises:
            ValueError: The configured commissioning method is not supported.
        """
        dev_ctrl = self.default_controller
        conf = self.matter_test_config

        # TODO: support by manual code and QR

        if conf.commissioning_method == "on-network":
            return dev_ctrl.CommissionOnNetwork(nodeId=conf.dut_node_id, setupPinCode=conf.setup_passcode, filterType=DiscoveryFilterType.LONG_DISCRIMINATOR, filter=conf.discriminator)
        elif conf.commissioning_method == "ble-wifi":
            return dev_ctrl.CommissionWiFi(conf.discriminator, conf.setup_passcode, conf.dut_node_id, conf.wifi_ssid, conf.wifi_passphrase)
        elif conf.commissioning_method == "ble-thread":
            return dev_ctrl.CommissionThread(conf.discriminator, conf.setup_passcode, conf.dut_node_id, conf.thread_operational_dataset)
        elif conf.commissioning_method == "on-network-ip":
            return dev_ctrl.CommissionIP(ipaddr=conf.commissionee_ip_address_just_for_testing, setupPinCode=conf.setup_passcode, nodeid=conf.dut_node_id)
        else:
            raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)
def default_matter_test_main(argv=None, **kwargs):
    """Execute the test class in a test module.

    This is the default entry point for running a test script file directly.
    In this case, only one test class in a test script is allowed.

    To make your test script executable, add the following to your file:

    .. code-block:: python

      from <this module> import default_matter_test_main
      ...
      if __name__ == '__main__':
        default_matter_test_main()

    Args:
      argv: A list that is then parsed as command line args. If None, defaults to sys.argv
      **kwargs: Optional overrides; `controller_cat_tags` and
        `maximize_cert_chains` are recognized.
    """
    matter_test_config = parse_matter_test_args(argv)

    # Allow override of command line from optional arguments
    if not matter_test_config.controller_cat_tags and "controller_cat_tags" in kwargs:
        matter_test_config.controller_cat_tags = kwargs["controller_cat_tags"]

    # Find the test class in the test script.
    test_class = _find_test_class()

    # Load test config file.
    test_config = generate_mobly_test_config(matter_test_config)

    # Parse test specifiers if exist.
    tests = None
    if len(matter_test_config.tests) > 0:
        tests = matter_test_config.tests

    # This is required in case we need any testing with maximized certificate chains.
    # We need *all* issuers from the start, even for default controller, to use
    # maximized chains, before MatterStackState init, otherwise some stale certs
    # may not chain properly.
    if "maximize_cert_chains" in kwargs:
        matter_test_config.maximize_cert_chains = kwargs["maximize_cert_chains"]

    stack = MatterStackState(matter_test_config)
    test_config.user_params["matter_stack"] = stash_globally(stack)

    # TODO: Steer to right FabricAdmin!
    # TODO: If CASE Admin Subject is a CAT tag range, then make sure to issue NOC with that CAT tag

    default_controller = stack.certificate_authorities[0].adminList[0].NewController(nodeId=matter_test_config.controller_node_id,
                                                                                     paaTrustStorePath=str(matter_test_config.paa_trust_store_path), catTags=matter_test_config.controller_cat_tags)
    test_config.user_params["default_controller"] = stash_globally(default_controller)

    test_config.user_params["matter_test_config"] = stash_globally(matter_test_config)

    test_config.user_params["certificate_authority_manager"] = stash_globally(stack.certificate_authority_manager)

    # Execute the test class with the config
    ok = True

    runner = TestRunner(log_dir=test_config.log_path,
                        testbed_name=test_config.testbed_name)

    with runner.mobly_logger():
        if matter_test_config.commissioning_method is not None:
            runner.add_test_class(test_config, CommissionDeviceTest, None)

        # Add the tests selected unless we have a commission-only request
        if not matter_test_config.commission_only:
            runner.add_test_class(test_config, test_class, tests)

        try:
            runner.run()
            ok = runner.results.is_all_pass and ok
        except signals.TestAbortAll:
            ok = False
        except Exception:
            # Top-level boundary: log the failure and report FAIL below.
            logging.exception('Exception when executing %s.', test_config.testbed_name)
            ok = False

    # Shutdown the stack when all done
    stack.Shutdown()

    if ok:
        logging.info("Final result: PASS !")
    else:
        logging.error("Final result: FAIL !")
        # NOTE(review): exit non-zero so CI sees the failure -- confirm that no
        # caller relies on this function returning after a failed run.
        sys.exit(1)