blob: 16586a7a20e15dc5e5d1fa93dbebf2a0afb37ff1 [file] [log] [blame]
# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018-2022 Intel Corporation
# Copyright 2022 NXP
# Copyright (c) 2024 Arm Limited (or its affiliates). All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from enum import Enum
import os
import hashlib
import random
import logging
import shutil
import glob
import csv
from twisterlib.environment import TwisterEnv
from twisterlib.testsuite import TestCase, TestSuite
from twisterlib.platform import Platform
from twisterlib.error import BuildError, StatusAttributeError
from twisterlib.size_calc import SizeCalculator
from twisterlib.statuses import TwisterStatus
from twisterlib.handlers import (
Handler,
SimulationHandler,
BinaryHandler,
QEMUHandler,
QEMUWinHandler,
DeviceHandler,
SUPPORTED_SIMS,
SUPPORTED_SIMS_IN_PYTEST,
)
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
class TestInstance:
"""Class representing the execution of a particular TestSuite on a platform
@param test The TestSuite object we want to build/execute
@param platform Platform object that we want to build and run against
@param base_outdir Base directory for all test results. The actual
out directory used is <outdir>/<platform>/<test case name>
"""
__test__ = False
def __init__(self, testsuite, platform, outdir):
self.testsuite: TestSuite = testsuite
self.platform: Platform = platform
self._status = TwisterStatus.NONE
self.reason = "Unknown"
self.metrics = dict()
self.handler = None
self.recording = None
self.outdir = outdir
self.execution_time = 0
self.build_time = 0
self.retries = 0
self.name = os.path.join(platform.name, testsuite.name)
self.dut = None
if testsuite.detailed_test_id:
self.build_dir = os.path.join(outdir, platform.normalized_name, testsuite.name)
else:
# if suite is not in zephyr, keep only the part after ".." in reconstructed dir structure
source_dir_rel = testsuite.source_dir_rel.rsplit(os.pardir+os.path.sep, 1)[-1]
self.build_dir = os.path.join(outdir, platform.normalized_name, source_dir_rel, testsuite.name)
self.run_id = self._get_run_id()
self.domains = None
# Instance need to use sysbuild if a given suite or a platform requires it
self.sysbuild = testsuite.sysbuild or platform.sysbuild
self.run = False
self.testcases: list[TestCase] = []
self.init_cases()
self.filters = []
self.filter_type = None
def record(self, recording, fname_csv="recording.csv"):
if recording:
if self.recording is None:
self.recording = recording.copy()
else:
self.recording.extend(recording)
filename = os.path.join(self.build_dir, fname_csv)
with open(filename, "wt") as csvfile:
cw = csv.DictWriter(csvfile,
fieldnames = self.recording[0].keys(),
lineterminator = os.linesep,
quoting = csv.QUOTE_NONNUMERIC)
cw.writeheader()
cw.writerows(self.recording)
@property
def status(self) -> TwisterStatus:
return self._status
@status.setter
def status(self, value : TwisterStatus) -> None:
# Check for illegal assignments by value
try:
key = value.name if isinstance(value, Enum) else value
self._status = TwisterStatus[key]
except KeyError:
raise StatusAttributeError(self.__class__, value)
def add_filter(self, reason, filter_type):
self.filters.append({'type': filter_type, 'reason': reason })
self.status = TwisterStatus.FILTER
self.reason = reason
self.filter_type = filter_type
# Fix an issue with copying objects from testsuite, need better solution.
def init_cases(self):
for c in self.testsuite.testcases:
self.add_testcase(c.name, freeform=c.freeform)
def _get_run_id(self):
""" generate run id from instance unique identifier and a random
number
If exist, get cached run id from previous run."""
run_id = ""
run_id_file = os.path.join(self.build_dir, "run_id.txt")
if os.path.exists(run_id_file):
with open(run_id_file, "r") as fp:
run_id = fp.read()
else:
hash_object = hashlib.md5(self.name.encode())
random_str = f"{random.getrandbits(64)}".encode()
hash_object.update(random_str)
run_id = hash_object.hexdigest()
os.makedirs(self.build_dir, exist_ok=True)
with open(run_id_file, 'w+') as fp:
fp.write(run_id)
return run_id
def add_missing_case_status(self, status, reason=None):
for case in self.testcases:
if case.status == TwisterStatus.STARTED:
case.status = TwisterStatus.FAIL
elif case.status == TwisterStatus.NONE:
case.status = status
if reason:
case.reason = reason
else:
case.reason = self.reason
def __getstate__(self):
d = self.__dict__.copy()
return d
def __setstate__(self, d):
self.__dict__.update(d)
def __lt__(self, other):
return self.name < other.name
def set_case_status_by_name(self, name, status, reason=None):
tc = self.get_case_or_create(name)
tc.status = status
if reason:
tc.reason = reason
return tc
def add_testcase(self, name, freeform=False):
tc = TestCase(name=name)
tc.freeform = freeform
self.testcases.append(tc)
return tc
def get_case_by_name(self, name):
for c in self.testcases:
if c.name == name:
return c
return None
def get_case_or_create(self, name):
for c in self.testcases:
if c.name == name:
return c
logger.debug(f"Could not find a matching testcase for {name}")
tc = TestCase(name=name)
self.testcases.append(tc)
return tc
@staticmethod
def testsuite_runnable(testsuite, fixtures):
can_run = False
# console harness allows us to run the test and capture data.
if testsuite.harness in [ 'console', 'ztest', 'pytest', 'test', 'gtest', 'robot']:
can_run = True
# if we have a fixture that is also being supplied on the
# command-line, then we need to run the test, not just build it.
fixture = testsuite.harness_config.get('fixture')
if fixture:
can_run = fixture in map(lambda f: f.split(sep=':')[0], fixtures)
return can_run
def setup_handler(self, env: TwisterEnv):
# only setup once.
if self.handler:
return
options = env.options
common_args = (options, env.generator_cmd, not options.disable_suite_name_check)
if options.device_testing:
handler = DeviceHandler(self, "device", *common_args)
handler.call_make_run = False
handler.ready = True
elif self.platform.simulation != "na":
if self.platform.simulation == "qemu":
if os.name != "nt":
handler = QEMUHandler(self, "qemu", *common_args)
else:
handler = QEMUWinHandler(self, "qemu", *common_args)
handler.args.append(f"QEMU_PIPE={handler.get_fifo()}")
handler.ready = True
else:
handler = SimulationHandler(self, self.platform.simulation, *common_args)
if self.platform.simulation_exec and shutil.which(self.platform.simulation_exec):
handler.ready = True
elif self.testsuite.type == "unit":
handler = BinaryHandler(self, "unit", *common_args)
handler.binary = os.path.join(self.build_dir, "testbinary")
if options.enable_coverage:
handler.args.append("COVERAGE=1")
handler.call_make_run = False
handler.ready = True
else:
handler = Handler(self, "", *common_args)
self.handler = handler
# Global testsuite parameters
def check_runnable(self, enable_slow=False, filter='buildable', fixtures=[], hardware_map=None):
if os.name == 'nt':
# running on simulators is currently supported only for QEMU on Windows
if self.platform.simulation not in ('na', 'qemu'):
return False
# check presence of QEMU on Windows
if self.platform.simulation == 'qemu' and 'QEMU_BIN_PATH' not in os.environ:
return False
# we asked for build-only on the command line
if self.testsuite.build_only:
return False
# Do not run slow tests:
skip_slow = self.testsuite.slow and not enable_slow
if skip_slow:
return False
target_ready = bool(self.testsuite.type == "unit" or \
self.platform.type == "native" or \
(self.platform.simulation in SUPPORTED_SIMS and \
self.platform.simulation not in self.testsuite.simulation_exclude) or \
filter == 'runnable')
# check if test is runnable in pytest
if self.testsuite.harness == 'pytest':
target_ready = bool(filter == 'runnable' or self.platform.simulation in SUPPORTED_SIMS_IN_PYTEST)
SUPPORTED_SIMS_WITH_EXEC = ['nsim', 'mdb-nsim', 'renode', 'tsim', 'native', 'simics']
if filter != 'runnable' and \
self.platform.simulation in SUPPORTED_SIMS_WITH_EXEC and \
self.platform.simulation_exec:
if not shutil.which(self.platform.simulation_exec):
target_ready = False
testsuite_runnable = self.testsuite_runnable(self.testsuite, fixtures)
if hardware_map:
for h in hardware_map.duts:
if (h.platform == self.platform.name and
self.testsuite_runnable(self.testsuite, h.fixtures)):
testsuite_runnable = True
break
return testsuite_runnable and target_ready
def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, coverage_platform=[]):
# Create this in a "twister/" subdirectory otherwise this
# will pass this overlay to kconfig.py *twice* and kconfig.cmake
# will silently give that second time precedence over any
# --extra-args=CONFIG_*
subdir = os.path.join(self.build_dir, "twister")
content = ""
if self.testsuite.extra_configs:
new_config_list = []
# some configs might be conditional on arch or platform, see if we
# have a namespace defined and apply only if the namespace matches.
# we currently support both arch: and platform:
for config in self.testsuite.extra_configs:
cond_config = config.split(":")
if cond_config[0] == "arch" and len(cond_config) == 3:
if self.platform.arch == cond_config[1]:
new_config_list.append(cond_config[2])
elif cond_config[0] == "platform" and len(cond_config) == 3:
if self.platform.name == cond_config[1]:
new_config_list.append(cond_config[2])
else:
new_config_list.append(config)
content = "\n".join(new_config_list)
if enable_coverage:
if platform.name in coverage_platform:
content = content + "\nCONFIG_COVERAGE=y"
content = content + "\nCONFIG_COVERAGE_DUMP=y"
if enable_asan:
if platform.type == "native":
content = content + "\nCONFIG_ASAN=y"
if enable_ubsan:
if platform.type == "native":
content = content + "\nCONFIG_UBSAN=y"
if content:
os.makedirs(subdir, exist_ok=True)
file = os.path.join(subdir, "testsuite_extra.conf")
with open(file, "w", encoding='utf-8') as f:
f.write(content)
return content
def calculate_sizes(self, from_buildlog: bool = False, generate_warning: bool = True) -> SizeCalculator:
"""Get the RAM/ROM sizes of a test case.
This can only be run after the instance has been executed by
MakeGenerator, otherwise there won't be any binaries to measure.
@return A SizeCalculator object
"""
elf_filepath = self.get_elf_file()
buildlog_filepath = self.get_buildlog_file() if from_buildlog else ''
return SizeCalculator(elf_filename=elf_filepath,
extra_sections=self.testsuite.extra_sections,
buildlog_filepath=buildlog_filepath,
generate_warning=generate_warning)
def get_elf_file(self) -> str:
if self.sysbuild:
build_dir = self.domains.get_default_domain().build_dir
else:
build_dir = self.build_dir
fns = glob.glob(os.path.join(build_dir, "zephyr", "*.elf"))
fns.extend(glob.glob(os.path.join(build_dir, "testbinary")))
blocklist = [
'remapped', # used for xtensa plaforms
'zefi', # EFI for Zephyr
'qemu', # elf files generated after running in qemu
'_pre']
fns = [x for x in fns if not any(bad in os.path.basename(x) for bad in blocklist)]
if not fns:
raise BuildError("Missing output binary")
elif len(fns) > 1:
logger.warning(f"multiple ELF files detected: {', '.join(fns)}")
return fns[0]
def get_buildlog_file(self) -> str:
"""Get path to build.log file.
@raises BuildError: Incorrect amount (!=1) of build logs.
@return: Path to build.log (str).
"""
buildlog_paths = glob.glob(os.path.join(self.build_dir, "build.log"))
if len(buildlog_paths) != 1:
raise BuildError("Missing/multiple build.log file.")
return buildlog_paths[0]
def __repr__(self):
return "<TestSuite %s on %s>" % (self.testsuite.name, self.platform.name)