blob: 3073a7c4ca6fb951f978ce2a3d43060cacc306d1 [file] [log] [blame]
# vim: set syntax=python ts=4 :
#
# Copyright (c) 20180-2022 Intel Corporation
# Copyright 2022 NXP
# SPDX-License-Identifier: Apache-2.0
import logging
import multiprocessing
import os
import pickle
import queue
import re
import shutil
import subprocess
import sys
import time
import traceback
import yaml
from multiprocessing import Lock, Process, Value
from multiprocessing.managers import BaseManager
from typing import List
from packaging import version
from colorama import Fore
from domains import Domains
from twisterlib.cmakecache import CMakeCache
from twisterlib.environment import canonical_zephyr_base
from twisterlib.error import BuildError, ConfigurationError
import elftools
from elftools.elf.elffile import ELFFile
from elftools.elf.sections import SymbolTableSection
if version.parse(elftools.__version__) < version.parse('0.24'):
sys.exit("pyelftools is out of date, need version 0.24 or later")
# Job server only works on Linux for now.
if sys.platform == 'linux':
from twisterlib.jobserver import GNUMakeJobClient, GNUMakeJobServer, JobClient
from twisterlib.log_helper import log_command
from twisterlib.testinstance import TestInstance
from twisterlib.environment import TwisterEnv
from twisterlib.testsuite import TestSuite
from twisterlib.platform import Platform
from twisterlib.testplan import change_skip_to_error_if_integration
from twisterlib.harness import HarnessImporter, Pytest
try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
import expr_parser
class ExecutionCounter(object):
def __init__(self, total=0):
'''
Most of the stats are at test instance level
Except that "_cases" and "_skipped_cases" are for cases of ALL test instances
total complete = done + skipped_filter
total = yaml test scenarios * applicable platforms
complete perctenage = (done + skipped_filter) / total
pass rate = passed / (total - skipped_configs)
'''
# instances that go through the pipeline
# updated by report_out()
self._done = Value('i', 0)
# iteration
self._iteration = Value('i', 0)
# instances that actually executed and passed
# updated by report_out()
self._passed = Value('i', 0)
# static filter + runtime filter + build skipped
# updated by update_counting_before_pipeline() and report_out()
self._skipped_configs = Value('i', 0)
# cmake filter + build skipped
# updated by report_out()
self._skipped_runtime = Value('i', 0)
# staic filtered at yaml parsing time
# updated by update_counting_before_pipeline()
self._skipped_filter = Value('i', 0)
# updated by update_counting_before_pipeline() and report_out()
self._skipped_cases = Value('i', 0)
# updated by report_out() in pipeline
self._error = Value('i', 0)
self._failed = Value('i', 0)
# initialized to number of test instances
self._total = Value('i', total)
# updated in report_out
self._cases = Value('i', 0)
self.lock = Lock()
def summary(self):
print("--------------------------------")
print(f"Total test suites: {self.total}") # actually test instances
print(f"Total test cases: {self.cases}")
print(f"Executed test cases: {self.cases - self.skipped_cases}")
print(f"Skipped test cases: {self.skipped_cases}")
print(f"Completed test suites: {self.done}")
print(f"Passing test suites: {self.passed}")
print(f"Failing test suites: {self.failed}")
print(f"Skipped test suites: {self.skipped_configs}")
print(f"Skipped test suites (runtime): {self.skipped_runtime}")
print(f"Skipped test suites (filter): {self.skipped_filter}")
print(f"Errors: {self.error}")
print("--------------------------------")
@property
def cases(self):
with self._cases.get_lock():
return self._cases.value
@cases.setter
def cases(self, value):
with self._cases.get_lock():
self._cases.value = value
@property
def skipped_cases(self):
with self._skipped_cases.get_lock():
return self._skipped_cases.value
@skipped_cases.setter
def skipped_cases(self, value):
with self._skipped_cases.get_lock():
self._skipped_cases.value = value
@property
def error(self):
with self._error.get_lock():
return self._error.value
@error.setter
def error(self, value):
with self._error.get_lock():
self._error.value = value
@property
def iteration(self):
with self._iteration.get_lock():
return self._iteration.value
@iteration.setter
def iteration(self, value):
with self._iteration.get_lock():
self._iteration.value = value
@property
def done(self):
with self._done.get_lock():
return self._done.value
@done.setter
def done(self, value):
with self._done.get_lock():
self._done.value = value
@property
def passed(self):
with self._passed.get_lock():
return self._passed.value
@passed.setter
def passed(self, value):
with self._passed.get_lock():
self._passed.value = value
@property
def skipped_configs(self):
with self._skipped_configs.get_lock():
return self._skipped_configs.value
@skipped_configs.setter
def skipped_configs(self, value):
with self._skipped_configs.get_lock():
self._skipped_configs.value = value
@property
def skipped_filter(self):
with self._skipped_filter.get_lock():
return self._skipped_filter.value
@skipped_filter.setter
def skipped_filter(self, value):
with self._skipped_filter.get_lock():
self._skipped_filter.value = value
@property
def skipped_runtime(self):
with self._skipped_runtime.get_lock():
return self._skipped_runtime.value
@skipped_runtime.setter
def skipped_runtime(self, value):
with self._skipped_runtime.get_lock():
self._skipped_runtime.value = value
@property
def failed(self):
with self._failed.get_lock():
return self._failed.value
@failed.setter
def failed(self, value):
with self._failed.get_lock():
self._failed.value = value
@property
def total(self):
with self._total.get_lock():
return self._total.value
class CMake:
config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):
self.cwd = None
self.capture_output = True
self.defconfig = {}
self.cmake_cache = {}
self.instance = None
self.testsuite = testsuite
self.platform = platform
self.source_dir = source_dir
self.build_dir = build_dir
self.log = "build.log"
self.default_encoding = sys.getdefaultencoding()
self.jobserver = jobserver
def parse_generated(self, filter_stages=[]):
self.defconfig = {}
return {}
def run_build(self, args=[]):
logger.debug("Building %s for %s" % (self.source_dir, self.platform.name))
cmake_args = []
cmake_args.extend(args)
cmake = shutil.which('cmake')
cmd = [cmake] + cmake_args
kwargs = dict()
if self.capture_output:
kwargs['stdout'] = subprocess.PIPE
# CMake sends the output of message() to stderr unless it's STATUS
kwargs['stderr'] = subprocess.STDOUT
if self.cwd:
kwargs['cwd'] = self.cwd
start_time = time.time()
if sys.platform == 'linux':
p = self.jobserver.popen(cmd, **kwargs)
else:
p = subprocess.Popen(cmd, **kwargs)
logger.debug(f'Running {" ".join(cmd)}')
out, _ = p.communicate()
ret = {}
duration = time.time() - start_time
self.instance.build_time += duration
if p.returncode == 0:
msg = f"Finished building {self.source_dir} for {self.platform.name} in {duration:.2f} seconds"
logger.debug(msg)
self.instance.status = "passed"
if not self.instance.run:
self.instance.add_missing_case_status("skipped", "Test was built only")
ret = {"returncode": p.returncode}
if out:
log_msg = out.decode(self.default_encoding)
with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
log.write(log_msg)
else:
return None
else:
# A real error occurred, raise an exception
log_msg = ""
if out:
log_msg = out.decode(self.default_encoding)
with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
log.write(log_msg)
if log_msg:
overflow_found = re.findall("region `(FLASH|ROM|RAM|ICCM|DCCM|SRAM|dram\\d_\\d_seg)' overflowed by", log_msg)
imgtool_overflow_found = re.findall(r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size", log_msg)
if overflow_found and not self.options.overflow_as_errors:
logger.debug("Test skipped due to {} Overflow".format(overflow_found[0]))
self.instance.status = "skipped"
self.instance.reason = "{} overflow".format(overflow_found[0])
change_skip_to_error_if_integration(self.options, self.instance)
elif imgtool_overflow_found and not self.options.overflow_as_errors:
self.instance.status = "skipped"
self.instance.reason = "imgtool overflow"
change_skip_to_error_if_integration(self.options, self.instance)
else:
self.instance.status = "error"
self.instance.reason = "Build failure"
ret = {
"returncode": p.returncode
}
return ret
def run_cmake(self, args="", filter_stages=[]):
if not self.options.disable_warnings_as_errors:
warnings_as_errors = 'y'
gen_defines_args = "--edtlib-Werror"
else:
warnings_as_errors = 'n'
gen_defines_args = ""
warning_command = 'CONFIG_COMPILER_WARNINGS_AS_ERRORS'
if self.testsuite.sysbuild:
warning_command = 'SB_' + warning_command
logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))
cmake_args = [
f'-B{self.build_dir}',
f'-DTC_RUNID={self.instance.run_id}',
f'-D{warning_command}={warnings_as_errors}',
f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}',
f'-G{self.env.generator}'
]
# If needed, run CMake using the package_helper script first, to only run
# a subset of all cmake modules. This output will be used to filter
# testcases, and the full CMake configuration will be run for
# testcases that should be built
if filter_stages:
cmake_filter_args = [
f'-DMODULES={",".join(filter_stages)}',
f'-P{canonical_zephyr_base}/cmake/package_helper.cmake',
]
if self.testsuite.sysbuild and not filter_stages:
logger.debug("Building %s using sysbuild" % (self.source_dir))
source_args = [
f'-S{canonical_zephyr_base}/share/sysbuild',
f'-DAPP_DIR={self.source_dir}'
]
else:
source_args = [
f'-S{self.source_dir}'
]
cmake_args.extend(source_args)
cmake_args.extend(args)
cmake_opts = ['-DBOARD={}'.format(self.platform.name)]
cmake_args.extend(cmake_opts)
if self.instance.testsuite.required_snippets:
cmake_opts = ['-DSNIPPET={}'.format(';'.join(self.instance.testsuite.required_snippets))]
cmake_args.extend(cmake_opts)
cmake = shutil.which('cmake')
cmd = [cmake] + cmake_args
if filter_stages:
cmd += cmake_filter_args
kwargs = dict()
log_command(logger, "Calling cmake", cmd)
if self.capture_output:
kwargs['stdout'] = subprocess.PIPE
# CMake sends the output of message() to stderr unless it's STATUS
kwargs['stderr'] = subprocess.STDOUT
if self.cwd:
kwargs['cwd'] = self.cwd
start_time = time.time()
if sys.platform == 'linux':
p = self.jobserver.popen(cmd, **kwargs)
else:
p = subprocess.Popen(cmd, **kwargs)
out, _ = p.communicate()
duration = time.time() - start_time
self.instance.build_time += duration
if p.returncode == 0:
filter_results = self.parse_generated(filter_stages)
msg = f"Finished running cmake {self.source_dir} for {self.platform.name} in {duration:.2f} seconds"
logger.debug(msg)
ret = {
'returncode': p.returncode,
'filter': filter_results
}
else:
self.instance.status = "error"
self.instance.reason = "Cmake build failure"
for tc in self.instance.testcases:
tc.status = self.instance.status
logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name))
ret = {"returncode": p.returncode}
if out:
os.makedirs(self.build_dir, exist_ok=True)
with open(os.path.join(self.build_dir, self.log), "a", encoding=self.default_encoding) as log:
log_msg = out.decode(self.default_encoding)
log.write(log_msg)
return ret
class FilterBuilder(CMake):
def __init__(self, testsuite: TestSuite, platform: Platform, source_dir, build_dir, jobserver):
super().__init__(testsuite, platform, source_dir, build_dir, jobserver)
self.log = "config-twister.log"
def parse_generated(self, filter_stages=[]):
if self.platform.name == "unit_testing":
return {}
if self.testsuite.sysbuild and not filter_stages:
# Load domain yaml to get default domain build directory
domain_path = os.path.join(self.build_dir, "domains.yaml")
domains = Domains.from_file(domain_path)
logger.debug("Loaded sysbuild domain data from %s" % (domain_path))
self.instance.domains = domains
domain_build = domains.get_default_domain().build_dir
cmake_cache_path = os.path.join(domain_build, "CMakeCache.txt")
defconfig_path = os.path.join(domain_build, "zephyr", ".config")
edt_pickle = os.path.join(domain_build, "zephyr", "edt.pickle")
else:
cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt")
# .config is only available after kconfig stage in cmake. If only dt based filtration is required
# package helper call won't produce .config
if not filter_stages or "kconfig" in filter_stages:
defconfig_path = os.path.join(self.build_dir, "zephyr", ".config")
# dt is compiled before kconfig, so edt_pickle is available regardless of choice of filter stages
edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle")
if not filter_stages or "kconfig" in filter_stages:
with open(defconfig_path, "r") as fp:
defconfig = {}
for line in fp.readlines():
m = self.config_re.match(line)
if not m:
if line.strip() and not line.startswith("#"):
sys.stderr.write("Unrecognized line %s\n" % line)
continue
defconfig[m.group(1)] = m.group(2).strip()
self.defconfig = defconfig
cmake_conf = {}
try:
cache = CMakeCache.from_file(cmake_cache_path)
except FileNotFoundError:
cache = {}
for k in iter(cache):
cmake_conf[k.name] = k.value
self.cmake_cache = cmake_conf
filter_data = {
"ARCH": self.platform.arch,
"PLATFORM": self.platform.name
}
filter_data.update(os.environ)
if not filter_stages or "kconfig" in filter_stages:
filter_data.update(self.defconfig)
filter_data.update(self.cmake_cache)
if self.testsuite.sysbuild and self.env.options.device_testing:
# Verify that twister's arguments support sysbuild.
# Twister sysbuild flashing currently only works with west, so
# --west-flash must be passed. Additionally, erasing the DUT
# before each test with --west-flash=--erase will inherently not
# work with sysbuild.
if self.env.options.west_flash is None:
logger.warning("Sysbuild test will be skipped. " +
"West must be used for flashing.")
return {os.path.join(self.platform.name, self.testsuite.name): True}
elif "--erase" in self.env.options.west_flash:
logger.warning("Sysbuild test will be skipped, " +
"--erase is not supported with --west-flash")
return {os.path.join(self.platform.name, self.testsuite.name): True}
if self.testsuite and self.testsuite.filter:
try:
if os.path.exists(edt_pickle):
with open(edt_pickle, 'rb') as f:
edt = pickle.load(f)
else:
edt = None
ret = expr_parser.parse(self.testsuite.filter, filter_data, edt)
except (ValueError, SyntaxError) as se:
sys.stderr.write(
"Failed processing %s\n" % self.testsuite.yamlfile)
raise se
if not ret:
return {os.path.join(self.platform.name, self.testsuite.name): True}
else:
return {os.path.join(self.platform.name, self.testsuite.name): False}
else:
self.platform.filter_data = filter_data
return filter_data
class ProjectBuilder(FilterBuilder):
def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs):
super().__init__(instance.testsuite, instance.platform, instance.testsuite.source_dir, instance.build_dir, jobserver)
self.log = "build.log"
self.instance = instance
self.filtered_tests = 0
self.options = env.options
self.env = env
self.duts = None
def log_info(self, filename, inline_logs, log_testcases=False):
filename = os.path.abspath(os.path.realpath(filename))
if inline_logs:
logger.info("{:-^100}".format(filename))
try:
with open(filename) as fp:
data = fp.read()
except Exception as e:
data = "Unable to read log data (%s)\n" % (str(e))
logger.error(data)
logger.info("{:-^100}".format(filename))
if log_testcases:
for tc in self.instance.testcases:
if not tc.reason:
continue
logger.info(
f"\n{str(tc.name).center(100, '_')}\n"
f"{tc.reason}\n"
f"{100*'_'}\n"
f"{tc.output}"
)
else:
logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
def log_info_file(self, inline_logs):
build_dir = self.instance.build_dir
h_log = "{}/handler.log".format(build_dir)
b_log = "{}/build.log".format(build_dir)
v_log = "{}/valgrind.log".format(build_dir)
d_log = "{}/device.log".format(build_dir)
pytest_log = "{}/twister_harness.log".format(build_dir)
if os.path.exists(v_log) and "Valgrind" in self.instance.reason:
self.log_info("{}".format(v_log), inline_logs)
elif os.path.exists(pytest_log) and os.path.getsize(pytest_log) > 0:
self.log_info("{}".format(pytest_log), inline_logs, log_testcases=True)
elif os.path.exists(h_log) and os.path.getsize(h_log) > 0:
self.log_info("{}".format(h_log), inline_logs)
elif os.path.exists(d_log) and os.path.getsize(d_log) > 0:
self.log_info("{}".format(d_log), inline_logs)
else:
self.log_info("{}".format(b_log), inline_logs)
def process(self, pipeline, done, message, lock, results):
op = message.get('op')
self.instance.setup_handler(self.env)
if op == "filter":
ret = self.cmake(filter_stages=self.instance.filter_stages)
if self.instance.status in ["failed", "error"]:
pipeline.put({"op": "report", "test": self.instance})
else:
# Here we check the dt/kconfig filter results coming from running cmake
if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
logger.debug("filtering %s" % self.instance.name)
self.instance.status = "filtered"
self.instance.reason = "runtime filter"
results.skipped_runtime += 1
self.instance.add_missing_case_status("skipped")
pipeline.put({"op": "report", "test": self.instance})
else:
pipeline.put({"op": "cmake", "test": self.instance})
# The build process, call cmake and build with configured generator
elif op == "cmake":
ret = self.cmake()
if self.instance.status in ["failed", "error"]:
pipeline.put({"op": "report", "test": self.instance})
elif self.options.cmake_only:
if self.instance.status is None:
self.instance.status = "passed"
pipeline.put({"op": "report", "test": self.instance})
else:
# Here we check the runtime filter results coming from running cmake
if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
logger.debug("filtering %s" % self.instance.name)
self.instance.status = "filtered"
self.instance.reason = "runtime filter"
results.skipped_runtime += 1
self.instance.add_missing_case_status("skipped")
pipeline.put({"op": "report", "test": self.instance})
else:
pipeline.put({"op": "build", "test": self.instance})
elif op == "build":
logger.debug("build test: %s" % self.instance.name)
ret = self.build()
if not ret:
self.instance.status = "error"
self.instance.reason = "Build Failure"
pipeline.put({"op": "report", "test": self.instance})
else:
# Count skipped cases during build, for example
# due to ram/rom overflow.
if self.instance.status == "skipped":
results.skipped_runtime += 1
self.instance.add_missing_case_status("skipped", self.instance.reason)
if ret.get('returncode', 1) > 0:
self.instance.add_missing_case_status("blocked", self.instance.reason)
pipeline.put({"op": "report", "test": self.instance})
else:
logger.debug(f"Determine test cases for test instance: {self.instance.name}")
try:
self.determine_testcases(results)
pipeline.put({"op": "gather_metrics", "test": self.instance})
except BuildError as e:
logger.error(str(e))
self.instance.status = "error"
self.instance.reason = str(e)
pipeline.put({"op": "report", "test": self.instance})
elif op == "gather_metrics":
ret = self.gather_metrics(self.instance)
if not ret or ret.get('returncode', 1) > 0:
self.instance.status = "error"
self.instance.reason = "Build Failure at gather_metrics."
pipeline.put({"op": "report", "test": self.instance})
elif self.instance.run and self.instance.handler.ready:
pipeline.put({"op": "run", "test": self.instance})
else:
pipeline.put({"op": "report", "test": self.instance})
# Run the generated binary using one of the supported handlers
elif op == "run":
logger.debug("run test: %s" % self.instance.name)
self.run()
logger.debug(f"run status: {self.instance.name} {self.instance.status}")
try:
# to make it work with pickle
self.instance.handler.thread = None
self.instance.handler.duts = None
pipeline.put({
"op": "report",
"test": self.instance,
"status": self.instance.status,
"reason": self.instance.reason
}
)
except RuntimeError as e:
logger.error(f"RuntimeError: {e}")
traceback.print_exc()
# Report results and output progress to screen
elif op == "report":
with lock:
done.put(self.instance)
self.report_out(results)
if not self.options.coverage:
if self.options.prep_artifacts_for_testing:
pipeline.put({"op": "cleanup", "mode": "device", "test": self.instance})
elif self.options.runtime_artifact_cleanup == "pass" and self.instance.status == "passed":
pipeline.put({"op": "cleanup", "mode": "passed", "test": self.instance})
elif self.options.runtime_artifact_cleanup == "all":
pipeline.put({"op": "cleanup", "mode": "all", "test": self.instance})
elif op == "cleanup":
mode = message.get("mode")
if mode == "device":
self.cleanup_device_testing_artifacts()
elif mode == "passed" or (mode == "all" and self.instance.reason != "Cmake build failure"):
self.cleanup_artifacts()
def determine_testcases(self, results):
yaml_testsuite_name = self.instance.testsuite.id
logger.debug(f"Determine test cases for test suite: {yaml_testsuite_name}")
elf_file = self.instance.get_elf_file()
elf = ELFFile(open(elf_file, "rb"))
logger.debug(f"Test instance {self.instance.name} already has {len(self.instance.testcases)} cases.")
new_ztest_unit_test_regex = re.compile(r"z_ztest_unit_test__([^\s]+?)__([^\s]*)")
detected_cases = []
for section in elf.iter_sections():
if isinstance(section, SymbolTableSection):
for sym in section.iter_symbols():
# It is only meant for new ztest fx because only new ztest fx exposes test functions
# precisely.
# The 1st capture group is new ztest suite name.
# The 2nd capture group is new ztest unit test name.
matches = new_ztest_unit_test_regex.findall(sym.name)
if matches:
for m in matches:
# new_ztest_suite = m[0] # not used for now
test_func_name = m[1].replace("test_", "", 1)
testcase_id = f"{yaml_testsuite_name}.{test_func_name}"
detected_cases.append(testcase_id)
if detected_cases:
logger.debug(f"{', '.join(detected_cases)} in {elf_file}")
self.instance.testcases.clear()
self.instance.testsuite.testcases.clear()
# When the old regex-based test case collection is fully deprecated,
# this will be the sole place where test cases get added to the test instance.
# Then we can further include the new_ztest_suite info in the testcase_id.
for testcase_id in detected_cases:
self.instance.add_testcase(name=testcase_id)
self.instance.testsuite.add_testcase(name=testcase_id)
def cleanup_artifacts(self, additional_keep: List[str] = []):
logger.debug("Cleaning up {}".format(self.instance.build_dir))
allow = [
os.path.join('zephyr', '.config'),
'handler.log',
'build.log',
'device.log',
'recording.csv',
# below ones are needed to make --test-only work as well
'Makefile',
'CMakeCache.txt',
'build.ninja',
os.path.join('CMakeFiles', 'rules.ninja')
]
allow += additional_keep
if self.options.runtime_artifact_cleanup == 'all':
allow += [os.path.join('twister', 'testsuite_extra.conf')]
allow = [os.path.join(self.instance.build_dir, file) for file in allow]
for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False):
for name in filenames:
path = os.path.join(dirpath, name)
if path not in allow:
os.remove(path)
# Remove empty directories and symbolic links to directories
for dir in dirnames:
path = os.path.join(dirpath, dir)
if os.path.islink(path):
os.remove(path)
elif not os.listdir(path):
os.rmdir(path)
def cleanup_device_testing_artifacts(self):
logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir))
files_to_keep = self._get_binaries()
files_to_keep.append(os.path.join('zephyr', 'runners.yaml'))
if self.testsuite.sysbuild:
files_to_keep.append('domains.yaml')
for domain in self.instance.domains.get_domains():
files_to_keep += self._get_artifact_allow_list_for_domain(domain.name)
self.cleanup_artifacts(files_to_keep)
self._sanitize_files()
def _get_artifact_allow_list_for_domain(self, domain: str) -> List[str]:
"""
Return a list of files needed to test a given domain.
"""
allow = [
os.path.join(domain, 'build.ninja'),
os.path.join(domain, 'CMakeCache.txt'),
os.path.join(domain, 'CMakeFiles', 'rules.ninja'),
os.path.join(domain, 'Makefile'),
os.path.join(domain, 'zephyr', '.config'),
os.path.join(domain, 'zephyr', 'runners.yaml')
]
return allow
def _get_binaries(self) -> List[str]:
"""
Get list of binaries paths (absolute or relative to the
self.instance.build_dir), basing on information from platform.binaries
or runners.yaml. If they are not found take default binaries like
"zephyr/zephyr.hex" etc.
"""
binaries: List[str] = []
platform = self.instance.platform
if platform.binaries:
for binary in platform.binaries:
binaries.append(os.path.join('zephyr', binary))
# Get binaries for a single-domain build
binaries += self._get_binaries_from_runners()
# Get binaries in the case of a multiple-domain build
if self.testsuite.sysbuild:
for domain in self.instance.domains.get_domains():
binaries += self._get_binaries_from_runners(domain.name)
# if binaries was not found in platform.binaries and runners.yaml take default ones
if len(binaries) == 0:
binaries = [
os.path.join('zephyr', 'zephyr.hex'),
os.path.join('zephyr', 'zephyr.bin'),
os.path.join('zephyr', 'zephyr.elf'),
os.path.join('zephyr', 'zephyr.exe'),
]
return binaries
def _get_binaries_from_runners(self, domain='') -> List[str]:
"""
Get list of binaries paths (absolute or relative to the
self.instance.build_dir) from runners.yaml file. May be used for
multiple-domain builds by passing in one domain at a time.
"""
runners_file_path: str = os.path.join(self.instance.build_dir,
domain, 'zephyr', 'runners.yaml')
if not os.path.exists(runners_file_path):
return []
with open(runners_file_path, 'r') as file:
runners_content: dict = yaml.load(file, Loader=SafeLoader)
if 'config' not in runners_content:
return []
runners_config: dict = runners_content['config']
binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file']
binaries: List[str] = []
for binary_key in binary_keys:
binary_path = runners_config.get(binary_key)
if binary_path is None:
continue
if os.path.isabs(binary_path):
binaries.append(binary_path)
else:
binaries.append(os.path.join(domain, 'zephyr', binary_path))
return binaries
def _sanitize_files(self):
"""
Sanitize files to make it possible to flash those file on different
computer/system.
"""
self._sanitize_runners_file()
self._sanitize_zephyr_base_from_files()
def _sanitize_runners_file(self):
"""
Replace absolute paths of binary files for relative ones. The base
directory for those files is f"{self.instance.build_dir}/zephyr"
"""
runners_dir_path: str = os.path.join(self.instance.build_dir, 'zephyr')
runners_file_path: str = os.path.join(runners_dir_path, 'runners.yaml')
if not os.path.exists(runners_file_path):
return
with open(runners_file_path, 'rt') as file:
runners_content_text = file.read()
runners_content_yaml: dict = yaml.load(runners_content_text, Loader=SafeLoader)
if 'config' not in runners_content_yaml:
return
runners_config: dict = runners_content_yaml['config']
binary_keys: List[str] = ['elf_file', 'hex_file', 'bin_file']
for binary_key in binary_keys:
binary_path = runners_config.get(binary_key)
# sanitize only paths which exist and are absolute
if binary_path is None or not os.path.isabs(binary_path):
continue
binary_path_relative = os.path.relpath(binary_path, start=runners_dir_path)
runners_content_text = runners_content_text.replace(binary_path, binary_path_relative)
with open(runners_file_path, 'wt') as file:
file.write(runners_content_text)
def _sanitize_zephyr_base_from_files(self):
"""
Remove Zephyr base paths from selected files.
"""
files_to_sanitize = [
'CMakeCache.txt',
os.path.join('zephyr', 'runners.yaml'),
]
for file_path in files_to_sanitize:
file_path = os.path.join(self.instance.build_dir, file_path)
if not os.path.exists(file_path):
continue
with open(file_path, "rt") as file:
data = file.read()
# add trailing slash at the end of canonical_zephyr_base if it does not exist:
path_to_remove = os.path.join(canonical_zephyr_base, "")
data = data.replace(path_to_remove, "")
with open(file_path, "wt") as file:
file.write(data)
def report_out(self, results):
total_to_do = results.total
total_tests_width = len(str(total_to_do))
results.done += 1
instance = self.instance
if results.iteration == 1:
results.cases += len(instance.testcases)
if instance.status in ["error", "failed"]:
if instance.status == "error":
results.error += 1
txt = " ERROR "
else:
results.failed += 1
txt = " FAILED "
if self.options.verbose:
status = Fore.RED + txt + Fore.RESET + instance.reason
else:
logger.error(
"{:<25} {:<50} {}{}{}: {}".format(
instance.platform.name,
instance.testsuite.name,
Fore.RED,
txt,
Fore.RESET,
instance.reason))
if not self.options.verbose:
self.log_info_file(self.options.inline_logs)
elif instance.status in ["skipped", "filtered"]:
status = Fore.YELLOW + "SKIPPED" + Fore.RESET
results.skipped_configs += 1
# test cases skipped at the test instance level
results.skipped_cases += len(instance.testsuite.testcases)
elif instance.status == "passed":
status = Fore.GREEN + "PASSED" + Fore.RESET
results.passed += 1
for case in instance.testcases:
# test cases skipped at the test case level
if case.status == 'skipped':
results.skipped_cases += 1
else:
logger.debug(f"Unknown status = {instance.status}")
status = Fore.YELLOW + "UNKNOWN" + Fore.RESET
if self.options.verbose:
if self.options.cmake_only:
more_info = "cmake"
elif instance.status in ["skipped", "filtered"]:
more_info = instance.reason
else:
if instance.handler.ready and instance.run:
more_info = instance.handler.type_str
htime = instance.execution_time
if instance.dut:
more_info += f": {instance.dut},"
if htime:
more_info += " {:.3f}s".format(htime)
else:
more_info = "build"
if ( instance.status in ["error", "failed", "timeout", "flash_error"]
and hasattr(self.instance.handler, 'seed')
and self.instance.handler.seed is not None ):
more_info += "/seed: " + str(self.options.seed)
logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format(
results.done, total_tests_width, total_to_do , instance.platform.name,
instance.testsuite.name, status, more_info))
if instance.status in ["error", "failed", "timeout"]:
self.log_info_file(self.options.inline_logs)
else:
completed_perc = 0
if total_to_do > 0:
completed_perc = int((float(results.done) / total_to_do) * 100)
sys.stdout.write("INFO - Total complete: %s%4d/%4d%s %2d%% skipped: %s%4d%s, failed: %s%4d%s, error: %s%4d%s\r" % (
Fore.GREEN,
results.done,
total_to_do,
Fore.RESET,
completed_perc,
Fore.YELLOW if results.skipped_configs > 0 else Fore.RESET,
results.skipped_configs,
Fore.RESET,
Fore.RED if results.failed > 0 else Fore.RESET,
results.failed,
Fore.RESET,
Fore.RED if results.error > 0 else Fore.RESET,
results.error,
Fore.RESET
)
)
sys.stdout.flush()
@staticmethod
def cmake_assemble_args(extra_args, handler, extra_conf_files, extra_overlay_confs,
extra_dtc_overlay_files, cmake_extra_args,
build_dir):
# Retain quotes around config options
config_options = [arg for arg in extra_args if arg.startswith("CONFIG_")]
args = [arg for arg in extra_args if not arg.startswith("CONFIG_")]
args_expanded = ["-D{}".format(a.replace('"', '\"')) for a in config_options]
if handler.ready:
args.extend(handler.args)
if extra_conf_files:
args.append(f"CONF_FILE=\"{';'.join(extra_conf_files)}\"")
if extra_dtc_overlay_files:
args.append(f"DTC_OVERLAY_FILE=\"{';'.join(extra_dtc_overlay_files)}\"")
# merge overlay files into one variable
overlays = extra_overlay_confs.copy()
additional_overlay_path = os.path.join(
build_dir, "twister", "testsuite_extra.conf"
)
if os.path.exists(additional_overlay_path):
overlays.append(additional_overlay_path)
if overlays:
args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays)))
# Build the final argument list
args_expanded.extend(["-D{}".format(a.replace('"', '\"')) for a in cmake_extra_args])
args_expanded.extend(["-D{}".format(a.replace('"', '')) for a in args])
return args_expanded
def cmake(self, filter_stages=[]):
args = self.cmake_assemble_args(
self.testsuite.extra_args.copy(), # extra_args from YAML
self.instance.handler,
self.testsuite.extra_conf_files,
self.testsuite.extra_overlay_confs,
self.testsuite.extra_dtc_overlay_files,
self.options.extra_args, # CMake extra args
self.instance.build_dir,
)
return self.run_cmake(args,filter_stages)
def build(self):
harness = HarnessImporter.get_harness(self.instance.testsuite.harness.capitalize())
build_result = self.run_build(['--build', self.build_dir])
try:
if harness:
harness.instance = self.instance
harness.build()
except ConfigurationError as error:
self.instance.status = "error"
self.instance.reason = str(error)
logger.error(self.instance.reason)
return
return build_result
def run(self):
instance = self.instance
if instance.handler.ready:
logger.debug(f"Reset instance status from '{instance.status}' to None before run.")
instance.status = None
if instance.handler.type_str == "device":
instance.handler.duts = self.duts
if(self.options.seed is not None and instance.platform.name.startswith("native_")):
self.parse_generated()
if('CONFIG_FAKE_ENTROPY_NATIVE_POSIX' in self.defconfig and
self.defconfig['CONFIG_FAKE_ENTROPY_NATIVE_POSIX'] == 'y'):
instance.handler.seed = self.options.seed
if self.options.extra_test_args and instance.platform.arch == "posix":
instance.handler.extra_test_args = self.options.extra_test_args
harness = HarnessImporter.get_harness(instance.testsuite.harness.capitalize())
try:
harness.configure(instance)
except ConfigurationError as error:
instance.status = "error"
instance.reason = str(error)
logger.error(instance.reason)
return
#
if isinstance(harness, Pytest):
harness.pytest_run(instance.handler.get_test_timeout())
else:
instance.handler.handle(harness)
sys.stdout.flush()
def gather_metrics(self, instance: TestInstance):
build_result = {"returncode": 0}
if self.options.create_rom_ram_report:
build_result = self.run_build(['--build', self.build_dir, "--target", "footprint"])
if self.options.enable_size_report and not self.options.cmake_only:
self.calc_size(instance=instance, from_buildlog=self.options.footprint_from_buildlog)
else:
instance.metrics["used_ram"] = 0
instance.metrics["used_rom"] = 0
instance.metrics["available_rom"] = 0
instance.metrics["available_ram"] = 0
instance.metrics["unrecognized"] = []
return build_result
@staticmethod
def calc_size(instance: TestInstance, from_buildlog: bool):
if instance.status not in ["error", "failed", "skipped"]:
if not instance.platform.type in ["native", "qemu", "unit"]:
generate_warning = bool(instance.platform.type == "mcu")
size_calc = instance.calculate_sizes(from_buildlog=from_buildlog, generate_warning=generate_warning)
instance.metrics["used_ram"] = size_calc.get_used_ram()
instance.metrics["used_rom"] = size_calc.get_used_rom()
instance.metrics["available_rom"] = size_calc.get_available_rom()
instance.metrics["available_ram"] = size_calc.get_available_ram()
instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
else:
instance.metrics["used_ram"] = 0
instance.metrics["used_rom"] = 0
instance.metrics["available_rom"] = 0
instance.metrics["available_ram"] = 0
instance.metrics["unrecognized"] = []
instance.metrics["handler_time"] = instance.execution_time
class TwisterRunner:
def __init__(self, instances, suites, env=None) -> None:
self.pipeline = None
self.options = env.options
self.env = env
self.instances = instances
self.suites = suites
self.duts = None
self.jobs = 1
self.results = None
self.jobserver = None
def run(self):
retries = self.options.retry_failed + 1
BaseManager.register('LifoQueue', queue.LifoQueue)
manager = BaseManager()
manager.start()
self.results = ExecutionCounter(total=len(self.instances))
self.iteration = 0
pipeline = manager.LifoQueue()
done_queue = manager.LifoQueue()
# Set number of jobs
if self.options.jobs:
self.jobs = self.options.jobs
elif self.options.build_only:
self.jobs = multiprocessing.cpu_count() * 2
else:
self.jobs = multiprocessing.cpu_count()
if sys.platform == "linux":
if os.name == 'posix':
self.jobserver = GNUMakeJobClient.from_environ(jobs=self.options.jobs)
if not self.jobserver:
self.jobserver = GNUMakeJobServer(self.jobs)
elif self.jobserver.jobs:
self.jobs = self.jobserver.jobs
# TODO: Implement this on windows/mac also
else:
self.jobserver = JobClient()
logger.info("JOBS: %d", self.jobs)
self.update_counting_before_pipeline()
while True:
self.results.iteration += 1
if self.results.iteration > 1:
logger.info("%d Iteration:" % (self.results.iteration))
time.sleep(self.options.retry_interval) # waiting for the system to settle down
self.results.done = self.results.total - self.results.failed - self.results.error
self.results.failed = 0
if self.options.retry_build_errors:
self.results.error = 0
else:
self.results.done = self.results.skipped_filter
self.execute(pipeline, done_queue)
while True:
try:
inst = done_queue.get_nowait()
except queue.Empty:
break
else:
inst.metrics.update(self.instances[inst.name].metrics)
inst.metrics["handler_time"] = inst.execution_time
inst.metrics["unrecognized"] = []
self.instances[inst.name] = inst
print("")
retry_errors = False
if self.results.error and self.options.retry_build_errors:
retry_errors = True
retries = retries - 1
if retries == 0 or ( self.results.failed == 0 and not retry_errors):
break
self.show_brief()
def update_counting_before_pipeline(self):
'''
Updating counting before pipeline is necessary because statically filterd
test instance never enter the pipeline. While some pipeline output needs
the static filter stats. So need to prepare them before pipline starts.
'''
for instance in self.instances.values():
if instance.status == 'filtered' and not instance.reason == 'runtime filter':
self.results.skipped_filter += 1
self.results.skipped_configs += 1
self.results.skipped_cases += len(instance.testsuite.testcases)
self.results.cases += len(instance.testsuite.testcases)
elif instance.status == 'error':
self.results.error += 1
def show_brief(self):
logger.info("%d test scenarios (%d test instances) selected, "
"%d configurations skipped (%d by static filter, %d at runtime)." %
(len(self.suites), len(self.instances),
self.results.skipped_configs,
self.results.skipped_filter,
self.results.skipped_configs - self.results.skipped_filter))
def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_build_errors=False):
for instance in self.instances.values():
if build_only:
instance.run = False
no_retry_statuses = ['passed', 'skipped', 'filtered']
if not retry_build_errors:
no_retry_statuses.append("error")
if instance.status not in no_retry_statuses:
logger.debug(f"adding {instance.name}")
if instance.status:
instance.retries += 1
instance.status = None
# Check if cmake package_helper script can be run in advance.
instance.filter_stages = []
if instance.testsuite.filter:
instance.filter_stages = self.get_cmake_filter_stages(instance.testsuite.filter, expr_parser.reserved.keys())
if test_only and instance.run:
pipeline.put({"op": "run", "test": instance})
elif instance.filter_stages and "full" not in instance.filter_stages:
pipeline.put({"op": "filter", "test": instance})
else:
cache_file = os.path.join(instance.build_dir, "CMakeCache.txt")
if os.path.exists(cache_file) and self.env.options.aggressive_no_clean:
pipeline.put({"op": "build", "test": instance})
else:
pipeline.put({"op": "cmake", "test": instance})
def pipeline_mgr(self, pipeline, done_queue, lock, results):
if sys.platform == 'linux':
with self.jobserver.get_job():
while True:
try:
task = pipeline.get_nowait()
except queue.Empty:
break
else:
instance = task['test']
pb = ProjectBuilder(instance, self.env, self.jobserver)
pb.duts = self.duts
pb.process(pipeline, done_queue, task, lock, results)
return True
else:
while True:
try:
task = pipeline.get_nowait()
except queue.Empty:
break
else:
instance = task['test']
pb = ProjectBuilder(instance, self.env, self.jobserver)
pb.duts = self.duts
pb.process(pipeline, done_queue, task, lock, results)
return True
def execute(self, pipeline, done):
lock = Lock()
logger.info("Adding tasks to the queue...")
self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only,
retry_build_errors=self.options.retry_build_errors)
logger.info("Added initial list of jobs to queue")
processes = []
for _ in range(self.jobs):
p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, ))
processes.append(p)
p.start()
logger.debug(f"Launched {self.jobs} jobs")
try:
for p in processes:
p.join()
except KeyboardInterrupt:
logger.info("Execution interrupted")
for p in processes:
p.terminate()
@staticmethod
def get_cmake_filter_stages(filt, logic_keys):
""" Analyze filter expressions from test yaml and decide if dts and/or kconfig based filtering will be needed."""
dts_required = False
kconfig_required = False
full_required = False
filter_stages = []
# Compress args in expressions like "function('x', 'y')" so they are not split when splitting by whitespaces
filt = filt.replace(", ", ",")
# Remove logic words
for k in logic_keys:
filt = filt.replace(f"{k} ", "")
# Remove brackets
filt = filt.replace("(", "")
filt = filt.replace(")", "")
# Splite by whitespaces
filt = filt.split()
for expression in filt:
if expression.startswith("dt_"):
dts_required = True
elif expression.startswith("CONFIG"):
kconfig_required = True
else:
full_required = True
if full_required:
return ["full"]
if dts_required:
filter_stages.append("dts")
if kconfig_required:
filter_stages.append("kconfig")
return filter_stages