blob: d93565ecdf47905963301a2e4c68718459960e9d [file] [log] [blame]
#
# Copyright (c) 2021 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import shlex
import shutil
import subprocess
import tempfile
import threading
import time
import typing
from collections.abc import Iterable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, StrEnum, auto
from pathlib import Path
from types import MappingProxyType
from .accessories import AppsRegister
from .runner import LogPipe, Runner, SubprocessInfo, SubprocessKind
log = logging.getLogger(__name__)
TEST_NODE_ID = '0x12344321'
TEST_DISCRIMINATOR = '3840'
TEST_PASSCODE = '20202021'
TEST_SETUP_QR_CODE = 'MT:-24J042C00KA0648G00'
TEST_THREAD_DATASET = '0e08000000000001000000030000104a0300001635060004001fffe0020884fa18779329ac770708fd269658e44aa21a030f4f70656e5468726561642d32386335010228c50c0402a0f7f8051000112233445566778899aabbccddeeff041000112233445566778899aabbccddeeff'
class App:
def __init__(self, runner: Runner, subproc: SubprocessInfo):
self.process: subprocess.Popen[bytes] | None = None
self.outpipe: LogPipe | None = None
self.runner = runner
self.subproc = subproc
self.cv_stopped = threading.Condition()
self.stopped = True
self.lastLogIndex = 0
self.kvsPathSet = {'/tmp/chip_kvs'}
self.options: dict[str, str] | None = None
self.killed = False
self.setupCode: str | None = None
def __repr__(self) -> str:
return repr(self.subproc)
@property
def returncode(self):
"""Exposes return code of the underlying process, so that
common code can be used between subprocess.Popen and Apps.
"""
if not self.process:
return None
return self.process.returncode
def start(self, options: dict[str, str] | None = None) -> bool:
if not self.process:
# Cache command line options to be used for reboots
if options:
self.options = options
# Make sure to assign self.process before we do any operations that
# might fail, so attempts to kill us on failure actually work.
self.process, self.outpipe, _ = self.__startServer()
self.waitForApplicationUp()
self.__updateSetUpCode()
with self.cv_stopped:
self.stopped = False
self.cv_stopped.notify()
return True
return False
def stop(self) -> bool:
if self.process:
with self.cv_stopped:
self.stopped = True
self.cv_stopped.notify()
ok = self.__terminateProcess()
if not ok:
# For now just raise an exception; no other way to get tests to fail in this situation.
raise RuntimeError('Stopped subprocess terminated abnormally')
return True
return False
def factoryReset(self) -> bool:
wasRunning = (not self.killed) and self.stop()
for kvs in self.kvsPathSet:
if os.path.exists(kvs):
os.unlink(kvs)
if wasRunning:
return self.start()
return True
def waitForApplicationUp(self):
# Watch for both mDNS advertisement start as well as event loop start.
# These two messages can appear in any order depending on the implementation.
# Waiting for both makes the startup detection more robust.
#
# For a Thread-only node, the mDNS advertisement is not started immediately
# after start, so it is skipped for Thread node.
assert self.process is not None and self.outpipe is not None, "waitForApplicationUp can be called only after start()"
what = ["APP STATUS: Starting event loop"]
if not any(arg.startswith("--thread-node-id=") for arg in self.subproc.args):
what += ["mDNS service published:"]
self.__waitFor(what)
def waitForMessage(self, message: str, timeoutInSeconds: float = 10):
self.__waitFor([message], timeoutInSeconds=timeoutInSeconds)
return True
def kill(self) -> bool:
ok = self.__terminateProcess()
self.killed = True
return ok
def wait(self, timeout: float | None = None) -> int:
while True:
# If the App was never started, AND was killed, exit immediately
if self.killed:
return 0
# If the App was never started, wait cannot be called on the process
if self.process is None:
time.sleep(0.1)
continue
code = self.process.wait(timeout)
with self.cv_stopped:
if not self.stopped:
return code
# When the server is manually stopped, process waiting is
# overridden so the other processes that depends on the
# accessory being alive does not stop.
while self.stopped:
self.cv_stopped.wait()
def __startServer(self) -> tuple[subprocess.Popen[bytes], LogPipe, LogPipe]:
subproc = self.subproc.with_args('--interface-id', '-1')
if not self.options:
log.debug("Executing application under test with default args")
else:
log.debug("Executing application under test with the following args:")
for key, value in self.options.items():
log.debug(" %s: %s", key, value)
subproc = subproc.with_args(key, value)
if key == '--KVS':
self.kvsPathSet.add(value)
return self.runner.RunSubprocess(subproc, name='APP ', wait=False)
def __waitFor(self, patterns: Iterable[str], timeoutInSeconds: float = 10):
"""
Wait for all provided pattern strings to appear in the process output pipe (capture log).
"""
assert self.process is not None and self.outpipe is not None, "__waitFor can be called only after start()"
log.debug('Waiting for all patterns %r', patterns)
start_time = time.monotonic()
def allPatternsFound() -> int | None:
assert self.outpipe is not None
lastLogIndex = self.lastLogIndex
for p in patterns:
found, index = self.outpipe.CapturedLogContains(p, self.lastLogIndex)
if not found:
return None
lastLogIndex = max(lastLogIndex, index)
return lastLogIndex
lastLogIndex = allPatternsFound()
while lastLogIndex is None:
if self.process.poll() is not None:
died_str = f'Server died while waiting for {patterns!r}, returncode {self.process.returncode}'
log.error(died_str)
raise RuntimeError(died_str)
if time.monotonic() - start_time > timeoutInSeconds:
raise TimeoutError(f'Timeout while waiting for {patterns!r}')
time.sleep(0.1)
lastLogIndex = allPatternsFound()
self.lastLogIndex = lastLogIndex + 1
log.debug('Success waiting for: %r', patterns)
def __updateSetUpCode(self):
assert self.outpipe is not None, "__updateSetUpCode needs to happen after __startServer"
qrLine = self.outpipe.FindLastMatchingLine('.*SetupQRCode: *\\[(.*)]')
if not qrLine:
raise RuntimeError("Unable to find QR code")
self.setupCode = qrLine.group(1)
def __terminateProcess(self):
"""
Returns False if the process existed and had a nonzero exit code.
"""
if self.process:
self.process.terminate() # sends SIGTERM
try:
exit_code = self.process.wait(10)
if exit_code:
log.error("Subprocess failed with exit code: %d", exit_code)
return False
except subprocess.TimeoutExpired:
log.debug("Subprocess did not terminate on SIGTERM, killing it now")
self.process.kill()
# The exit code when using Python subprocess will be the signal used to kill it.
# Ideally, we would recover the original exit code, but the process was already
# ignoring SIGTERM, indicating something was already wrong.
self.process.wait(10)
self.process = None
self.outpipe = None
self.lastLogIndex = 0
return True
@dataclass
class TestTarget:
name: str
# command to execute. MUST be a placeholder like tv or lock
command: str
# arguments to pass in to the command to execute
arguments: list[str] = field(default_factory=list)
@dataclass
class KnownSubprocessEntry:
kind: SubprocessKind
target_name: str | None = None
BUILTIN_SUBPROC_DATA = MappingProxyType({
# Matter applications
'all-clusters': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-all-clusters-app'),
'all-devices': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='all-devices-app'),
'air-purifier': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-air-purifier-app'),
'bridge': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-bridge-app'),
'camera': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-camera-app'),
'camera-controller': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-camera-controller'),
'closure': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='closure-app'),
'energy-gateway': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-energy-gateway-app'),
'evse': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-evse-app'),
'fabric-bridge': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='fabric-bridge-app'),
'fabric-admin': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='fabric-admin'),
'fabric-sync': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='fabric-sync'),
'jf-control': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='jfc-app'),
'jf-admin': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='jfa-app'),
'light': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-lighting-app'),
'lit-icd': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='lit-icd-app'),
'lock': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-lock-app'),
'microwave-oven': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-microwave-oven-app'),
'network-manager': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='matter-network-manager-app'),
'ota-provider': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-ota-provider-app'),
'ota-requestor': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-ota-requestor-app'),
'rvc': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-rvc-app'),
'terms-and-conditions': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-terms-and-conditions-app'),
'tv': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='chip-tv-app'),
'water-heater': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='matter-water-heater-app'),
'water-leak-detector': KnownSubprocessEntry(kind=SubprocessKind.APP, target_name='water-leak-detector-app'),
# Tools
'chip-tool': KnownSubprocessEntry(kind=SubprocessKind.TOOL, target_name='chip-tool'),
'darwin-framework-tool': KnownSubprocessEntry(kind=SubprocessKind.TOOL, target_name='darwin-framework-tool'),
'matter-repl-yaml-tester': KnownSubprocessEntry(kind=SubprocessKind.TOOL, target_name='yamltest_with_matter_repl_tester.py'),
# No target_name as this is either chiptool.py or darwinframework.py depending on the selected TestRunTime
'chip-tool-with-python': KnownSubprocessEntry(kind=SubprocessKind.TOOL)
})
class PathsFinderProto(typing.Protocol):
def get(self, target_name: str) -> Path | None:
pass
class SubprocessInfoRepo(dict):
# We don't want to explicitly reference PathsFinder type because we
# don't want to create a dependency on the diskcache module which PathsFinder imports.
# Instead we just want a dict-like object
def __init__(self, paths: PathsFinderProto,
subproc_knowhow: MappingProxyType[str, KnownSubprocessEntry] = BUILTIN_SUBPROC_DATA,
*args, **kwargs):
super().__init__(*args, **kwargs)
self.paths = paths
self.subproc_knowhow = subproc_knowhow
def addSpec(self, spec: str, kind: SubprocessKind | None = None):
"""Add a path to the repo as specified on the command line"""
el = spec.split(':')
if len(el) == 2:
# <key>:<path>
key, path_s = el
path = Path(path_s)
if kind is None:
if key not in self.subproc_knowhow:
raise ValueError(f"Kind not provided for key '{key}' and not specified in know-how")
kind = self.subproc_knowhow[key].kind
else:
raise ValueError(f"Cannot parse path spec '{spec}'")
if key not in self.subproc_knowhow:
log.warning("Key '%s' is not present in the subprocess repo know-how, possible typo", key)
s = SubprocessInfo(kind=kind, path=path)
if path.suffix == '.py':
s = s.wrap_with('python3')
self[key] = s
def missing_keys(self):
"""
Return a list of keys for tools or apps missing (not specified) based on the
know-how dictionary.
"""
return [k for k in self.subproc_knowhow if k not in self]
def discover(self):
"""
Try to discover paths to all apps and tools in the know-how which we are still missing.
Reuse the `require` method but ignore failures, we expect the test-cases to fail if they
depend on missing paths.
"""
log.info("Discovering missing paths")
start_ts = time.time()
discovered_count = 0
for key in self.missing_keys():
try:
self.require(key)
discovered_count += 1
except (LookupError, ValueError) as e:
# Stack trace is not needed here
log.warning("Exception while trying to discover '%s': %r", key, e)
log.info("Discovery of %d paths took %.2f seconds", discovered_count, time.time() - start_ts)
def require(self, key: str, kind: SubprocessKind | None = None, target_name: str | None = None):
"""
Indicate that a subprocess path is required. Throw exception if it's not already in the repo
and can't be discovered using the paths finder.
"""
if key in self:
return self[key]
if kind is None:
if key not in self.subproc_knowhow:
raise ValueError(f"Key '{key}': kind neither provided nor specified in know-how")
kind = self.subproc_knowhow[key].kind
if target_name is None:
if key not in self.subproc_knowhow:
raise ValueError(f"Key '{key}': target name neither provided nor specified in know-how")
target_name = self.subproc_knowhow[key].target_name
if target_name is None:
raise ValueError(f"Key '{key}': Key exists in know-how but no target name specified")
if (path := self.paths.get(target_name)) is None:
raise LookupError(f"Cannot find path for required {kind} key '{key}'")
log.info("Discovered required key '%s' path '%s'", key, path)
s = SubprocessInfo(kind=kind, path=path)
if path.suffix == '.py':
s = s.wrap_with('python3')
self[key] = s
return self[key]
@dataclass
class CaptureLine:
when: datetime
source: str
line: str
class ExecutionCapture:
"""
Keeps track of output lines in a process, to help debug failures.
"""
def __init__(self) -> None:
self.lock = threading.Lock()
self.captures: list[CaptureLine] = []
def Log(self, source: str, line: str):
with self.lock:
self.captures.append(CaptureLine(
when=datetime.now(),
source=source,
line=line.strip('\n')
))
def LogContents(self):
log.error("================ CAPTURED LOG START ==================")
with self.lock:
for entry in self.captures:
log.error("%02d:%02d:%02d.%03d - %-10s: %s",
entry.when.hour,
entry.when.minute,
entry.when.second,
entry.when.microsecond/1000,
entry.source,
entry.line
)
log.error("================ CAPTURED LOG END ====================")
class TestTag(StrEnum):
MANUAL = auto() # requires manual input. Generally not run automatically
SLOW = auto() # test uses Sleep and is generally slow (>=10s is a typical threshold)
FLAKY = auto() # test is considered flaky (usually a bug/time dependent issue)
IN_DEVELOPMENT = auto() # test may not pass or undergoes changes
CHIP_TOOL_PYTHON_ONLY = auto() # test uses YAML features only supported by the CHIP_TOOL_PYTHON runner.
EXTRA_SLOW = auto() # test uses Sleep and is generally _very_ slow (>= 60s is a typical threshold)
PURPOSEFUL_FAILURE = auto() # test fails on purpose
def to_s(self):
for (k, v) in TestTag.__members__.items():
if self == v:
return k
raise KeyError(f"Unknown tag: {self!r}")
class TestRunTime(Enum):
CHIP_TOOL_PYTHON = auto() # use the python yaml test parser with chip-tool
DARWIN_FRAMEWORK_TOOL_PYTHON = auto() # use the python yaml test parser with chip-tool
MATTER_REPL_PYTHON = auto() # use the python yaml test runner
@dataclass
class TestDefinition:
name: str
run_name: str
targets: list[TestTarget]
tags: set[TestTag] = field(default_factory=set)
@property
def is_manual(self) -> bool:
return TestTag.MANUAL in self.tags
@property
def is_slow(self) -> bool:
return TestTag.SLOW in self.tags
@property
def is_flaky(self) -> bool:
return TestTag.FLAKY in self.tags
def tags_str(self) -> str:
"""Get a human readable list of tags applied to this test"""
return ", ".join([t.to_s() for t in self.tags])
def Run(self, runner: Runner, apps_register: AppsRegister, subproc_info_repo: SubprocessInfoRepo,
pics_file: Path, timeout_seconds: int | None, dry_run: bool = False,
test_runtime: TestRunTime = TestRunTime.CHIP_TOOL_PYTHON,
ble_controller_app: int | None = None,
ble_controller_tool: int | None = None,
op_network: str = 'WiFi',
thread_ba_host: str | None = None,
thread_ba_port: int | None = None,
):
"""
Executes the given test case using the provided runner for execution.
Will iterate and execute every target.
"""
for target in self.targets:
log.info('Executing %s::%s', self.name, target.name)
self._RunImpl(target, runner, apps_register, subproc_info_repo, pics_file, timeout_seconds, dry_run,
test_runtime, ble_controller_app, ble_controller_tool, op_network, thread_ba_host, thread_ba_port)
def _RunImpl(self, target: TestTarget, runner: Runner, apps_register: AppsRegister, subproc_info_repo: SubprocessInfoRepo,
pics_file: Path, timeout_seconds: int | None, dry_run: bool = False,
test_runtime: TestRunTime = TestRunTime.CHIP_TOOL_PYTHON,
ble_controller_app: int | None = None,
ble_controller_tool: int | None = None,
op_network: str = 'WiFi',
thread_ba_host: str | None = None,
thread_ba_port: int | None = None):
runner.capture_delegate = ExecutionCapture()
tool_storage_dir = None
loggedCapturedLogs = False
try:
if target.command not in subproc_info_repo:
log.warning("Path to default target '%s' for test '%s' is not known, test will likely fail",
target.command, self.name)
if not dry_run:
for key, subproc in subproc_info_repo.items():
# Do not add tools to the register
if subproc.kind == SubprocessKind.TOOL:
continue
# For the app indicated by target, give it the 'default' key to add to the register
if key == target.command:
key = 'default'
for arg in target.arguments:
subproc = subproc.with_args(arg)
if op_network == 'Thread':
# The node id must not conflict with ThreadBorderRouter.NODE_ID
subproc = subproc.with_args("--thread-node-id=2")
if ble_controller_app is not None:
subproc = subproc.with_args("--ble-controller", str(ble_controller_app))
if op_network == 'WiFi':
subproc = subproc.with_args("--wifi")
app = App(runner, subproc)
# Add the App to the register immediately, so if it fails during
# start() we will be able to clean things up properly.
apps_register.add(key, app)
# Remove server application storage (factory reset),
# so it will be commissionable again.
app.factoryReset()
# It may sometimes be useful to run the same app multiple times depending
# on the implementation. So this code creates a duplicate entry but with a different
# key.
app = App(runner, subproc)
apps_register.add(f'{key}#2', app)
app.factoryReset()
if dry_run:
tool_storage_dir = None
tool_storage_args = []
else:
tool_storage_dir = tempfile.mkdtemp()
tool_storage_args = ['--storage-directory', tool_storage_dir]
# Only start and pair the default app
if dry_run:
setupCode = '${SETUP_PAYLOAD}'
else:
app = apps_register.get('default')
app.start()
assert app.setupCode is not None, "Setup code should have been set in app.start()"
setupCode = app.setupCode
if test_runtime == TestRunTime.MATTER_REPL_PYTHON:
assert 'matter-repl-yaml-tester' in subproc_info_repo, \
"Matter REPL YAML tester should have been set for selected test runtime"
python_cmd = subproc_info_repo['matter-repl-yaml-tester'].with_args(
'--setup-code', setupCode, '--yaml-path', self.run_name, "--pics-file", str(pics_file))
if dry_run:
log.info(shlex.join(python_cmd.to_cmd()))
else:
runner.RunSubprocess(python_cmd, name='MATTER_REPL_YAML_TESTER',
dependencies=[apps_register], timeout_seconds=timeout_seconds)
else: # CHIP_TOOL_PYTHON
assert 'chip-tool' in subproc_info_repo, \
"Chip tool should have been set for selected test runtime"
assert 'chip-tool-with-python' in subproc_info_repo, \
"Chip tool with Python should have been set for selected test runtime"
pairing_server_args = []
pairing_cmd = subproc_info_repo['chip-tool-with-python']
if ble_controller_tool is not None:
if op_network == 'WiFi':
pairing_cmd = pairing_cmd.with_args(
"pairing", "code-wifi", TEST_NODE_ID, "MatterAP", "MatterAPPassword", TEST_SETUP_QR_CODE)
pairing_server_args = ["--ble-controller", str(ble_controller_tool)]
elif op_network == 'Thread':
pairing_cmd = pairing_cmd.with_args(
"pairing", "code-thread", TEST_NODE_ID, f"hex:{TEST_THREAD_DATASET}", TEST_SETUP_QR_CODE)
pairing_server_args = ["--ble-controller", str(ble_controller_tool)]
elif op_network == 'Thread' and thread_ba_host is not None and thread_ba_port is not None:
pairing_cmd = pairing_cmd.with_args(
"pairing", "thread-meshcop", TEST_NODE_ID, f"hex:{TEST_THREAD_DATASET}", setupCode,
"--thread-ba-host", thread_ba_host, "--thread-ba-port", str(thread_ba_port))
else:
pairing_cmd = pairing_cmd.with_args('pairing', 'code', TEST_NODE_ID, setupCode)
if target.command == 'lit-icd' and test_runtime == TestRunTime.CHIP_TOOL_PYTHON:
pairing_cmd = pairing_cmd.with_args('--icd-registration', 'true')
test_cmd = subproc_info_repo['chip-tool-with-python'].with_args('tests', self.run_name, '--PICS', str(pics_file))
interactive_server_args = ['interactive server'] + tool_storage_args + pairing_server_args
if test_runtime == TestRunTime.CHIP_TOOL_PYTHON:
interactive_server_args = interactive_server_args + ['--interface-id', '-1']
server_args = (
'--server_path', str(subproc_info_repo['chip-tool'].path),
'--server_arguments', ' '.join(interactive_server_args))
pairing_cmd = pairing_cmd.with_args(*server_args)
test_cmd = test_cmd.with_args(*server_args)
if dry_run:
log.info("Pairing command: %s", shlex.join(pairing_cmd.to_cmd()))
log.info("Testcase command: %s", shlex.join(test_cmd.to_cmd()))
else:
runner.RunSubprocess(pairing_cmd,
name='PAIR', dependencies=[apps_register])
runner.RunSubprocess(
test_cmd,
name='TEST', dependencies=[apps_register],
timeout_seconds=timeout_seconds)
except BaseException:
log.error("!!!!!!!!!!!!!!!!!!!! ERROR !!!!!!!!!!!!!!!!!!!!!!")
runner.capture_delegate.LogContents()
loggedCapturedLogs = True
raise
finally:
ok = apps_register.kill_all()
apps_register.factory_reset_all()
apps_register.remove_all()
if tool_storage_dir is not None:
shutil.rmtree(tool_storage_dir, ignore_errors=True)
# If loggedCapturedLogs then we are already throwing, so no need to
# try to trigger test failure due to our abnormal termination.
if not ok and not loggedCapturedLogs:
log.error("!!!!!!!!!!!!!!!!!!!! ERROR !!!!!!!!!!!!!!!!!!!!!!")
runner.capture_delegate.LogContents()
raise RuntimeError('Subprocess terminated abnormally')