blob: a2860cc97b55e39aff72e9870878fa099de06a33 [file] [log] [blame]
#!/usr/bin/env -S python3 -B
# Copyright (c) 2022 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import datetime
import glob
import io
import logging
import os
import os.path
import pathlib
import re
import select
import shlex
import sys
import threading
import time
import typing
import uuid
import click
import coloredlogs
from colorama import Fore, Style
from matter.testing.metadata import Metadata, MetadataReader
from matter.testing.tasks import Subprocess
DEFAULT_CHIP_ROOT = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..'))
MATTER_DEVELOPMENT_PAA_ROOT_CERTS = "credentials/development/paa-root-certs"
TAG_PROCESS_APP = f"[{Fore.GREEN}APP {Style.RESET_ALL}]".encode()
TAG_PROCESS_TEST = f"[{Fore.GREEN}TEST{Style.RESET_ALL}]".encode()
TAG_STDOUT = f"[{Fore.YELLOW}STDOUT{Style.RESET_ALL}]".encode()
TAG_STDERR = f"[{Fore.RED}STDERR{Style.RESET_ALL}]".encode()
# RegExp which matches the timestamp in the output of CHIP application
OUTPUT_TIMESTAMP_MATCH = re.compile(r'(?P<prefix>.*)\[(?P<ts>\d+\.\d+)\](?P<suffix>\[\d+:\d+\].*)'.encode())
def chip_output_extract_timestamp(line: bytes) -> (float, bytes):
"""Try to extract timestamp from a CHIP application output line."""
if match := OUTPUT_TIMESTAMP_MATCH.match(line):
return float(match.group(2)), match.group(1) + match.group(3) + b'\n'
return time.time(), line
def process_chip_output(line: bytes, is_stderr: bool, process_tag: bytes = b"") -> bytes:
"""Rewrite the output line to add the timestamp and the process tag."""
timestamp, line = chip_output_extract_timestamp(line)
timestamp = datetime.datetime.fromtimestamp(timestamp).isoformat(sep=' ')
return f"[{timestamp}]".encode() + process_tag + (TAG_STDERR if is_stderr else TAG_STDOUT) + line
def process_chip_app_output(line, is_stderr):
return process_chip_output(line, is_stderr, TAG_PROCESS_APP)
def process_test_script_output(line, is_stderr):
return process_chip_output(line, is_stderr, TAG_PROCESS_TEST)
def forward_fifo(path: str, f_out: typing.BinaryIO, stop_event: threading.Event):
"""Forward the content of a named pipe to a file-like object."""
if not os.path.exists(path):
with contextlib.suppress(OSError):
os.mkfifo(path)
with open(os.open(path, os.O_RDONLY | os.O_NONBLOCK), 'rb') as f_in:
while not stop_event.is_set():
if select.select([f_in], [], [], 0.5)[0]:
line = f_in.readline()
if not line:
break
f_out.write(line)
f_out.flush()
with contextlib.suppress(OSError):
os.unlink(path)
class AppProcessManager:
def __init__(self, app: str, app_args: str, app_ready_pattern: typing.Optional[str], stream_output: typing.BinaryIO, app_stdin_pipe: typing.Optional[str] = None):
self.app = app
self.app_args = app_args
self.app_ready_pattern = app_ready_pattern
self.stream_output = stream_output
self.app_stdin_pipe = app_stdin_pipe
self.app_process = None
self.stdin_thread = None
self.stdin_stop_event = threading.Event()
def start(self):
logging.info(f"Starting app with args: {self.app_args}")
if self.app_ready_pattern and isinstance(self.app_ready_pattern, str):
ready_pattern = re.compile(self.app_ready_pattern.encode())
else:
ready_pattern = self.app_ready_pattern
self.app_process = Subprocess(self.app, *shlex.split(self.app_args),
output_cb=process_chip_app_output,
f_stdout=self.stream_output,
f_stderr=self.stream_output)
self.app_process.start(expected_output=ready_pattern, timeout=30)
if self.app_stdin_pipe:
logging.info("Forwarding stdin from '%s' to app", self.app_stdin_pipe)
self.stdin_stop_event.clear()
self.stdin_thread = threading.Thread(
target=forward_fifo, args=(self.app_stdin_pipe, self.app_process.p.stdin, self.stdin_stop_event))
self.stdin_thread.start()
else:
self.app_process.p.stdin.close()
def stop(self):
if self.stdin_thread:
self.stdin_stop_event.set()
self.stdin_thread.join()
self.stdin_thread = None
if self.app_process:
self.app_process.terminate()
self.app_process = None
def restart(self):
self.stop()
self.start()
def get_process(self):
return self.app_process
@click.command()
@click.option("--app", type=click.Path(exists=True), default=None,
help='Path to local application to use, omit to use external apps.')
@click.option("--factory-reset/--no-factory-reset", default=None,
help='Remove app config and repl configs (/tmp/chip* and /tmp/repl*) before running the tests.')
@click.option("--factory-reset-app-only/--no-factory-reset-app-only", default=None,
help='Remove app config and repl configs (/tmp/chip* and /tmp/repl*) before running the tests, but not the controller config')
@click.option("--app-args", type=str, default='',
help='The extra arguments passed to the device. Can use placeholders like {SCRIPT_BASE_NAME}')
@click.option("--app-ready-pattern", type=str, default=None,
help='Delay test script start until given regular expression pattern is found in the application output.')
@click.option("--app-stdin-pipe", type=str, default=None,
help='Path for a standard input redirection named pipe to be used by the test script.')
@click.option("--script", type=click.Path(exists=True), default=os.path.join(DEFAULT_CHIP_ROOT,
'src',
'controller',
'python',
'test',
'test_scripts',
'mobile-device-test.py'), help='Test script to use.')
@click.option("--script-args", type=str, default='',
help='Script arguments, can use placeholders like {SCRIPT_BASE_NAME}.')
@click.option("--script-gdb/--no-script-gdb", default=None,
help='Run script through gdb')
@click.option("--quiet/--no-quiet", default=None,
help="Do not print output from passing tests. Use this flag in CI to keep GitHub log size manageable.")
@click.option("--load-from-env", default=None, help="YAML file that contains values for environment variables.")
@click.option("--run", type=str, multiple=True, help="Run only the specified test run(s).")
@click.option("--app-filter", type=str, default=None, help="Run only for the specified app(s). Comma separated.")
def main(app: str, factory_reset: bool, factory_reset_app_only: bool, app_args: str,
app_ready_pattern: str, app_stdin_pipe: str, script: str, script_args: str,
script_gdb: bool, quiet: bool, load_from_env, run, app_filter):
if load_from_env:
reader = MetadataReader(load_from_env)
runs = reader.parse_script(script)
else:
runs = [
Metadata(
py_script_path=script,
run="cmd-run",
app=app,
app_args=app_args,
app_ready_pattern=app_ready_pattern,
app_stdin_pipe=app_stdin_pipe,
script_args=script_args,
script_gdb=script_gdb,
)
]
if not runs:
raise click.ClickException(
"No valid runs were found. Make sure you add runs to your file, see "
"https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md document for reference/example.")
if run:
# Filter runs based on the command line arguments
runs = [r for r in runs if r.run in run]
if app_filter:
allowed_apps = [s.strip() for s in app_filter.split(',')]
# app name in metadata is like "${APP_NAME}"
allowed_apps_with_format = [f"${{{app}}}" for app in allowed_apps]
runs = [r for r in runs if r.app in allowed_apps_with_format]
# Override runs Metadata with the command line options
for run in runs:
if factory_reset is not None:
run.factory_reset = factory_reset
if factory_reset_app_only is not None:
run.factory_reset_app_only = factory_reset_app_only
if script_gdb is not None:
run.script_gdb = script_gdb
if quiet is not None:
run.quiet = quiet
for run in runs:
logging.info("Executing %s %s", run.py_script_path.split('/')[-1], run.run)
main_impl(run.app, run.factory_reset, run.factory_reset_app_only, run.app_args or "", run.app_ready_pattern,
run.app_stdin_pipe, run.py_script_path, run.script_args or "", run.script_gdb, run.quiet)
def main_impl(app: str, factory_reset: bool, factory_reset_app_only: bool, app_args: str,
app_ready_pattern: str, app_stdin_pipe: str, script: str, script_args: str,
script_gdb: bool, quiet: bool):
app_args = app_args.replace('{SCRIPT_BASE_NAME}', os.path.splitext(os.path.basename(script))[0])
script_args = script_args.replace('{SCRIPT_BASE_NAME}', os.path.splitext(os.path.basename(script))[0])
# Generate unique test run ID to avoid conflicts in concurrent test runs
test_run_id = str(uuid.uuid4())[:8] # Use first 8 characters for shorter paths
restart_flag_file = f"/tmp/chip_test_restart_app_{test_run_id}"
if factory_reset or factory_reset_app_only:
# Remove native app config
for path in glob.glob('/tmp/chip*') + glob.glob('/tmp/repl*'):
pathlib.Path(path).unlink(missing_ok=True)
# Remove native app KVS if that was used
if match := re.search(r"--KVS (?P<path>[^ ]+)", app_args):
logging.info("Removing KVS path: %s" % match.group("path"))
pathlib.Path(match.group("path")).unlink(missing_ok=True)
if factory_reset:
# Remove Python test admin storage if provided
if match := re.search(r"--storage-path (?P<path>[^ ]+)", script_args):
logging.info("Removing storage path: %s" % match.group("path"))
pathlib.Path(match.group("path")).unlink(missing_ok=True)
app_manager_ref = None
app_manager_lock = threading.Lock()
restart_monitor_thread = None
app_exit_code = 0
stream_output = sys.stdout.buffer
if quiet:
stream_output = io.BytesIO()
if app:
if not os.path.exists(app):
if app is None:
raise FileNotFoundError(f"{app} not found")
app_manager = AppProcessManager(app, app_args, app_ready_pattern, stream_output, app_stdin_pipe)
app_manager.start()
app_manager_ref = [app_manager]
restart_monitor_thread = threading.Thread(
target=monitor_app_restart_requests,
args=(
app_manager_ref,
app_manager_lock,
app,
app_args,
app_ready_pattern,
stream_output,
app_stdin_pipe,
restart_flag_file),
daemon=True)
restart_monitor_thread.start()
# TODO: Remove this below workaround once we understand if mobile-device-test needs to be run through Cirque and through this script for CI test pipeline, task PR: https://github.com/project-chip/matter-test-scripts/issues/681
if "mobile-device-test.py" not in script:
script_args += f" --restart-flag-file {restart_flag_file}"
script_command = [
script,
"--fail-on-skipped",
"--paa-trust-store-path", os.path.join(DEFAULT_CHIP_ROOT, MATTER_DEVELOPMENT_PAA_ROOT_CERTS)
] + shlex.split(script_args)
if script_gdb:
#
# When running through Popen, we need to preserve some space-delimited args to GDB as a single logical argument.
# To do that, let's use '|' as a placeholder for the space character so that the initial split will not tokenize them,
# and then replace that with the space char there-after.
#
script_command = ("gdb -batch -return-child-result -q -ex run -ex "
"thread|apply|all|bt --args python3".split() + script_command)
else:
script_command = "/usr/bin/env python3 -X faulthandler".split() + script_command
final_script_command = [i.replace('|', ' ') for i in script_command]
test_script_process = Subprocess(final_script_command[0], *final_script_command[1:],
output_cb=process_test_script_output,
f_stdout=stream_output,
f_stderr=stream_output)
test_script_process.start()
test_script_process.p.stdin.close()
try:
test_script_exit_code = test_script_process.wait()
if test_script_exit_code != 0:
logging.error("Test script exited with returncode %d" % test_script_exit_code)
# Stop the restart monitor thread if it exists
if restart_monitor_thread and restart_monitor_thread.is_alive():
logging.info("Stopping app restart monitor thread")
restart_monitor_thread.join(2.0)
# Get the current app manager if it exists
current_app_manager = None
if app_manager_ref:
with app_manager_lock:
current_app_manager = app_manager_ref[0]
if current_app_manager:
logging.info("Stopping app with SIGTERM")
current_app_manager.stop()
if current_app_manager.get_process():
app_exit_code = current_app_manager.get_process().returncode
# We expect both app and test script should exit with 0
exit_code = test_script_exit_code or app_exit_code
if quiet:
if exit_code:
sys.stdout.write(stream_output.getvalue().decode('utf-8'))
else:
logging.info("Test completed successfully")
if exit_code != 0:
logging.error("SUBPROCESS failure: ")
logging.error(" TEST SCRIPT: %d (%r)", test_script_exit_code, final_script_command)
logging.error(" APP: %d (%r)", app_exit_code, [app] + shlex.split(app_args))
sys.exit(exit_code)
finally:
# Stop the restart monitor thread if it exists
if restart_monitor_thread and restart_monitor_thread.is_alive():
logging.info("Stopping app restart monitor thread")
restart_monitor_thread.join(2.0)
# Clean up any leftover flag files if they exist - ensure this always executes
logging.info("Cleaning up flag files")
if os.path.exists(restart_flag_file):
try:
os.unlink(restart_flag_file)
logging.info(f"Cleaned up flag file: {restart_flag_file}")
except Exception as e:
logging.warning(f"Failed to clean up flag file {restart_flag_file}: {e}")
def monitor_app_restart_requests(
app_manager_ref,
app_manager_lock,
app,
app_args,
app_ready_pattern,
stream_output,
app_stdin_pipe,
restart_flag_file):
while True:
try:
if os.path.exists(restart_flag_file):
logging.info("App restart requested by test script")
# Remove the flag file immediately to prevent multiple restarts
os.unlink(restart_flag_file)
new_app_manager = AppProcessManager(app, app_args, app_ready_pattern, stream_output, app_stdin_pipe)
new_app_manager.start()
with app_manager_lock:
app_manager_ref[0].stop()
app_manager_ref[0] = new_app_manager
# After restart is complete, we can exit the monitor thread
logging.info("App restart completed, monitor thread exiting")
break
else:
time.sleep(0.5)
except Exception as e:
logging.error(f"Error in app restart monitor: {e}")
if __name__ == '__main__':
coloredlogs.install(level='INFO')
main(auto_envvar_prefix='CHIP')