# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018-2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import contextlib
import glob
import logging
import os
import pathlib
import re
import shutil
import subprocess
import sys
import tempfile

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
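
# Output formats each supported coverage tool is able to produce.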
supported_coverage_formats = {
"gcovr": ["html", "xml", "csv", "txt", "coveralls", "sonarqube"],
"lcov": ["html", "lcov"]
}


class CoverageTool:
    """Base class for every supported coverage tool."""
    def __init__(self):
        self.gcov_tool = None
        self.base_dir = None
        self.output_formats = None
        # Whether to capture raw coverage data from handler logs.
        self.coverage_capture = True
        # Whether to generate coverage report(s) from the captured data.
        self.coverage_report = True
        # Whether coverage is captured/reported per test instance and merged later.
        self.coverage_per_instance = False
        self.instances = {}
@staticmethod
def factory(tool, jobs=None):
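        """Return an Lcov or Gcovr helper for the requested tool name, or None."""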
if tool == 'lcov':
t = Lcov(jobs)
elif tool == 'gcovr':
t = Gcovr()
else:
logger.error(f"Unsupported coverage tool specified: {tool}")
return None
logger.debug(f"Select {tool} as the coverage tool...")
return t
@staticmethod
def retrieve_gcov_data(input_file):
logger.debug(f"Working on {input_file}")
extracted_coverage_info = {}
capture_data = False
capture_complete = False
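        # A handler.log is expected to contain one or more dumps of the form
        # (illustrative, matching the parsing below):
        #   GCOV_COVERAGE_DUMP_START
        #   *<gcda file path><hex-encoded gcda contents>
        #   GCOV_COVERAGE_DUMP_END
        # where each data line starts with '*' and uses '<' to separate the
        # gcda file path from its hex dump.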
with open(input_file) as fp:
for line in fp.readlines():
if re.search("GCOV_COVERAGE_DUMP_START", line):
capture_data = True
capture_complete = False
continue
if re.search("GCOV_COVERAGE_DUMP_END", line):
capture_complete = True
# Keep searching for additional dumps
# Loop until the coverage data is found.
if not capture_data:
continue
if line.startswith("*"):
sp = line.split("<")
if len(sp) > 1:
# Remove the leading delimiter "*"
file_name = sp[0][1:]
# Remove the trailing new line char
hex_dump = sp[1][:-1]
else:
continue
else:
continue
if file_name in extracted_coverage_info:
extracted_coverage_info[file_name].append(hex_dump)
else:
extracted_coverage_info[file_name] = [hex_dump]
if not capture_data:
capture_complete = True
return {'complete': capture_complete, 'data': extracted_coverage_info}
def merge_hexdumps(self, hexdumps):
# Only one hexdump
if len(hexdumps) == 1:
return hexdumps[0]
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Write each hexdump to a dedicated temporary folder
            dirs = []
            for idx, dump in enumerate(hexdumps):
                subdir = tmp_dir + f'/{idx}'
                os.mkdir(subdir)
                dirs.append(subdir)
                with open(f'{subdir}/tmp.gcda', 'wb') as fp:
                    fp.write(bytes.fromhex(dump))
# Iteratively call gcov-tool (not gcov) to merge the files
merge_tool = self.gcov_tool + '-tool'
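            # For example, with gcov_tool == 'gcov' and two dumps this runs
            # (paths illustrative): gcov-tool merge <tmp>/0 <tmp>/1 --output <tmp>/1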
for d1, d2 in zip(dirs[:-1], dirs[1:], strict=False):
cmd = [merge_tool, 'merge', d1, d2, '--output', d2]
subprocess.call(cmd)
# Read back the final output file
with open(f'{dirs[-1]}/tmp.gcda', 'rb') as fp:
return fp.read(-1).hex()
def create_gcda_files(self, extracted_coverage_info):
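        """Write the merged hex dumps back into gcda files; return False if any
        file could not be decoded or created.
        """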
gcda_created = True
logger.debug(f"Generating {len(extracted_coverage_info)} gcda files")
for filename, hexdumps in extracted_coverage_info.items():
            # gcovr (observed with v4.1) fails when coverage data for kobject_hash
            # is present, so skip it and remove the matching gcno file instead.
if "kobject_hash" in filename:
filename = (filename[:-4]) + "gcno"
with contextlib.suppress(Exception):
os.remove(filename)
continue
try:
hexdump_val = self.merge_hexdumps(hexdumps)
hex_bytes = bytes.fromhex(hexdump_val)
with open(filename, 'wb') as fp:
fp.write(hex_bytes)
except ValueError:
logger.exception(f"Unable to convert hex data for file: {filename}")
gcda_created = False
except FileNotFoundError:
logger.exception(f"Unable to create gcda file: {filename}")
gcda_created = False
return gcda_created
def capture_data(self, outdir):
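        """Scan handler.log files under outdir and reconstruct gcda files from
        the coverage dumps they contain; return True only when every log was
        complete and converted successfully.
        """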
coverage_completed = True
for filename in glob.glob(f"{outdir}/**/handler.log", recursive=True):
gcov_data = self.__class__.retrieve_gcov_data(filename)
capture_complete = gcov_data['complete']
extracted_coverage_info = gcov_data['data']
if capture_complete:
gcda_created = self.create_gcda_files(extracted_coverage_info)
if gcda_created:
logger.debug(f"Gcov data captured: {filename}")
else:
logger.error(f"Gcov data invalid for: {filename}")
coverage_completed = False
else:
logger.error(f"Gcov data capture incomplete: {filename}")
coverage_completed = False
return coverage_completed
def generate(self, outdir):
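        """Capture coverage data from outdir (unless capture is disabled) and,
        when reporting is enabled, generate the configured report formats.
        Returns a (coverage_completed, reports) tuple.
        """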
coverage_completed = self.capture_data(outdir) if self.coverage_capture else True
if not coverage_completed or not self.coverage_report:
return coverage_completed, {}
build_dirs = None
if not self.coverage_capture and self.coverage_report and self.coverage_per_instance:
build_dirs = [instance.build_dir for instance in self.instances.values()]
reports = {}
with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
ret, reports = self._generate(outdir, coveragelog, build_dirs)
if ret == 0:
report_log = {
"html": "HTML report generated: {}".format(
os.path.join(outdir, "coverage", "index.html")
),
"lcov": "LCOV report generated: {}".format(
os.path.join(outdir, "coverage.info")
),
"xml": "XML report generated: {}".format(
os.path.join(outdir, "coverage", "coverage.xml")
),
"csv": "CSV report generated: {}".format(
os.path.join(outdir, "coverage", "coverage.csv")
),
"txt": "TXT report generated: {}".format(
os.path.join(outdir, "coverage", "coverage.txt")
),
"coveralls": "Coveralls report generated: {}".format(
os.path.join(outdir, "coverage", "coverage.coveralls.json")
),
"sonarqube": "Sonarqube report generated: {}".format(
os.path.join(outdir, "coverage", "coverage.sonarqube.xml")
)
}
for r in self.output_formats.split(','):
logger.info(report_log[r])
else:
coverage_completed = False
logger.debug(f"All coverage data processed: {coverage_completed}")
return coverage_completed, reports
class Lcov(CoverageTool):
def __init__(self, jobs=None):
super().__init__()
self.ignores = []
self.ignore_branch_patterns = []
self.output_formats = "lcov,html"
self.version = self.get_version()
self.jobs = jobs
def get_version(self):
try:
result = subprocess.run(
['lcov', '--version'],
capture_output=True,
text=True,
check=True
)
version_output = result.stdout.strip().replace('lcov: LCOV version ', '')
return version_output
except subprocess.CalledProcessError as e:
logger.error(f"Unable to determine lcov version: {e}")
sys.exit(1)
except FileNotFoundError as e:
logger.error(f"Unable to find lcov tool: {e}")
sys.exit(1)
def add_ignore_file(self, pattern):
self.ignores.append('*' + pattern + '*')
def add_ignore_directory(self, pattern):
self.ignores.append('*/' + pattern + '/*')
def add_ignore_branch_pattern(self, pattern):
self.ignore_branch_patterns.append(pattern)
@property
def is_lcov_v2(self):
return self.version.startswith("2")
def run_command(self, cmd, coveragelog):
if self.is_lcov_v2:
            # Pass --ignore-errors to genhtml as well as lcov so that known
            # non-fatal issues (e.g. from
            # samples/application_development/external_lib/) do not make them exit.
cmd += [
"--ignore-errors", "inconsistent,inconsistent",
"--ignore-errors", "negative,negative",
"--ignore-errors", "unused,unused",
"--ignore-errors", "empty,empty",
"--ignore-errors", "mismatch,mismatch",
]
cmd_str = " ".join(cmd)
logger.debug(f"Running {cmd_str}...")
return subprocess.call(cmd, stdout=coveragelog)
def run_lcov(self, args, coveragelog):
if self.is_lcov_v2:
branch_coverage = "branch_coverage=1"
if self.jobs is None:
# Default: --parallel=0 will autodetect appropriate parallelism
parallel = ["--parallel", "0"]
elif self.jobs == 1:
# Serial execution requested, don't parallelize at all
parallel = []
else:
parallel = ["--parallel", str(self.jobs)]
else:
branch_coverage = "lcov_branch_coverage=1"
parallel = []
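        # Resulting invocation (illustrative):
        #   lcov --gcov-tool <gcov> --rc branch_coverage=1 [--parallel N] <args...>
        # lcov v1 uses --rc lcov_branch_coverage=1 and has no --parallel option.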
cmd = [
"lcov", "--gcov-tool", self.gcov_tool,
"--rc", branch_coverage,
] + parallel + args
return self.run_command(cmd, coveragelog)
def _generate(self, outdir, coveragelog, build_dirs=None):
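        """Produce coverage.info in outdir either by merging per-instance
        tracefiles from build_dirs or by capturing from gcda files, split
        ztest coverage into ztest.info, apply the ignore filters, and run
        genhtml when an HTML report was requested.
        """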
coveragefile = os.path.join(outdir, "coverage.info")
ztestfile = os.path.join(outdir, "ztest.info")
if build_dirs:
files = []
for dir_ in build_dirs:
files_ = [fname for fname in
[os.path.join(dir_, "coverage.info"),
os.path.join(dir_, "ztest.info")]
if os.path.exists(fname)]
if not files_:
logger.debug("Coverage merge no files in: %s", dir_)
continue
files += files_
logger.debug("Coverage merge %d reports in %s", len(files), outdir)
cmd = ["--output-file", coveragefile]
for filename in files:
cmd.append("--add-tracefile")
cmd.append(filename)
else:
cmd = ["--capture", "--directory", outdir, "--output-file", coveragefile]
if self.coverage_per_instance and len(self.instances) == 1:
invalid_chars = re.compile(r"[^A-Za-z0-9_]")
cmd.append("--test-name")
cmd.append(invalid_chars.sub("_", next(iter(self.instances))))
ret = self.run_lcov(cmd, coveragelog)
if ret:
logger.error("LCOV capture report stage failed with %s", ret)
return ret, {}
# We want to remove tests/* and tests/ztest/test/* but save tests/ztest
cmd = ["--extract", coveragefile,
os.path.join(self.base_dir, "tests", "ztest", "*"),
"--output-file", ztestfile]
ret = self.run_lcov(cmd, coveragelog)
if ret:
logger.error("LCOV extract report stage failed with %s", ret)
return ret, {}
files = []
if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
cmd = ["--remove", ztestfile,
os.path.join(self.base_dir, "tests/ztest/test/*"),
"--output-file", ztestfile]
ret = self.run_lcov(cmd, coveragelog)
if ret:
logger.error("LCOV remove ztest report stage failed with %s", ret)
return ret, {}
files = [coveragefile, ztestfile]
else:
files = [coveragefile]
for i in self.ignores:
cmd = ["--remove", coveragefile, i, "--output-file", coveragefile]
ret = self.run_lcov(cmd, coveragelog)
if ret:
logger.error("LCOV remove ignores report stage failed with %s", ret)
return ret, {}
if 'html' not in self.output_formats.split(','):
return 0, {}
cmd = ["genhtml", "--legend", "--branch-coverage",
"--prefix", self.base_dir,
"-output-directory", os.path.join(outdir, "coverage")]
if self.coverage_per_instance:
cmd.append("--show-details")
cmd += files
ret = self.run_command(cmd, coveragelog)
if ret:
logger.error("LCOV genhtml report stage failed with %s", ret)
# TODO: Add LCOV summary coverage report.
        return ret, {'report': coveragefile, 'ztest': ztestfile, 'summary': None}


class Gcovr(CoverageTool):
def __init__(self):
super().__init__()
self.ignores = []
self.ignore_branch_patterns = []
self.output_formats = "html"
self.version = self.get_version()
        # Treat different ifdef-ed implementations of the same function as
        # separate objects for coverage statistics instead of letting gcovr
        # report them as a merge conflict (see --merge-mode-functions below).
self.options = ["-v", "--merge-mode-functions=separate"]
def get_version(self):
try:
result = subprocess.run(
['gcovr', '--version'],
capture_output=True,
text=True,
check=True
)
version_lines = result.stdout.strip().split('\n')
if version_lines:
version_output = version_lines[0].replace('gcovr ', '')
return version_output
except subprocess.CalledProcessError as e:
logger.error(f"Unable to determine gcovr version: {e}")
sys.exit(1)
except FileNotFoundError as e:
logger.error(f"Unable to find gcovr tool: {e}")
sys.exit(1)
def add_ignore_file(self, pattern):
self.ignores.append('.*' + pattern + '.*')
def add_ignore_directory(self, pattern):
self.ignores.append(".*/" + pattern + '/.*')
def add_ignore_branch_pattern(self, pattern):
self.ignore_branch_patterns.append(pattern)
@staticmethod
def _interleave_list(prefix, list):
tuple_list = [(prefix, item) for item in list]
return [item for sublist in tuple_list for item in sublist]
@staticmethod
def _flatten_list(list):
return [a for b in list for a in b]
def collect_coverage(self, outdir, coverage_file, ztest_file, coveragelog):
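        """Run gcovr over outdir twice: once excluding tests/* (plus the
        configured ignore patterns) to produce the main JSON tracefile, and
        once restricted to tests/ztest (excluding tests/ztest/test/*) for the
        ztest tracefile.  Returns the exit status and the list of non-empty files.
        """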
excludes = Gcovr._interleave_list("-e", self.ignores)
if len(self.ignore_branch_patterns) > 0:
# Last pattern overrides previous values, so merge all patterns together
merged_regex = "|".join([f"({p})" for p in self.ignore_branch_patterns])
excludes += ["--exclude-branches-by-pattern", merged_regex]
# We want to remove tests/* and tests/ztest/test/* but save tests/ztest
cmd = ["gcovr", "-r", self.base_dir,
"--gcov-ignore-parse-errors=negative_hits.warn_once_per_file",
"--gcov-executable", self.gcov_tool,
"-e", "tests/*"]
cmd += excludes + self.options + ["--json", "-o", coverage_file, outdir]
cmd_str = " ".join(cmd)
logger.debug(f"Running: {cmd_str}")
coveragelog.write(f"Running: {cmd_str}\n")
coveragelog.flush()
ret = subprocess.call(cmd, stdout=coveragelog, stderr=coveragelog)
if ret:
logger.error(f"GCOVR failed with {ret}")
return ret, []
cmd = ["gcovr", "-r", self.base_dir] + self.options
cmd += ["--gcov-executable", self.gcov_tool,
"-f", "tests/ztest", "-e", "tests/ztest/test/*",
"--json", "-o", ztest_file, outdir]
cmd_str = " ".join(cmd)
logger.debug(f"Running: {cmd_str}")
coveragelog.write(f"Running: {cmd_str}\n")
coveragelog.flush()
ret = subprocess.call(cmd, stdout=coveragelog, stderr=coveragelog)
if ret:
logger.error(f"GCOVR ztest stage failed with {ret}")
return ret, []
return ret, [file_ for file_ in [coverage_file, ztest_file]
if os.path.exists(file_) and os.path.getsize(file_) > 0]
def _generate(self, outdir, coveragelog, build_dirs=None):
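        """Merge per-instance JSON tracefiles from build_dirs or collect fresh
        coverage from outdir, then run gcovr to emit the requested report
        formats under outdir/coverage together with a JSON summary.
        """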
coverage_file = os.path.join(outdir, "coverage.json")
coverage_summary = os.path.join(outdir, "coverage_summary.json")
ztest_file = os.path.join(outdir, "ztest.json")
ret = 0
cmd_ = []
files = []
if build_dirs:
for dir_ in build_dirs:
files_ = [fname for fname in
[os.path.join(dir_, "coverage.json"),
os.path.join(dir_, "ztest.json")]
if os.path.exists(fname)]
if not files_:
logger.debug(f"Coverage merge no files in: {dir_}")
continue
files += files_
logger.debug(f"Coverage merge {len(files)} reports in {outdir}")
ztest_file = None
cmd_ = ["--json-pretty", "--json", coverage_file]
else:
ret, files = self.collect_coverage(outdir, coverage_file, ztest_file, coveragelog)
logger.debug(f"Coverage collected {len(files)} reports from: {outdir}")
if not files:
logger.warning(f"No coverage files to compose report for {outdir}")
return ret, {}
subdir = os.path.join(outdir, "coverage")
os.makedirs(subdir, exist_ok=True)
tracefiles = self._interleave_list("--add-tracefile", files)
# Convert command line argument (comma-separated list) to gcovr flags
report_options = {
"html": ["--html", os.path.join(subdir, "index.html"), "--html-details"],
"xml": ["--xml", os.path.join(subdir, "coverage.xml"), "--xml-pretty"],
"csv": ["--csv", os.path.join(subdir, "coverage.csv")],
"txt": ["--txt", os.path.join(subdir, "coverage.txt")],
"coveralls": ["--coveralls", os.path.join(subdir, "coverage.coveralls.json"),
"--coveralls-pretty"],
"sonarqube": ["--sonarqube", os.path.join(subdir, "coverage.sonarqube.xml")]
}
gcovr_options = self._flatten_list(
[report_options[r] for r in self.output_formats.split(',')]
)
cmd = ["gcovr", "-r", self.base_dir] + self.options + gcovr_options + tracefiles
cmd += cmd_
cmd += ["--json-summary-pretty", "--json-summary", coverage_summary]
cmd_str = " ".join(cmd)
logger.debug(f"Running: {cmd_str}")
coveragelog.write(f"Running: {cmd_str}\n")
coveragelog.flush()
ret = subprocess.call(cmd, stdout=coveragelog, stderr=coveragelog)
if ret:
logger.error(f"GCOVR merge report stage failed with {ret}")
return ret, { 'report': coverage_file, 'ztest': ztest_file, 'summary': coverage_summary }
def choose_gcov_tool(options, is_system_gcov):
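    """Return the gcov executable to use: the explicit --gcov-tool when given;
    otherwise llvm-cov for the llvm toolchain variant, the host gcov when a
    system gcov is usable, or the Zephyr SDK gcov.  Exits with an error if no
    suitable tool can be found.
    """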
gcov_tool = None
if not options.gcov_tool:
zephyr_sdk_gcov_tool = os.path.join(
os.environ.get("ZEPHYR_SDK_INSTALL_DIR", default=""),
"x86_64-zephyr-elf/bin/x86_64-zephyr-elf-gcov")
if os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT") == "llvm":
llvm_path = os.environ.get("LLVM_TOOLCHAIN_PATH")
if llvm_path is not None:
llvm_path = os.path.join(llvm_path, "bin")
llvm_cov = shutil.which("llvm-cov", path=llvm_path)
llvm_cov_ext = pathlib.Path(llvm_cov).suffix
gcov_lnk = os.path.join(options.outdir, f"gcov{llvm_cov_ext}")
try:
os.symlink(llvm_cov, gcov_lnk)
except OSError:
shutil.copy(llvm_cov, gcov_lnk)
gcov_tool = gcov_lnk
elif is_system_gcov:
gcov_tool = "gcov"
elif os.path.exists(zephyr_sdk_gcov_tool):
gcov_tool = zephyr_sdk_gcov_tool
else:
logger.error(
"Can't find a suitable gcov tool. Use --gcov-tool or set ZEPHYR_SDK_INSTALL_DIR."
)
sys.exit(1)
else:
gcov_tool = str(options.gcov_tool)
return gcov_tool
def run_coverage_tool(options, outdir, is_system_gcov, instances,
coverage_capture, coverage_report):
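    """Instantiate the selected coverage tool, configure it from options
    (gcov tool, base directory, output formats, ignore patterns), and run
    capture and/or report generation for outdir.
    """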
coverage_tool = CoverageTool.factory(options.coverage_tool, jobs=options.jobs)
if not coverage_tool:
return False, {}
coverage_tool.gcov_tool = str(choose_gcov_tool(options, is_system_gcov))
logger.debug(f"Using gcov tool: {coverage_tool.gcov_tool}")
coverage_tool.instances = instances
coverage_tool.coverage_per_instance = options.coverage_per_instance
coverage_tool.coverage_capture = coverage_capture
coverage_tool.coverage_report = coverage_report
coverage_tool.base_dir = os.path.abspath(options.coverage_basedir)
    # Override the tool's default output formats when requested on the command line.
if options.coverage_formats is not None:
coverage_tool.output_formats = options.coverage_formats
coverage_tool.add_ignore_file('generated')
coverage_tool.add_ignore_directory('tests')
coverage_tool.add_ignore_directory('samples')
# Ignore branch coverage on LOG_* and LOG_HEXDUMP_* macros
# Branch misses are due to the implementation of Z_LOG2 and cannot be avoided
coverage_tool.add_ignore_branch_pattern(r"^\s*LOG_(?:HEXDUMP_)?(?:DBG|INF|WRN|ERR)\(.*")
# Ignore branch coverage on __ASSERT* macros
# Covering the failing case is not desirable as it will immediately terminate the test.
coverage_tool.add_ignore_branch_pattern(r"^\s*__ASSERT(?:_EVAL|_NO_MSG|_POST_ACTION)?\(.*")
return coverage_tool.generate(outdir)
def has_system_gcov(platform):
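    """True when the platform can rely on the host's gcov (native/unit builds)."""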
return platform and (platform.type in {"native", "unit"})
def run_coverage(options, testplan):
""" Summary code coverage over the full test plan's scope.
"""
is_system_gcov = False
for plat in options.coverage_platform:
if has_system_gcov(testplan.get_platform(plat)):
is_system_gcov = True
break
return run_coverage_tool(options, options.outdir, is_system_gcov,
instances=testplan.instances,
coverage_capture=False,
coverage_report=True)
def run_coverage_instance(options, instance):
""" Per-instance code coverage called by ProjectBuilder ('coverage' operation).
"""
is_system_gcov = has_system_gcov(instance.platform)
return run_coverage_tool(options, instance.build_dir, is_system_gcov,
instances={instance.name: instance},
coverage_capture=True,
coverage_report=options.coverage_per_instance)