|  | #!/usr/bin/env python3 | 
|  | # vim: set syntax=python ts=4 : | 
|  | # SPDX-License-Identifier: Apache-2.0 | 
|  | """Zephyr Sanity Tests | 
|  |  | 
|  | Also check the "User and Developer Guides" at https://docs.zephyrproject.org/ | 
|  |  | 
|  | This script scans for the set of unit test applications in the git | 
|  | repository and attempts to execute them. By default, it tries to | 
|  | build each test case on one platform per architecture, using a precedence | 
|  | list defined in an architecture configuration file, and if possible | 
|  | run the tests in any available emulators or simulators on the system. | 
|  |  | 
Test cases are detected by the presence of a 'testcase.yaml' or 'sample.yaml'
file in the application's project directory. This file may contain one or more
blocks, each identifying a test scenario. The title of the block is a name for
the test case, which only needs to be unique among the test cases specified in
that testcase metadata file. The full canonical name for each test case is
<path to test case>/<block>.
|  |  | 
Each test block in the testcase metadata can define the following key/value
pairs:
|  |  | 
|  | tags: <list of tags> (required) | 
|  | A set of string tags for the testcase. Usually pertains to | 
|  | functional domains but can be anything. Command line invocations | 
|  | of this script can filter the set of tests to run based on tag. | 
|  |  | 
|  | skip: <True|False> (default False) | 
Skip this test case unconditionally. This can be used for broken tests.
|  |  | 
|  | slow: <True|False> (default False) | 
|  | Don't build or run this test case unless --enable-slow was passed | 
|  | in on the command line. Intended for time-consuming test cases | 
|  | that are only run under certain circumstances, like daily | 
|  | builds. | 
|  |  | 
|  | extra_args: <list of extra arguments> | 
|  | Extra cache entries to pass to CMake when building or running the | 
|  | test case. | 
|  |  | 
|  | extra_configs: <list of extra configurations> | 
|  | Extra configuration options to be merged with a master prj.conf | 
|  | when building or running the test case. | 
|  |  | 
|  | build_only: <True|False> (default False) | 
|  | If true, don't try to run the test even if the selected platform | 
|  | supports it. | 
|  |  | 
|  | build_on_all: <True|False> (default False) | 
|  | If true, attempt to build test on all available platforms. | 
|  |  | 
depends_on: <list of features>
A board or platform can announce what features it supports; this option
enables the test only on those platforms that provide these features.
|  |  | 
min_ram: <integer>
minimum amount of RAM (in KB) needed for this test to build and run. This is
compared with information provided by the board metadata.
|  |  | 
min_flash: <integer>
minimum amount of ROM (in KB) needed for this test to build and run. This is
compared with information provided by the board metadata.
|  |  | 
timeout: <number of seconds>
Length of time to run the test in an emulator before automatically killing it.
Defaults to 60 seconds.
|  |  | 
|  | arch_whitelist: <list of arches, such as x86, arm, arc> | 
Set of architectures that this test case should only be run on.
|  |  | 
|  | arch_exclude: <list of arches, such as x86, arm, arc> | 
|  | Set of architectures that this test case should not run on. | 
|  |  | 
|  | platform_whitelist: <list of platforms> | 
Set of platforms that this test case should only be run on.
|  |  | 
|  | platform_exclude: <list of platforms> | 
|  | Set of platforms that this test case should not run on. | 
|  |  | 
|  | extra_sections: <list of extra binary sections> | 
|  | When computing sizes, sanitycheck will report errors if it finds | 
|  | extra, unexpected sections in the Zephyr binary unless they are named | 
|  | here. They will not be included in the size calculation. | 
|  |  | 
|  | filter: <expression> | 
|  | Filter whether the testcase should be run by evaluating an expression | 
|  | against an environment containing the following values: | 
|  |  | 
|  | { ARCH : <architecture>, | 
|  | PLATFORM : <platform>, | 
|  | <all CONFIG_* key/value pairs in the test's generated defconfig>, | 
|  | <all DT_* key/value pairs in the test's generated device tree file>, | 
|  | <all CMake key/value pairs in the test's generated CMakeCache.txt file>, | 
|  | *<env>: any environment variable available | 
|  | } | 
|  |  | 
|  | The grammar for the expression language is as follows: | 
|  |  | 
|  | expression ::= expression "and" expression | 
|  | | expression "or" expression | 
|  | | "not" expression | 
|  | | "(" expression ")" | 
|  | | symbol "==" constant | 
|  | | symbol "!=" constant | 
|  | | symbol "<" number | 
|  | | symbol ">" number | 
|  | | symbol ">=" number | 
|  | | symbol "<=" number | 
|  | | symbol "in" list | 
|  | | symbol ":" string | 
|  | | symbol | 
|  |  | 
|  | list ::= "[" list_contents "]" | 
|  |  | 
|  | list_contents ::= constant | 
|  | | list_contents "," constant | 
|  |  | 
|  | constant ::= number | 
|  | | string | 
|  |  | 
|  |  | 
|  | For the case where expression ::= symbol, it evaluates to true | 
|  | if the symbol is defined to a non-empty string. | 
|  |  | 
|  | Operator precedence, starting from lowest to highest: | 
|  |  | 
|  | or (left associative) | 
|  | and (left associative) | 
|  | not (right associative) | 
|  | all comparison operators (non-associative) | 
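
For example (symbol names illustrative), under these precedence rules the
expression

    CONFIG_FOO and CONFIG_BAR == "y" or PLATFORM in ["native_posix"]

is parsed as ((CONFIG_FOO and (CONFIG_BAR == "y")) or
(PLATFORM in ["native_posix"])).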
|  |  | 
|  | arch_whitelist, arch_exclude, platform_whitelist, platform_exclude | 
|  | are all syntactic sugar for these expressions. For instance | 
|  |  | 
|  | arch_exclude = x86 arc | 
|  |  | 
is the same as:
|  |  | 
|  | filter = not ARCH in ["x86", "arc"] | 
|  |  | 
|  | The ':' operator compiles the string argument as a regular expression, | 
|  | and then returns a true value only if the symbol's value in the environment | 
|  | matches. For example, if CONFIG_SOC="stm32f107xc" then | 
|  |  | 
|  | filter = CONFIG_SOC : "stm.*" | 
|  |  | 
would match it.
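
As an illustration, a hypothetical testcase.yaml combining several of the keys
above might look like this (scenario names and values are made up):

    tests:
      sample.module.default:
        tags: kernel
        min_ram: 16
        extra_configs:
          - CONFIG_DEBUG=y
        filter: not CONFIG_COVERAGE
      sample.module.slow:
        tags: kernel
        slow: true
        platform_whitelist: qemu_x86

Both scenarios build from the same application sources; their canonical names
are derived from the application path and the block titles.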
|  |  | 
The set of test cases that actually run depends on directives in the testcase
file and options passed in on the command line. If there is any confusion,
|  | running with -v or examining the discard report (sanitycheck_discard.csv) | 
|  | can help show why particular test cases were skipped. | 
|  |  | 
|  | Metrics (such as pass/fail state and binary size) for the last code | 
|  | release are stored in scripts/sanity_chk/sanity_last_release.csv. | 
|  | To update this, pass the --all --release options. | 
|  |  | 
|  | To load arguments from a file, write '+' before the file name, e.g., | 
+file_name. File content must be one or more valid arguments separated by
line breaks instead of whitespace.
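
For example (file name illustrative), if a file named extra_args.txt contains:

    --platform=qemu_x86
    --tag=kernel

then invoking the script as

    sanitycheck +extra_args.txt

is equivalent to passing those two options directly on the command line.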
|  |  | 
Most users will run the script with no arguments.
|  |  | 
|  | """ | 
|  |  | 
|  | import os | 
|  | import contextlib | 
|  | import string | 
|  | import mmap | 
|  | import argparse | 
|  | import sys | 
|  | import re | 
|  | import subprocess | 
|  | import multiprocessing | 
|  | import select | 
|  | import shutil | 
|  | import shlex | 
|  | import signal | 
|  | import threading | 
|  | import concurrent.futures | 
|  | from threading import BoundedSemaphore | 
|  | import queue | 
|  | import time | 
|  | import csv | 
|  | import yaml | 
|  | import glob | 
|  | import serial | 
|  | import concurrent | 
|  | import xml.etree.ElementTree as ET | 
|  | import logging | 
|  | from colorama import Fore | 
|  | from collections import OrderedDict | 
|  | from itertools import islice | 
|  | from pathlib import Path | 
|  | from distutils.spawn import find_executable | 
|  |  | 
|  | try: | 
|  | from anytree import Node, RenderTree, find | 
|  | except ImportError: | 
|  | print("Install the anytree module to use the --test-tree option") | 
|  |  | 
|  | try: | 
|  | from tabulate import tabulate | 
|  | except ImportError: | 
|  | print("Install tabulate python module with pip to use --device-testing option.") | 
|  |  | 
|  | ZEPHYR_BASE = os.getenv("ZEPHYR_BASE") | 
|  | if not ZEPHYR_BASE: | 
|  | sys.exit("$ZEPHYR_BASE environment variable undefined") | 
|  |  | 
|  | sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts")) | 
|  | import edtlib | 
|  |  | 
|  | hw_map_local = threading.Lock() | 
|  | report_lock = threading.Lock() | 
|  |  | 
|  | # Use this for internal comparisons; that's what canonicalization is | 
|  | # for. Don't use it when invoking other components of the build system | 
|  | # to avoid confusing and hard to trace inconsistencies in error messages | 
|  | # and logs, generated Makefiles, etc. compared to when users invoke these | 
|  | # components directly. | 
|  | # Note "normalization" is different from canonicalization, see os.path. | 
|  | canonical_zephyr_base = os.path.realpath(ZEPHYR_BASE) | 
|  |  | 
|  | sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/")) | 
|  |  | 
|  | from sanity_chk import scl | 
|  | from sanity_chk import expr_parser | 
|  |  | 
|  | VERBOSE = 0 | 
|  |  | 
|  | RELEASE_DATA = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk", | 
|  | "sanity_last_release.csv") | 
|  |  | 
|  |  | 
|  | logger = logging.getLogger('sanitycheck') | 
|  | logger.setLevel(logging.DEBUG) | 
|  |  | 
|  |  | 
|  | class CMakeCacheEntry: | 
|  | '''Represents a CMake cache entry. | 
|  |  | 
|  | This class understands the type system in a CMakeCache.txt, and | 
|  | converts the following cache types to Python types: | 
|  |  | 
|  | Cache Type    Python type | 
|  | ----------    ------------------------------------------- | 
|  | FILEPATH      str | 
|  | PATH          str | 
|  | STRING        str OR list of str (if ';' is in the value) | 
|  | BOOL          bool | 
|  | INTERNAL      str OR list of str (if ';' is in the value) | 
|  | ----------    ------------------------------------------- | 
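
For example (entry name illustrative), a line such as

    EXTRA_FILES:STRING=foo.c;bar.c

is parsed into a CMakeCacheEntry with name 'EXTRA_FILES' and, because the
value contains ';', the list value ['foo.c', 'bar.c'].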
|  | ''' | 
|  |  | 
|  | # Regular expression for a cache entry. | 
|  | # | 
|  | # CMake variable names can include escape characters, allowing a | 
|  | # wider set of names than is easy to match with a regular | 
|  | # expression. To be permissive here, use a non-greedy match up to | 
|  | # the first colon (':'). This breaks if the variable name has a | 
|  | # colon inside, but it's good enough. | 
|  | CACHE_ENTRY = re.compile( | 
|  | r'''(?P<name>.*?)                               # name | 
|  | :(?P<type>FILEPATH|PATH|STRING|BOOL|INTERNAL)  # type | 
|  | =(?P<value>.*)                                 # value | 
|  | ''', re.X) | 
|  |  | 
|  | @classmethod | 
|  | def _to_bool(cls, val): | 
|  | # Convert a CMake BOOL string into a Python bool. | 
|  | # | 
|  | #   "True if the constant is 1, ON, YES, TRUE, Y, or a | 
|  | #   non-zero number. False if the constant is 0, OFF, NO, | 
|  | #   FALSE, N, IGNORE, NOTFOUND, the empty string, or ends in | 
|  | #   the suffix -NOTFOUND. Named boolean constants are | 
|  | #   case-insensitive. If the argument is not one of these | 
|  | #   constants, it is treated as a variable." | 
|  | # | 
|  | # https://cmake.org/cmake/help/v3.0/command/if.html | 
|  | val = val.upper() | 
if val in ('ON', 'YES', 'TRUE', 'Y'):
    return True
elif val in ('OFF', 'NO', 'FALSE', 'N', 'IGNORE', 'NOTFOUND', ''):
    return False
elif val.endswith('-NOTFOUND'):
    return False
|  | else: | 
|  | try: | 
|  | v = int(val) | 
|  | return v != 0 | 
|  | except ValueError as exc: | 
|  | raise ValueError('invalid bool {}'.format(val)) from exc | 
|  |  | 
|  | @classmethod | 
|  | def from_line(cls, line, line_no): | 
|  | # Comments can only occur at the beginning of a line. | 
|  | # (The value of an entry could contain a comment character). | 
|  | if line.startswith('//') or line.startswith('#'): | 
|  | return None | 
|  |  | 
|  | # Whitespace-only lines do not contain cache entries. | 
|  | if not line.strip(): | 
|  | return None | 
|  |  | 
|  | m = cls.CACHE_ENTRY.match(line) | 
|  | if not m: | 
|  | return None | 
|  |  | 
|  | name, type_, value = (m.group(g) for g in ('name', 'type', 'value')) | 
|  | if type_ == 'BOOL': | 
|  | try: | 
|  | value = cls._to_bool(value) | 
|  | except ValueError as exc: | 
|  | args = exc.args + ('on line {}: {}'.format(line_no, line),) | 
|  | raise ValueError(args) from exc | 
|  | elif type_ in ['STRING', 'INTERNAL']: | 
|  | # If the value is a CMake list (i.e. is a string which | 
|  | # contains a ';'), convert to a Python list. | 
|  | if ';' in value: | 
|  | value = value.split(';') | 
|  |  | 
|  | return CMakeCacheEntry(name, value) | 
|  |  | 
|  | def __init__(self, name, value): | 
|  | self.name = name | 
|  | self.value = value | 
|  |  | 
|  | def __str__(self): | 
|  | fmt = 'CMakeCacheEntry(name={}, value={})' | 
|  | return fmt.format(self.name, self.value) | 
|  |  | 
|  |  | 
|  | class CMakeCache: | 
|  | '''Parses and represents a CMake cache file.''' | 
|  |  | 
|  | @staticmethod | 
|  | def from_file(cache_file): | 
|  | return CMakeCache(cache_file) | 
|  |  | 
|  | def __init__(self, cache_file): | 
|  | self.cache_file = cache_file | 
|  | self.load(cache_file) | 
|  |  | 
|  | def load(self, cache_file): | 
|  | entries = [] | 
|  | with open(cache_file, 'r') as cache: | 
|  | for line_no, line in enumerate(cache): | 
|  | entry = CMakeCacheEntry.from_line(line, line_no) | 
|  | if entry: | 
|  | entries.append(entry) | 
|  | self._entries = OrderedDict((e.name, e) for e in entries) | 
|  |  | 
|  | def get(self, name, default=None): | 
|  | entry = self._entries.get(name) | 
|  | if entry is not None: | 
|  | return entry.value | 
|  | else: | 
|  | return default | 
|  |  | 
|  | def get_list(self, name, default=None): | 
|  | if default is None: | 
|  | default = [] | 
|  | entry = self._entries.get(name) | 
|  | if entry is not None: | 
|  | value = entry.value | 
|  | if isinstance(value, list): | 
|  | return value | 
|  | elif isinstance(value, str): | 
|  | return [value] if value else [] | 
|  | else: | 
|  | msg = 'invalid value {} type {}' | 
|  | raise RuntimeError(msg.format(value, type(value))) | 
|  | else: | 
|  | return default | 
|  |  | 
|  | def __contains__(self, name): | 
|  | return name in self._entries | 
|  |  | 
|  | def __getitem__(self, name): | 
|  | return self._entries[name].value | 
|  |  | 
|  | def __setitem__(self, name, entry): | 
|  | if not isinstance(entry, CMakeCacheEntry): | 
|  | msg = 'improper type {} for value {}, expecting CMakeCacheEntry' | 
|  | raise TypeError(msg.format(type(entry), entry)) | 
|  | self._entries[name] = entry | 
|  |  | 
|  | def __delitem__(self, name): | 
|  | del self._entries[name] | 
|  |  | 
|  | def __iter__(self): | 
|  | return iter(self._entries.values()) | 
|  |  | 
|  |  | 
|  | class SanityCheckException(Exception): | 
|  | pass | 
|  |  | 
|  |  | 
|  | class SanityRuntimeError(SanityCheckException): | 
|  | pass | 
|  |  | 
|  |  | 
|  | class ConfigurationError(SanityCheckException): | 
|  | def __init__(self, cfile, message): | 
|  | SanityCheckException.__init__(self, cfile + ": " + message) | 
|  |  | 
|  |  | 
|  | class BuildError(SanityCheckException): | 
|  | pass | 
|  |  | 
|  |  | 
|  | class ExecutionError(SanityCheckException): | 
|  | pass | 
|  |  | 
|  |  | 
|  | class HarnessImporter: | 
|  |  | 
|  | def __init__(self, name): | 
|  | sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/sanity_chk")) | 
|  | module = __import__("harness") | 
|  | if name: | 
|  | my_class = getattr(module, name) | 
|  | else: | 
|  | my_class = getattr(module, "Test") | 
|  |  | 
|  | self.instance = my_class() | 
|  |  | 
|  |  | 
|  | class Handler: | 
|  | def __init__(self, instance, type_str="build"): | 
|  | """Constructor | 
|  |  | 
|  | """ | 
|  | self.lock = threading.Lock() | 
|  |  | 
|  | self.state = "waiting" | 
|  | self.run = False | 
|  | self.duration = 0 | 
|  | self.type_str = type_str | 
|  |  | 
|  | self.binary = None | 
|  | self.pid_fn = None | 
|  | self.call_make_run = False | 
|  |  | 
|  | self.name = instance.name | 
|  | self.instance = instance | 
|  | self.timeout = instance.testcase.timeout | 
|  | self.sourcedir = instance.testcase.source_dir | 
|  | self.build_dir = instance.build_dir | 
|  | self.log = os.path.join(self.build_dir, "handler.log") | 
|  | self.returncode = 0 | 
|  | self.set_state("running", self.duration) | 
|  |  | 
|  | self.args = [] | 
|  |  | 
|  | def set_state(self, state, duration): | 
|  | self.lock.acquire() | 
|  | self.state = state | 
|  | self.duration = duration | 
|  | self.lock.release() | 
|  |  | 
|  | def get_state(self): | 
|  | self.lock.acquire() | 
|  | ret = (self.state, self.duration) | 
|  | self.lock.release() | 
|  | return ret | 
|  |  | 
|  | def record(self, harness): | 
|  | if harness.recording: | 
|  | filename = os.path.join(self.build_dir, "recording.csv") | 
|  | with open(filename, "at") as csvfile: | 
cw = csv.writer(csvfile, lineterminator=os.linesep)
|  | cw.writerow(harness.fieldnames) | 
|  | for instance in harness.recording: | 
|  | cw.writerow(instance) | 
|  |  | 
|  |  | 
|  | class BinaryHandler(Handler): | 
|  | def __init__(self, instance, type_str): | 
|  | """Constructor | 
|  |  | 
|  | @param instance Test Instance | 
|  | """ | 
|  | super().__init__(instance, type_str) | 
|  |  | 
|  | self.terminated = False | 
|  |  | 
|  | # Tool options | 
|  | self.valgrind = False | 
|  | self.lsan = False | 
|  | self.asan = False | 
|  | self.coverage = False | 
|  |  | 
|  | def try_kill_process_by_pid(self): | 
|  | if self.pid_fn: | 
|  | pid = int(open(self.pid_fn).read()) | 
|  | os.unlink(self.pid_fn) | 
|  | self.pid_fn = None  # clear so we don't try to kill the binary twice | 
|  | try: | 
|  | os.kill(pid, signal.SIGTERM) | 
|  | except ProcessLookupError: | 
|  | pass | 
|  |  | 
|  | def terminate(self, proc): | 
# Encapsulate terminate functionality so we do it consistently wherever
# we might want to terminate the proc.  We need try_kill_process_by_pid
# because of both how newer ninja (1.6.0 or greater) and .NET / renode
# work.  Newer ninja versions don't seem to pass SIGTERM down to the
# children, so we need to use try_kill_process_by_pid.
|  | self.try_kill_process_by_pid() | 
|  | proc.terminate() | 
|  | self.terminated = True | 
|  |  | 
|  | def _output_reader(self, proc, harness): | 
|  | log_out_fp = open(self.log, "wt") | 
|  | for line in iter(proc.stdout.readline, b''): | 
|  | logger.debug("OUTPUT: {0}".format(line.decode('utf-8').rstrip())) | 
|  | log_out_fp.write(line.decode('utf-8')) | 
|  | log_out_fp.flush() | 
|  | harness.handle(line.decode('utf-8').rstrip()) | 
|  | if harness.state: | 
|  | try: | 
|  | # POSIX arch based ztests end on their own, | 
|  | # so let's give it up to 100ms to do so | 
|  | proc.wait(0.1) | 
|  | except subprocess.TimeoutExpired: | 
|  | self.terminate(proc) | 
|  | break | 
|  |  | 
|  | log_out_fp.close() | 
|  |  | 
|  | def handle(self): | 
|  |  | 
|  | harness_name = self.instance.testcase.harness.capitalize() | 
|  | harness_import = HarnessImporter(harness_name) | 
|  | harness = harness_import.instance | 
|  | harness.configure(self.instance) | 
|  |  | 
|  | if self.call_make_run: | 
|  | command = [get_generator()[0], "run"] | 
|  | else: | 
|  | command = [self.binary] | 
|  |  | 
|  | run_valgrind = False | 
|  | if self.valgrind and shutil.which("valgrind"): | 
|  | command = ["valgrind", "--error-exitcode=2", | 
|  | "--leak-check=full", | 
|  | "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp", | 
|  | "--log-file=" + self.build_dir + "/valgrind.log" | 
|  | ] + command | 
|  | run_valgrind = True | 
|  |  | 
|  | logger.debug("Spawning process: " + | 
|  | " ".join(shlex.quote(word) for word in command) + os.linesep + | 
|  | "in directory: " + self.build_dir) | 
|  |  | 
|  | start_time = time.time() | 
|  |  | 
|  | env = os.environ.copy() | 
|  | if self.asan: | 
|  | env["ASAN_OPTIONS"] = "log_path=stdout:" + \ | 
|  | env.get("ASAN_OPTIONS", "") | 
|  | if not self.lsan: | 
|  | env["ASAN_OPTIONS"] += "detect_leaks=0" | 
|  |  | 
|  | with subprocess.Popen(command, stdout=subprocess.PIPE, | 
|  | stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc: | 
|  | logger.debug("Spawning BinaryHandler Thread for %s" % self.name) | 
|  | t = threading.Thread(target=self._output_reader, args=(proc, harness,), daemon=True) | 
|  | t.start() | 
|  | t.join(self.timeout) | 
|  | if t.is_alive(): | 
|  | self.terminate(proc) | 
|  | t.join() | 
|  | proc.wait() | 
|  | self.returncode = proc.returncode | 
|  |  | 
|  | handler_time = time.time() - start_time | 
|  |  | 
|  | if self.coverage: | 
|  | subprocess.call(["GCOV_PREFIX=" + self.build_dir, | 
|  | "gcov", self.sourcedir, "-b", "-s", self.build_dir], shell=True) | 
|  |  | 
|  | self.try_kill_process_by_pid() | 
|  |  | 
# FIXME: This is needed when killing the simulator; the console is
# garbled and needs to be reset. Did not find a better way to do that.
|  |  | 
|  | subprocess.call(["stty", "sane"]) | 
|  | self.instance.results = harness.tests | 
|  |  | 
|  | if not self.terminated and self.returncode != 0: | 
|  | # When a process is killed, the default handler returns 128 + SIGTERM | 
|  | # so in that case the return code itself is not meaningful | 
|  | self.set_state("failed", handler_time) | 
|  | self.instance.reason = "Failed" | 
|  | elif run_valgrind and self.returncode == 2: | 
|  | self.set_state("failed", handler_time) | 
|  | self.instance.reason = "Valgrind error" | 
|  | elif harness.state: | 
|  | self.set_state(harness.state, handler_time) | 
|  | else: | 
|  | self.set_state("timeout", handler_time) | 
|  | self.instance.reason = "Timeout" | 
|  |  | 
|  | self.record(harness) | 
|  |  | 
|  |  | 
|  | class DeviceHandler(Handler): | 
|  |  | 
|  | def __init__(self, instance, type_str): | 
|  | """Constructor | 
|  |  | 
|  | @param instance Test Instance | 
|  | """ | 
|  | super().__init__(instance, type_str) | 
|  |  | 
|  | self.suite = None | 
|  |  | 
|  | def monitor_serial(self, ser, halt_fileno, harness): | 
|  | log_out_fp = open(self.log, "wt") | 
|  |  | 
|  | ser_fileno = ser.fileno() | 
|  | readlist = [halt_fileno, ser_fileno] | 
|  |  | 
|  | while ser.isOpen(): | 
|  | readable, _, _ = select.select(readlist, [], [], self.timeout) | 
|  |  | 
|  | if halt_fileno in readable: | 
|  | logger.debug('halted') | 
|  | ser.close() | 
|  | break | 
|  | if ser_fileno not in readable: | 
|  | continue  # Timeout. | 
|  |  | 
|  | serial_line = None | 
|  | try: | 
|  | serial_line = ser.readline() | 
|  | except TypeError: | 
|  | pass | 
|  | except serial.SerialException: | 
|  | ser.close() | 
|  | break | 
|  |  | 
|  | # Just because ser_fileno has data doesn't mean an entire line | 
|  | # is available yet. | 
|  | if serial_line: | 
|  | sl = serial_line.decode('utf-8', 'ignore') | 
|  | logger.debug("DEVICE: {0}".format(sl.rstrip())) | 
|  |  | 
|  | log_out_fp.write(sl) | 
|  | log_out_fp.flush() | 
|  | harness.handle(sl.rstrip()) | 
|  |  | 
|  | if harness.state: | 
|  | ser.close() | 
|  | break | 
|  |  | 
|  | log_out_fp.close() | 
|  |  | 
|  | def device_is_available(self, device): | 
|  | for i in self.suite.connected_hardware: | 
|  | if i['platform'] == device and i['available'] and i['serial']: | 
|  | return True | 
|  |  | 
|  | return False | 
|  |  | 
|  | def get_available_device(self, device): | 
|  | for i in self.suite.connected_hardware: | 
|  | if i['platform'] == device and i['available'] and i['serial']: | 
|  | i['available'] = False | 
|  | i['counter'] += 1 | 
|  | return i | 
|  |  | 
|  | return None | 
|  |  | 
|  | def make_device_available(self, serial): | 
|  | with hw_map_local: | 
|  | for i in self.suite.connected_hardware: | 
|  | if i['serial'] == serial: | 
|  | i['available'] = True | 
|  |  | 
|  | @staticmethod | 
|  | def run_custom_script(script, timeout): | 
|  | with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc: | 
|  | try: | 
|  | stdout, _ = proc.communicate(timeout=timeout) | 
|  | logger.debug(stdout.decode()) | 
|  |  | 
|  | except subprocess.TimeoutExpired: | 
|  | proc.kill() | 
|  | proc.communicate() | 
|  | logger.error("{} timed out".format(script)) | 
|  |  | 
|  | def handle(self): | 
|  | out_state = "failed" | 
|  |  | 
|  | if options.west_flash: | 
|  | command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir] | 
|  | if options.west_runner: | 
|  | command.append("--runner") | 
|  | command.append(options.west_runner) | 
|  | # There are three ways this option is used. | 
|  | # 1) bare: --west-flash | 
|  | #    This results in options.west_flash == [] | 
|  | # 2) with a value: --west-flash="--board-id=42" | 
|  | #    This results in options.west_flash == "--board-id=42" | 
|  | # 3) Multiple values: --west-flash="--board-id=42,--erase" | 
|  | #    This results in options.west_flash == "--board-id=42 --erase" | 
|  | if options.west_flash != []: | 
|  | command.append('--') | 
|  | command.extend(options.west_flash.split(',')) | 
|  | else: | 
|  | command = [get_generator()[0], "-C", self.build_dir, "flash"] | 
|  |  | 
|  | while not self.device_is_available(self.instance.platform.name): | 
|  | logger.debug("Waiting for device {} to become available".format(self.instance.platform.name)) | 
|  | time.sleep(1) | 
|  |  | 
|  | hardware = self.get_available_device(self.instance.platform.name) | 
|  |  | 
|  | runner = hardware.get('runner', None) | 
|  | if runner: | 
|  | board_id = hardware.get("probe_id", hardware.get("id", None)) | 
|  | product = hardware.get("product", None) | 
|  | command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir] | 
|  | command.append("--runner") | 
|  | command.append(hardware.get('runner', None)) | 
|  | if runner == "pyocd": | 
|  | command.append("--board-id") | 
|  | command.append(board_id) | 
|  | elif runner == "nrfjprog": | 
|  | command.append('--') | 
|  | command.append("--snr") | 
|  | command.append(board_id) | 
|  | elif runner == "openocd" and product == "STM32 STLink": | 
|  | command.append('--') | 
|  | command.append("--cmd-pre-init") | 
|  | command.append("hla_serial %s" % (board_id)) | 
|  | elif runner == "openocd" and product == "EDBG CMSIS-DAP": | 
|  | command.append('--') | 
|  | command.append("--cmd-pre-init") | 
|  | command.append("cmsis_dap_serial %s" % (board_id)) | 
|  | elif runner == "jlink": | 
|  | command.append("--tool-opt=-SelectEmuBySN  %s" % (board_id)) | 
|  |  | 
|  | serial_device = hardware['serial'] | 
|  |  | 
|  | try: | 
|  | ser = serial.Serial( | 
|  | serial_device, | 
|  | baudrate=115200, | 
|  | parity=serial.PARITY_NONE, | 
|  | stopbits=serial.STOPBITS_ONE, | 
|  | bytesize=serial.EIGHTBITS, | 
|  | timeout=self.timeout | 
|  | ) | 
|  | except serial.SerialException as e: | 
|  | self.set_state("failed", 0) | 
|  | self.instance.reason = "Failed" | 
|  | logger.error("Serial device error: %s" % (str(e))) | 
|  | self.make_device_available(serial_device) | 
|  | return | 
|  |  | 
|  | ser.flush() | 
|  |  | 
|  | harness_name = self.instance.testcase.harness.capitalize() | 
|  | harness_import = HarnessImporter(harness_name) | 
|  | harness = harness_import.instance | 
|  | harness.configure(self.instance) | 
|  | read_pipe, write_pipe = os.pipe() | 
|  | start_time = time.time() | 
|  |  | 
|  | pre_script = hardware.get('pre_script') | 
|  | post_flash_script = hardware.get('post_flash_script') | 
|  | post_script = hardware.get('post_script') | 
|  |  | 
|  | if pre_script: | 
|  | self.run_custom_script(pre_script, 30) | 
|  |  | 
|  | t = threading.Thread(target=self.monitor_serial, daemon=True, | 
|  | args=(ser, read_pipe, harness)) | 
|  | t.start() | 
|  |  | 
|  | d_log = "{}/device.log".format(self.instance.build_dir) | 
|  | logger.debug('Flash command: %s', command) | 
|  | try: | 
|  | stdout = stderr = None | 
|  | with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc: | 
|  | try: | 
|  | (stdout, stderr) = proc.communicate(timeout=30) | 
|  | logger.debug(stdout.decode()) | 
|  |  | 
|  | if proc.returncode != 0: | 
|  | self.instance.reason = "Device issue (Flash?)" | 
|  | with open(d_log, "w") as dlog_fp: | 
|  | dlog_fp.write(stderr.decode()) | 
|  | except subprocess.TimeoutExpired: | 
|  | proc.kill() | 
|  | (stdout, stderr) = proc.communicate() | 
|  | self.instance.reason = "Device issue (Timeout)" | 
|  |  | 
|  | with open(d_log, "w") as dlog_fp: | 
|  | dlog_fp.write(stderr.decode()) | 
|  |  | 
|  | except subprocess.CalledProcessError: | 
|  | os.write(write_pipe, b'x')  # halt the thread | 
|  |  | 
|  | if post_flash_script: | 
|  | self.run_custom_script(post_flash_script, 30) | 
|  |  | 
|  |  | 
|  | t.join(self.timeout) | 
|  | if t.is_alive(): | 
|  | logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name)) | 
|  | out_state = "timeout" | 
|  |  | 
|  | if ser.isOpen(): | 
|  | ser.close() | 
|  |  | 
|  | os.close(write_pipe) | 
|  | os.close(read_pipe) | 
|  |  | 
|  | handler_time = time.time() - start_time | 
|  |  | 
|  | if out_state == "timeout": | 
|  | for c in self.instance.testcase.cases: | 
|  | if c not in harness.tests: | 
|  | harness.tests[c] = "BLOCK" | 
|  |  | 
|  | self.instance.reason = "Timeout" | 
|  |  | 
|  | self.instance.results = harness.tests | 
|  |  | 
|  | if harness.state: | 
|  | self.set_state(harness.state, handler_time) | 
if harness.state == "failed":
|  | self.instance.reason = "Failed" | 
|  | else: | 
|  | self.set_state(out_state, handler_time) | 
|  |  | 
|  | if post_script: | 
|  | self.run_custom_script(post_script, 30) | 
|  |  | 
|  | self.make_device_available(serial_device) | 
|  |  | 
|  | self.record(harness) | 
|  |  | 
|  |  | 
|  | class QEMUHandler(Handler): | 
|  | """Spawns a thread to monitor QEMU output from pipes | 
|  |  | 
|  | We pass QEMU_PIPE to 'make run' and monitor the pipes for output. | 
|  | We need to do this as once qemu starts, it runs forever until killed. | 
|  | Test cases emit special messages to the console as they run, we check | 
|  | for these to collect whether the test passed or failed. | 
|  | """ | 
|  |  | 
|  | def __init__(self, instance, type_str): | 
|  | """Constructor | 
|  |  | 
|  | @param instance Test instance | 
|  | """ | 
|  |  | 
|  | super().__init__(instance, type_str) | 
|  | self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo") | 
|  |  | 
|  | self.pid_fn = os.path.join(instance.build_dir, "qemu.pid") | 
|  |  | 
|  | @staticmethod | 
|  | def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results, harness): | 
|  | fifo_in = fifo_fn + ".in" | 
|  | fifo_out = fifo_fn + ".out" | 
|  |  | 
|  | # These in/out nodes are named from QEMU's perspective, not ours | 
|  | if os.path.exists(fifo_in): | 
|  | os.unlink(fifo_in) | 
|  | os.mkfifo(fifo_in) | 
|  | if os.path.exists(fifo_out): | 
|  | os.unlink(fifo_out) | 
|  | os.mkfifo(fifo_out) | 
|  |  | 
|  | # We don't do anything with out_fp but we need to open it for | 
|  | # writing so that QEMU doesn't block, due to the way pipes work | 
|  | out_fp = open(fifo_in, "wb") | 
|  | # Disable internal buffering, we don't | 
|  | # want read() or poll() to ever block if there is data in there | 
|  | in_fp = open(fifo_out, "rb", buffering=0) | 
|  | log_out_fp = open(logfile, "wt") | 
|  |  | 
|  | start_time = time.time() | 
|  | timeout_time = start_time + timeout | 
|  | p = select.poll() | 
|  | p.register(in_fp, select.POLLIN) | 
|  | out_state = None | 
|  |  | 
|  | line = "" | 
|  | timeout_extended = False | 
|  | while True: | 
|  | this_timeout = int((timeout_time - time.time()) * 1000) | 
|  | if this_timeout < 0 or not p.poll(this_timeout): | 
|  | if not out_state: | 
|  | out_state = "timeout" | 
|  | break | 
|  |  | 
|  | try: | 
|  | c = in_fp.read(1).decode("utf-8") | 
|  | except UnicodeDecodeError: | 
|  | # Test is writing something weird, fail | 
|  | out_state = "unexpected byte" | 
|  | break | 
|  |  | 
|  | if c == "": | 
|  | # EOF, this shouldn't happen unless QEMU crashes | 
|  | out_state = "unexpected eof" | 
|  | break | 
|  | line = line + c | 
|  | if c != "\n": | 
|  | continue | 
|  |  | 
|  | # line contains a full line of data output from QEMU | 
|  | log_out_fp.write(line) | 
|  | log_out_fp.flush() | 
|  | line = line.strip() | 
|  | logger.debug("QEMU: %s" % line) | 
|  |  | 
|  | harness.handle(line) | 
|  | if harness.state: | 
|  | # if we have registered a fail make sure the state is not | 
|  | # overridden by a false success message coming from the | 
|  | # testsuite | 
|  | if out_state != 'failed': | 
|  | out_state = harness.state | 
|  |  | 
|  | # if we get some state, that means test is doing well, we reset | 
|  | # the timeout and wait for 2 more seconds to catch anything | 
|  | # printed late. We wait much longer if code | 
|  | # coverage is enabled since dumping this information can | 
|  | # take some time. | 
|  | if not timeout_extended or harness.capture_coverage: | 
|  | timeout_extended = True | 
|  | if harness.capture_coverage: | 
|  | timeout_time = time.time() + 30 | 
|  | else: | 
|  | timeout_time = time.time() + 2 | 
|  | line = "" | 
|  |  | 
|  | handler.record(harness) | 
|  |  | 
|  | handler_time = time.time() - start_time | 
|  | logger.debug("QEMU complete (%s) after %f seconds" % | 
|  | (out_state, handler_time)) | 
|  | handler.set_state(out_state, handler_time) | 
|  | if out_state == "timeout": | 
|  | handler.instance.reason = "Timeout" | 
|  | elif out_state == "failed": | 
|  | handler.instance.reason = "Failed" | 
|  |  | 
|  | log_out_fp.close() | 
|  | out_fp.close() | 
|  | in_fp.close() | 
|  | if os.path.exists(pid_fn): | 
|  | pid = int(open(pid_fn).read()) | 
|  | os.unlink(pid_fn) | 
|  |  | 
|  | try: | 
|  | if pid: | 
|  | os.kill(pid, signal.SIGTERM) | 
|  | except ProcessLookupError: | 
|  | # Oh well, as long as it's dead! User probably sent Ctrl-C | 
|  | pass | 
|  |  | 
|  | os.unlink(fifo_in) | 
|  | os.unlink(fifo_out) | 
|  |  | 
|  | def handle(self): | 
|  | self.results = {} | 
|  | self.run = True | 
|  |  | 
|  | # We pass this to QEMU which looks for fifos with .in and .out | 
|  | # suffixes. | 
|  | self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo") | 
|  |  | 
|  | self.pid_fn = os.path.join(self.instance.build_dir, "qemu.pid") | 
|  | if os.path.exists(self.pid_fn): | 
|  | os.unlink(self.pid_fn) | 
|  |  | 
|  | self.log_fn = self.log | 
|  |  | 
|  | harness_import = HarnessImporter(self.instance.testcase.harness.capitalize()) | 
|  | harness = harness_import.instance | 
|  | harness.configure(self.instance) | 
|  | self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread, | 
|  | args=(self, self.timeout, self.build_dir, | 
|  | self.log_fn, self.fifo_fn, | 
|  | self.pid_fn, self.results, harness)) | 
|  |  | 
|  | self.instance.results = harness.tests | 
|  | self.thread.daemon = True | 
|  | logger.debug("Spawning QEMUHandler Thread for %s" % self.name) | 
|  | self.thread.start() | 
|  | subprocess.call(["stty", "sane"]) | 
|  |  | 
|  | logger.debug("Running %s (%s)" % (self.name, self.type_str)) | 
|  | command = [get_generator()[0]] | 
|  | command += ["-C", self.build_dir, "run"] | 
|  |  | 
|  | with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.build_dir) as proc: | 
|  | logger.debug("Spawning QEMUHandler Thread for %s" % self.name) | 
|  | proc.wait() | 
|  | self.returncode = proc.returncode | 
|  |  | 
|  | if self.returncode != 0: | 
|  | self.set_state("failed", 0) | 
|  | self.instance.reason = "Exited with {}".format(self.returncode) | 
|  |  | 
|  | def get_fifo(self): | 
|  | return self.fifo_fn | 
|  |  | 
|  |  | 
|  | class SizeCalculator: | 
|  | alloc_sections = [ | 
|  | "bss", | 
|  | "noinit", | 
|  | "app_bss", | 
|  | "app_noinit", | 
|  | "ccm_bss", | 
|  | "ccm_noinit" | 
|  | ] | 
|  |  | 
|  | rw_sections = [ | 
|  | "datas", | 
|  | "initlevel", | 
|  | "exceptions", | 
|  | "initshell", | 
|  | "_static_thread_area", | 
|  | "_k_timer_area", | 
|  | "_k_mem_slab_area", | 
|  | "_k_mem_pool_area", | 
|  | "sw_isr_table", | 
|  | "_k_sem_area", | 
|  | "_k_mutex_area", | 
|  | "app_shmem_regions", | 
|  | "_k_fifo_area", | 
|  | "_k_lifo_area", | 
|  | "_k_stack_area", | 
|  | "_k_msgq_area", | 
|  | "_k_mbox_area", | 
|  | "_k_pipe_area", | 
|  | "net_if", | 
|  | "net_if_dev", | 
|  | "net_stack", | 
|  | "net_l2_data", | 
|  | "_k_queue_area", | 
|  | "_net_buf_pool_area", | 
|  | "app_datas", | 
|  | "kobject_data", | 
|  | "mmu_tables", | 
|  | "app_pad", | 
|  | "priv_stacks", | 
|  | "ccm_data", | 
|  | "usb_descriptor", | 
|  | "usb_data", "usb_bos_desc", | 
|  | 'log_backends_sections', | 
|  | 'log_dynamic_sections', | 
|  | 'log_const_sections', | 
|  | "app_smem", | 
'shell_root_cmds_sections',
|  | "font_entry_sections", | 
|  | "priv_stacks_noinit", | 
|  | "_TEXT_SECTION_NAME_2", | 
|  | "_GCOV_BSS_SECTION_NAME", | 
|  | "gcov", | 
|  | "nocache" | 
|  | ] | 
|  |  | 
|  | # These get copied into RAM only on non-XIP | 
|  | ro_sections = [ | 
|  | "text", | 
|  | "ctors", | 
|  | "init_array", | 
|  | "reset", | 
|  | "object_access", | 
|  | "rodata", | 
|  | "devconfig", | 
|  | "net_l2", | 
|  | "vector", | 
|  | "sw_isr_table", | 
|  | "_settings_handlers_area", | 
|  | "_bt_channels_area", | 
|  | "_bt_br_channels_area", | 
|  | "_bt_services_area", | 
|  | "vectors", | 
|  | "net_socket_register", | 
|  | "net_ppp_proto" | 
|  | ] | 
|  |  | 
|  | def __init__(self, filename, extra_sections): | 
|  | """Constructor | 
|  |  | 
|  | @param filename Path to the output binary | 
|  | The <filename> is parsed by objdump to determine section sizes | 
|  | """ | 
|  | # Make sure this is an ELF binary | 
|  | with open(filename, "rb") as f: | 
|  | magic = f.read(4) | 
|  |  | 
|  | try: | 
|  | if magic != b'\x7fELF': | 
|  | raise SanityRuntimeError("%s is not an ELF binary" % filename) | 
|  | except Exception as e: | 
|  | print(str(e)) | 
|  | sys.exit(2) | 
|  |  | 
|  | # Search for CONFIG_XIP in the ELF's list of symbols using NM and AWK. | 
|  | # GREP can not be used as it returns an error if the symbol is not | 
|  | # found. | 
|  | is_xip_command = "nm " + filename + \ | 
|  | " | awk '/CONFIG_XIP/ { print $3 }'" | 
|  | is_xip_output = subprocess.check_output( | 
|  | is_xip_command, shell=True, stderr=subprocess.STDOUT).decode( | 
|  | "utf-8").strip() | 
|  | try: | 
|  | if is_xip_output.endswith("no symbols"): | 
|  | raise SanityRuntimeError("%s has no symbol information" % filename) | 
|  | except Exception as e: | 
|  | print(str(e)) | 
|  | sys.exit(2) | 
|  |  | 
|  | self.is_xip = (len(is_xip_output) != 0) | 
|  |  | 
|  | self.filename = filename | 
|  | self.sections = [] | 
|  | self.rom_size = 0 | 
|  | self.ram_size = 0 | 
|  | self.extra_sections = extra_sections | 
|  |  | 
|  | self._calculate_sizes() | 
|  |  | 
|  | def get_ram_size(self): | 
|  | """Get the amount of RAM the application will use up on the device | 
|  |  | 
|  | @return amount of RAM, in bytes | 
|  | """ | 
|  | return self.ram_size | 
|  |  | 
|  | def get_rom_size(self): | 
|  | """Get the size of the data that this application uses on device's flash | 
|  |  | 
|  | @return amount of ROM, in bytes | 
|  | """ | 
|  | return self.rom_size | 
|  |  | 
|  | def unrecognized_sections(self): | 
|  | """Get a list of sections inside the binary that weren't recognized | 
|  |  | 
|  | @return list of unrecognized section names | 
|  | """ | 
|  | slist = [] | 
|  | for v in self.sections: | 
|  | if not v["recognized"]: | 
|  | slist.append(v["name"]) | 
|  | return slist | 
|  |  | 
|  | def _calculate_sizes(self): | 
|  | """ Calculate RAM and ROM usage by section """ | 
|  | objdump_command = "objdump -h " + self.filename | 
|  | objdump_output = subprocess.check_output( | 
|  | objdump_command, shell=True).decode("utf-8").splitlines() | 
|  |  | 
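# Each section line in "objdump -h" output looks roughly like
#
#   Idx Name      Size      VMA       LMA       File off  Algn
#     1 text      00002724  00000000  00000000  00010000  2**2
#
# i.e. index, name, size, virtual address, load address, file offset
# and alignment, with the numeric fields in hex. The parsing below
# relies on this column order.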
|  | for line in objdump_output: | 
|  | words = line.split() | 
|  |  | 
|  | if not words:  # Skip lines that are too short | 
|  | continue | 
|  |  | 
|  | index = words[0] | 
|  | if not index[0].isdigit():  # Skip lines that do not start | 
|  | continue  # with a digit | 
|  |  | 
|  | name = words[1]  # Skip lines with section names | 
|  | if name[0] == '.':  # starting with '.' | 
|  | continue | 
|  |  | 
|  | # TODO this doesn't actually reflect the size in flash or RAM as | 
|  | # it doesn't include linker-imposed padding between sections. | 
|  | # It is close though. | 
|  | size = int(words[2], 16) | 
|  | if size == 0: | 
|  | continue | 
|  |  | 
|  | load_addr = int(words[4], 16) | 
|  | virt_addr = int(words[3], 16) | 
|  |  | 
|  | # Add section to memory use totals (for both non-XIP and XIP scenarios) | 
|  | # Unrecognized section names are not included in the calculations. | 
|  | recognized = True | 
|  | if name in SizeCalculator.alloc_sections: | 
|  | self.ram_size += size | 
|  | stype = "alloc" | 
|  | elif name in SizeCalculator.rw_sections: | 
|  | self.ram_size += size | 
|  | self.rom_size += size | 
|  | stype = "rw" | 
|  | elif name in SizeCalculator.ro_sections: | 
|  | self.rom_size += size | 
|  | if not self.is_xip: | 
|  | self.ram_size += size | 
|  | stype = "ro" | 
|  | else: | 
|  | stype = "unknown" | 
|  | if name not in self.extra_sections: | 
|  | recognized = False | 
|  |  | 
|  | self.sections.append({"name": name, "load_addr": load_addr, | 
|  | "size": size, "virt_addr": virt_addr, | 
|  | "type": stype, "recognized": recognized}) | 
|  |  | 
|  |  | 
|  | # "list" - List of strings | 
|  | # "list:<type>" - List of <type> | 
|  | # "set" - Set of unordered, unique strings | 
|  | # "set:<type>" - Set of <type> | 
|  | # "float" - Floating point | 
|  | # "int" - Integer | 
|  | # "bool" - Boolean | 
|  | # "str" - String | 
|  |  | 
|  | # XXX Be sure to update __doc__ if you change any of this!! | 
|  |  | 
|  | platform_valid_keys = { | 
|  | "supported_toolchains": {"type": "list", "default": []}, | 
|  | "env": {"type": "list", "default": []} | 
|  | } | 
|  |  | 
|  | testcase_valid_keys = {"tags": {"type": "set", "required": False}, | 
|  | "type": {"type": "str", "default": "integration"}, | 
|  | "extra_args": {"type": "list"}, | 
|  | "extra_configs": {"type": "list"}, | 
|  | "build_only": {"type": "bool", "default": False}, | 
|  | "build_on_all": {"type": "bool", "default": False}, | 
|  | "skip": {"type": "bool", "default": False}, | 
|  | "slow": {"type": "bool", "default": False}, | 
|  | "timeout": {"type": "int", "default": 60}, | 
|  | "min_ram": {"type": "int", "default": 8}, | 
|  | "depends_on": {"type": "set"}, | 
|  | "min_flash": {"type": "int", "default": 32}, | 
|  | "arch_whitelist": {"type": "set"}, | 
|  | "arch_exclude": {"type": "set"}, | 
|  | "extra_sections": {"type": "list", "default": []}, | 
|  | "platform_exclude": {"type": "set"}, | 
|  | "platform_whitelist": {"type": "set"}, | 
|  | "toolchain_exclude": {"type": "set"}, | 
|  | "toolchain_whitelist": {"type": "set"}, | 
|  | "filter": {"type": "str"}, | 
|  | "harness": {"type": "str"}, | 
|  | "harness_config": {"type": "map", "default": {}} | 
|  | } | 
|  |  | 
|  |  | 
|  | class SanityConfigParser: | 
|  | """Class to read test case files with semantic checking | 
|  | """ | 
|  |  | 
|  | def __init__(self, filename, schema): | 
|  | """Instantiate a new SanityConfigParser object | 
|  |  | 
|  | @param filename Source .yaml file to read | 
|  | """ | 
|  | self.data = {} | 
|  | self.schema = schema | 
|  | self.filename = filename | 
|  | self.tests = {} | 
|  | self.common = {} | 
|  |  | 
|  | def load(self): | 
|  | self.data = scl.yaml_load_verify(self.filename, self.schema) | 
|  |  | 
|  | if 'tests' in self.data: | 
|  | self.tests = self.data['tests'] | 
|  | if 'common' in self.data: | 
|  | self.common = self.data['common'] | 
|  |  | 
|  | def _cast_value(self, value, typestr): | 
|  | if isinstance(value, str): | 
|  | v = value.strip() | 
|  | if typestr == "str": | 
|  | return v | 
|  |  | 
|  | elif typestr == "float": | 
|  | return float(value) | 
|  |  | 
|  | elif typestr == "int": | 
|  | return int(value) | 
|  |  | 
|  | elif typestr == "bool": | 
|  | return value | 
|  |  | 
|  | elif typestr.startswith("list") and isinstance(value, list): | 
|  | return value | 
|  | elif typestr.startswith("list") and isinstance(value, str): | 
|  | vs = v.split() | 
|  | if len(typestr) > 4 and typestr[4] == ":": | 
|  | return [self._cast_value(vsi, typestr[5:]) for vsi in vs] | 
|  | else: | 
|  | return vs | 
|  |  | 
|  | elif typestr.startswith("set"): | 
|  | vs = v.split() | 
|  | if len(typestr) > 3 and typestr[3] == ":": | 
|  | return {self._cast_value(vsi, typestr[4:]) for vsi in vs} | 
|  | else: | 
|  | return set(vs) | 
|  |  | 
|  | elif typestr.startswith("map"): | 
|  | return value | 
|  | else: | 
|  | raise ConfigurationError( | 
|  | self.filename, "unknown type '%s'" % value) | 
|  |  | 
|  | def get_test(self, name, valid_keys): | 
|  | """Get a dictionary representing the keys/values within a test | 
|  |  | 
|  | @param name The test in the .yaml file to retrieve data from | 
|  | @param valid_keys A dictionary representing the intended semantics | 
|  | for this test. Each key in this dictionary is a key that could | 
|  | be specified, if a key is given in the .yaml file which isn't in | 
|  | here, it will generate an error. Each value in this dictionary | 
|  | is another dictionary containing metadata: | 
|  |  | 
|  | "default" - Default value if not given | 
|  | "type" - Data type to convert the text value to. Simple types | 
|  | supported are "str", "float", "int", "bool" which will get | 
|  | converted to respective Python data types. "set" and "list" | 
|  | may also be specified which will split the value by | 
|  | whitespace (but keep the elements as strings). finally, | 
|  | "list:<type>" and "set:<type>" may be given which will | 
|  | perform a type conversion after splitting the value up. | 
|  | "required" - If true, raise an error if not defined. If false | 
|  | and "default" isn't specified, a type conversion will be | 
|  | done on an empty string | 
|  | @return A dictionary containing the test key-value pairs with | 
|  | type conversion and default values filled in per valid_keys | 
|  | """ | 
|  |  | 
|  | d = {} | 
|  | for k, v in self.common.items(): | 
|  | d[k] = v | 
|  |  | 
|  | for k, v in self.tests[name].items(): | 
|  | if k not in valid_keys: | 
|  | raise ConfigurationError( | 
|  | self.filename, | 
|  | "Unknown config key '%s' in definition for '%s'" % | 
|  | (k, name)) | 
|  |  | 
|  | if k in d: | 
|  | if isinstance(d[k], str): | 
|  | # By default, we just concatenate string values of keys | 
|  | # which appear both in "common" and per-test sections, | 
|  | # but some keys are handled in adhoc way based on their | 
|  | # semantics. | 
|  | if k == "filter": | 
|  | d[k] = "(%s) and (%s)" % (d[k], v) | 
|  | else: | 
|  | d[k] += " " + v | 
|  | else: | 
|  | d[k] = v | 
|  |  | 
|  | for k, kinfo in valid_keys.items(): | 
|  | if k not in d: | 
|  | if "required" in kinfo: | 
|  | required = kinfo["required"] | 
|  | else: | 
|  | required = False | 
|  |  | 
|  | if required: | 
|  | raise ConfigurationError( | 
|  | self.filename, | 
|  | "missing required value for '%s' in test '%s'" % | 
|  | (k, name)) | 
|  | else: | 
|  | if "default" in kinfo: | 
|  | default = kinfo["default"] | 
|  | else: | 
|  | default = self._cast_value("", kinfo["type"]) | 
|  | d[k] = default | 
|  | else: | 
|  | try: | 
|  | d[k] = self._cast_value(d[k], kinfo["type"]) | 
|  | except ValueError: | 
|  | raise ConfigurationError( | 
|  | self.filename, "bad %s value '%s' for key '%s' in name '%s'" % | 
|  | (kinfo["type"], d[k], k, name)) | 
|  |  | 
|  | return d | 
|  |  | 
|  |  | 
|  | class Platform: | 
|  | """Class representing metadata for a particular platform | 
|  |  | 
|  | Maps directly to BOARD when building""" | 
|  |  | 
|  | platform_schema = scl.yaml_load(os.path.join(ZEPHYR_BASE, | 
|  | "scripts", "sanity_chk", "platform-schema.yaml")) | 
|  |  | 
|  | def __init__(self): | 
|  | """Constructor. | 
|  |  | 
|  | """ | 
|  |  | 
|  | self.name = "" | 
|  | self.sanitycheck = True | 
|  | # if no RAM size is specified by the board, take a default of 128K | 
|  | self.ram = 128 | 
|  |  | 
|  | self.ignore_tags = [] | 
|  | self.default = False | 
|  | # if no flash size is specified by the board, take a default of 512K | 
|  | self.flash = 512 | 
|  | self.supported = set() | 
|  |  | 
|  | self.arch = "" | 
|  | self.type = "na" | 
|  | self.simulation = "na" | 
|  | self.supported_toolchains = [] | 
|  | self.env = [] | 
|  | self.env_satisfied = True | 
|  | self.filter_data = dict() | 
|  |  | 
|  | def load(self, platform_file): | 
|  | scp = SanityConfigParser(platform_file, self.platform_schema) | 
|  | scp.load() | 
|  | data = scp.data | 
|  |  | 
|  | self.name = data['identifier'] | 
|  | self.sanitycheck = data.get("sanitycheck", True) | 
|  | # if no RAM size is specified by the board, take a default of 128K | 
|  | self.ram = data.get("ram", 128) | 
|  | testing = data.get("testing", {}) | 
|  | self.ignore_tags = testing.get("ignore_tags", []) | 
|  | self.default = testing.get("default", False) | 
|  | # if no flash size is specified by the board, take a default of 512K | 
|  | self.flash = data.get("flash", 512) | 
|  | self.supported = set() | 
|  | for supp_feature in data.get("supported", []): | 
|  | for item in supp_feature.split(":"): | 
|  | self.supported.add(item) | 
|  |  | 
|  | self.arch = data['arch'] | 
|  | self.type = data.get('type', "na") | 
|  | self.simulation = data.get('simulation', "na") | 
|  | self.supported_toolchains = data.get("toolchain", []) | 
|  | self.env = data.get("env", []) | 
|  | self.env_satisfied = True | 
|  | for env in self.env: | 
|  | if not os.environ.get(env, None): | 
|  | self.env_satisfied = False | 
|  |  | 
|  | def __repr__(self): | 
|  | return "<%s on %s>" % (self.name, self.arch) | 
|  |  | 
|  |  | 
|  | class TestCase(object): | 
|  | """Class representing a test application | 
|  | """ | 
|  |  | 
|  | def __init__(self): | 
|  | """TestCase constructor. | 
|  |  | 
|  | This gets called by TestSuite as it finds and reads test yaml files. | 
|  | Multiple TestCase instances may be generated from a single testcase.yaml, | 
|  | each one corresponds to an entry within that file. | 
|  |  | 
|  | We need to have a unique name for every single test case. Since | 
|  | a testcase.yaml can define multiple tests, the canonical name for | 
|  | the test case is <workdir>/<name>. | 
|  |  | 
|  | @param testcase_root os.path.abspath() of one of the --testcase-root | 
|  | @param workdir Sub-directory of testcase_root where the | 
|  | .yaml test configuration file was found | 
|  | @param name Name of this test case, corresponding to the entry name | 
|  | in the test case configuration file. For many test cases that just | 
|  | define one test, can be anything and is usually "test". This is | 
|  | really only used to distinguish between different cases when | 
|  | the testcase.yaml defines multiple tests | 
|  | @param tc_dict Dictionary with test values for this test case | 
|  | from the testcase.yaml file | 
|  | """ | 
|  |  | 
|  | self.id = "" | 
|  | self.source_dir = "" | 
|  | self.yamlfile = "" | 
|  | self.cases = [] | 
|  | self.name = "" | 
|  |  | 
|  | self.type = None | 
|  | self.tags = None | 
|  | self.extra_args = None | 
|  | self.extra_configs = None | 
|  | self.arch_whitelist = None | 
|  | self.arch_exclude = None | 
|  | self.skip = None | 
|  | self.platform_exclude = None | 
|  | self.platform_whitelist = None | 
|  | self.toolchain_exclude = None | 
|  | self.toolchain_whitelist = None | 
|  | self.tc_filter = None | 
|  | self.timeout = 60 | 
|  | self.harness = "" | 
|  | self.harness_config = {} | 
|  | self.build_only = True | 
|  | self.build_on_all = False | 
|  | self.slow = False | 
|  | self.min_ram = None | 
|  | self.depends_on = None | 
|  | self.min_flash = None | 
|  | self.extra_sections = None | 
|  |  | 
|  | @staticmethod | 
|  | def get_unique(testcase_root, workdir, name): | 
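# Example (paths illustrative): for a testcase_root of
# $ZEPHYR_BASE/tests/kernel/mutex, a workdir of "." and a name of
# "kernel.mutex", the unique name is "tests/kernel/mutex/kernel.mutex".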
|  |  | 
|  | canonical_testcase_root = os.path.realpath(testcase_root) | 
|  | if Path(canonical_zephyr_base) in Path(canonical_testcase_root).parents: | 
|  | # This is in ZEPHYR_BASE, so include path in name for uniqueness | 
|  | # FIXME: We should not depend on path of test for unique names. | 
|  | relative_tc_root = os.path.relpath(canonical_testcase_root, | 
|  | start=canonical_zephyr_base) | 
|  | else: | 
|  | relative_tc_root = "" | 
|  |  | 
|  | # workdir can be "." | 
|  | unique = os.path.normpath(os.path.join(relative_tc_root, workdir, name)) | 
|  | return unique | 
|  |  | 
|  | @staticmethod | 
|  | def scan_file(inf_name): | 
|  | suite_regex = re.compile( | 
|  | # do not match until end-of-line, otherwise we won't allow | 
|  | # stc_regex below to catch the ones that are declared in the same | 
|  | # line--as we only search starting the end of this match | 
|  | br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,", | 
|  | re.MULTILINE) | 
|  | stc_regex = re.compile( | 
|  | br"^\s*"  # empy space at the beginning is ok | 
|  | # catch the case where it is declared in the same sentence, e.g: | 
|  | # | 
|  | # ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME)); | 
|  | br"(?:ztest_test_suite\([a-zA-Z0-9_]+,\s*)?" | 
|  | # Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME) | 
|  | br"ztest_(?:1cpu_)?(?:user_)?unit_test(?:_setup_teardown)?" | 
# Consume the argument that becomes the extra testcase
|  | br"\(\s*" | 
|  | br"(?P<stc_name>[a-zA-Z0-9_]+)" | 
|  | # _setup_teardown() variant has two extra arguments that we ignore | 
|  | br"(?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?" | 
|  | br"\s*\)", | 
|  | # We don't check how it finishes; we don't care | 
|  | re.MULTILINE) | 
|  | suite_run_regex = re.compile( | 
|  | br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)", | 
|  | re.MULTILINE) | 
|  | achtung_regex = re.compile( | 
|  | br"(#ifdef|#endif)", | 
|  | re.MULTILINE) | 
|  | warnings = None | 
|  |  | 
|  | with open(inf_name) as inf: | 
|  | if os.name == 'nt': | 
|  | mmap_args = {'fileno': inf.fileno(), 'length': 0, 'access': mmap.ACCESS_READ} | 
|  | else: | 
|  | mmap_args = {'fileno': inf.fileno(), 'length': 0, 'flags': mmap.MAP_PRIVATE, 'prot': mmap.PROT_READ, | 
|  | 'offset': 0} | 
|  |  | 
|  | with contextlib.closing(mmap.mmap(**mmap_args)) as main_c: | 
|  | # contextlib makes pylint think main_c isn't subscriptable | 
|  | # pylint: disable=unsubscriptable-object | 
|  |  | 
|  | suite_regex_match = suite_regex.search(main_c) | 
|  | if not suite_regex_match: | 
|  | # can't find ztest_test_suite, maybe a client, because | 
|  | # it includes ztest.h | 
|  | return None, None | 
|  |  | 
|  | suite_run_match = suite_run_regex.search(main_c) | 
|  | if not suite_run_match: | 
|  | raise ValueError("can't find ztest_run_test_suite") | 
|  |  | 
|  | achtung_matches = re.findall( | 
|  | achtung_regex, | 
|  | main_c[suite_regex_match.end():suite_run_match.start()]) | 
|  | if achtung_matches: | 
|  | warnings = "found invalid %s in ztest_test_suite()" \ | 
|  | % ", ".join({match.decode() for match in achtung_matches}) | 
|  | _matches = re.findall( | 
|  | stc_regex, | 
|  | main_c[suite_regex_match.end():suite_run_match.start()]) | 
|  | matches = [match.decode().replace("test_", "") for match in _matches] | 
|  | return matches, warnings | 
|  |  | 
|  | def scan_path(self, path): | 
|  | subcases = [] | 
|  | for filename in glob.glob(os.path.join(path, "src", "*.c")): | 
|  | try: | 
|  | _subcases, warnings = self.scan_file(filename) | 
|  | if warnings: | 
|  | logger.error("%s: %s" % (filename, warnings)) | 
|  | if _subcases: | 
|  | subcases += _subcases | 
|  | except ValueError as e: | 
|  | logger.error("%s: can't find: %s" % (filename, e)) | 
|  | for filename in glob.glob(os.path.join(path, "*.c")): | 
|  | try: | 
|  | _subcases, warnings = self.scan_file(filename) | 
|  | if warnings: | 
|  | logger.error("%s: %s" % (filename, warnings)) | 
|  | if _subcases: | 
|  | subcases += _subcases | 
|  | except ValueError as e: | 
|  | logger.error("%s: can't find: %s" % (filename, e)) | 
|  | return subcases | 
|  |  | 
|  | def parse_subcases(self, test_path): | 
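|  | # Collect ztest subcases from the sources; each becomes "<test id>.<subcase>". | 
|  | # If no subcases are found, fall back to the bare test case id. | 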
|  | results = self.scan_path(test_path) | 
|  | for sub in results: | 
|  | name = "{}.{}".format(self.id, sub) | 
|  | self.cases.append(name) | 
|  |  | 
|  | if not results: | 
|  | self.cases.append(self.id) | 
|  |  | 
|  | def __str__(self): | 
|  | return self.name | 
|  |  | 
|  |  | 
|  | class TestInstance: | 
|  | """Class representing the execution of a particular TestCase on a platform | 
|  |  | 
|  | @param testcase The TestCase object we want to build/execute | 
|  | @param platform Platform object that we want to build and run against | 
|  | @param outdir Base directory for all test results. The actual | 
|  | output directory used is <outdir>/<platform>/<test case name> | 
|  | """ | 
|  |  | 
|  | def __init__(self, testcase, platform, outdir): | 
|  |  | 
|  | self.testcase = testcase | 
|  | self.platform = platform | 
|  |  | 
|  | self.status = None | 
|  | self.reason = "Unknown" | 
|  | self.metrics = dict() | 
|  | self.handler = None | 
|  | self.outdir = outdir | 
|  |  | 
|  | self.name = os.path.join(platform.name, testcase.name) | 
|  | self.build_dir = os.path.join(outdir, platform.name, testcase.name) | 
|  |  | 
|  | self.build_only = True | 
|  | self.run = False | 
|  |  | 
|  | self.results = {} | 
|  |  | 
|  | def __lt__(self, other): | 
|  | return self.name < other.name | 
|  |  | 
|  | def check_build_or_run(self, build_only=False, enable_slow=False, device_testing=False, fixture=[]): | 
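|  | # Decide whether this instance should only be built or also executed, | 
|  | # based on command-line options (build_only, enable_slow, device_testing, | 
|  | # fixture) and on whether the platform can actually run the binary | 
|  | # (unit test, native build, supported simulator, or attached device). | 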
|  |  | 
|  | # Right now we only support building on Windows; running tests there is | 
|  | # still a work in progress. | 
|  | if os.name == 'nt': | 
|  | self.build_only = True | 
|  | self.run = False | 
|  | return | 
|  |  | 
|  | _build_only = True | 
|  |  | 
|  | # we asked for build-only on the command line | 
|  | if build_only or self.testcase.build_only: | 
|  | self.build_only = True | 
|  | self.run = False | 
|  | return | 
|  |  | 
|  | # Do not run slow tests: | 
|  | skip_slow = self.testcase.slow and not enable_slow | 
|  | if skip_slow: | 
|  | self.build_only = True | 
|  | self.run = False | 
|  | return | 
|  |  | 
|  | runnable = bool(self.testcase.type == "unit" or \ | 
|  | self.platform.type == "native" or \ | 
|  | self.platform.simulation in ["nsim", "renode", "qemu"] or \ | 
|  | device_testing) | 
|  |  | 
|  | if self.platform.simulation == "nsim": | 
|  | if not find_executable("nsimdrv"): | 
|  | runnable = False | 
|  |  | 
|  | if self.platform.simulation == "renode": | 
|  | if not find_executable("renode"): | 
|  | runnable = False | 
|  |  | 
|  | # console harness allows us to run the test and capture data. | 
|  | if self.testcase.harness == 'console': | 
|  |  | 
|  | # if we have a fixture that is also being supplied on the | 
|  | # command-line, then we need to run the test, not just build it. | 
|  | if "fixture" in self.testcase.harness_config: | 
|  | fixture_cfg = self.testcase.harness_config['fixture'] | 
|  | if fixture_cfg in fixture: | 
|  | _build_only = False | 
|  | else: | 
|  | _build_only = True | 
|  | else: | 
|  | _build_only = False | 
|  | elif self.testcase.harness: | 
|  | _build_only = True | 
|  | else: | 
|  | _build_only = False | 
|  |  | 
|  | # run only when the instance is runnable and not forced to build-only | 
|  | self.run = (not _build_only) and runnable | 
|  | self.build_only = not self.run | 
|  | return | 
|  |  | 
|  | def create_overlay(self, platform, enable_asan=False, enable_coverage=False, coverage_platform=[]): | 
|  | # Create this in a "sanitycheck/" subdirectory otherwise this | 
|  | # will pass this overlay to kconfig.py *twice* and kconfig.cmake | 
|  | # will silently give that second time precedence over any | 
|  | # --extra-args=CONFIG_* | 
|  | subdir = os.path.join(self.build_dir, "sanitycheck") | 
|  | os.makedirs(subdir, exist_ok=True) | 
|  | file = os.path.join(subdir, "testcase_extra.conf") | 
|  |  | 
|  | with open(file, "w") as f: | 
|  | content = "" | 
|  |  | 
|  | if self.testcase.extra_configs: | 
|  | content = "\n".join(self.testcase.extra_configs) | 
|  |  | 
|  | if enable_coverage: | 
|  | if platform.name in coverage_platform: | 
|  | content = content + "\nCONFIG_COVERAGE=y" | 
|  | content = content + "\nCONFIG_COVERAGE_DUMP=y" | 
|  |  | 
|  | if enable_asan: | 
|  | if platform.type == "native": | 
|  | content = content + "\nCONFIG_ASAN=y" | 
|  |  | 
|  | f.write(content) | 
|  |  | 
|  | def calculate_sizes(self): | 
|  | """Get the RAM/ROM sizes of a test case. | 
|  |  | 
|  | This can only be run after the instance has been executed by | 
|  | MakeGenerator, otherwise there won't be any binaries to measure. | 
|  |  | 
|  | @return A SizeCalculator object | 
|  | """ | 
|  | fns = glob.glob(os.path.join(self.build_dir, "zephyr", "*.elf")) | 
|  | fns.extend(glob.glob(os.path.join(self.build_dir, "zephyr", "*.exe"))) | 
|  | fns = [x for x in fns if not x.endswith('_prebuilt.elf')] | 
|  | if len(fns) != 1: | 
|  | raise BuildError("Missing/multiple output ELF binary") | 
|  |  | 
|  | return SizeCalculator(fns[0], self.testcase.extra_sections) | 
|  |  | 
|  | def __repr__(self): | 
|  | return "<TestCase %s on %s>" % (self.testcase.name, self.platform.name) | 
|  |  | 
|  |  | 
|  | class CMake(): | 
|  | config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') | 
|  | dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') | 
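|  | # Both regexes parse generated key=value lines, e.g. (illustrative): | 
|  | #   CONFIG_ARM=y            -> ("CONFIG_ARM", "y") | 
|  | #   CONFIG_BOARD="qemu_x86" -> ("CONFIG_BOARD", "qemu_x86") | 
|  | # dt_re accepts the same shape without requiring the CONFIG_ prefix. | 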
|  |  | 
|  | def __init__(self, testcase, platform, source_dir, build_dir): | 
|  |  | 
|  | self.cwd = None | 
|  | self.capture_output = True | 
|  |  | 
|  | self.defconfig = {} | 
|  | self.cmake_cache = {} | 
|  |  | 
|  | self.instance = None | 
|  | self.testcase = testcase | 
|  | self.platform = platform | 
|  | self.source_dir = source_dir | 
|  | self.build_dir = build_dir | 
|  | self.log = "build.log" | 
|  |  | 
|  | def parse_generated(self): | 
|  | self.defconfig = {} | 
|  | return {} | 
|  |  | 
|  | def run_build(self, args=[]): | 
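|  | # Run "cmake --build <build_dir>" (the caller supplies the --build arguments), | 
|  | # append any output to build.log and classify the result as passed, | 
|  | # skipped (FLASH/RAM/SRAM overflow) or failed. | 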
|  |  | 
|  | logger.debug("Building %s for %s" % (self.source_dir, self.platform.name)) | 
|  |  | 
|  | cmake_args = [] | 
|  | cmake_args.extend(args) | 
|  | cmake = shutil.which('cmake') | 
|  | cmd = [cmake] + cmake_args | 
|  | kwargs = dict() | 
|  |  | 
|  | if self.capture_output: | 
|  | kwargs['stdout'] = subprocess.PIPE | 
|  | # CMake sends the output of message() to stderr unless it's STATUS | 
|  | kwargs['stderr'] = subprocess.STDOUT | 
|  |  | 
|  | if self.cwd: | 
|  | kwargs['cwd'] = self.cwd | 
|  |  | 
|  | p = subprocess.Popen(cmd, **kwargs) | 
|  | out, _ = p.communicate() | 
|  |  | 
|  | results = {} | 
|  | if p.returncode == 0: | 
|  | msg = "Finished building %s for %s" % (self.source_dir, self.platform.name) | 
|  |  | 
|  | self.instance.status = "passed" | 
|  | results = {'msg': msg, "returncode": p.returncode, "instance": self.instance} | 
|  |  | 
|  | if out: | 
|  | log_msg = out.decode(sys.getdefaultencoding()) | 
|  | with open(os.path.join(self.build_dir, self.log), "a") as log: | 
|  | log.write(log_msg) | 
|  |  | 
|  | else: | 
|  | return None | 
|  | else: | 
|  | # A real build error occurred; log the output and record a failure | 
|  | # (or a skip if the cause was a ROM/RAM overflow) | 
|  | if out: | 
|  | log_msg = out.decode(sys.getdefaultencoding()) | 
|  | with open(os.path.join(self.build_dir, self.log), "a") as log: | 
|  | log.write(log_msg) | 
|  |  | 
|  | if log_msg: | 
|  | res = re.findall("region `(FLASH|RAM|SRAM)' overflowed by", log_msg) | 
|  | if res: | 
|  | logger.debug("Test skipped due to {} Overflow".format(res[0])) | 
|  | self.instance.status = "skipped" | 
|  | self.instance.reason = "{} overflow".format(res[0]) | 
|  | else: | 
|  | self.instance.status = "failed" | 
|  | self.instance.reason = "Build failure" | 
|  |  | 
|  | results = { | 
|  | "returncode": p.returncode, | 
|  | "instance": self.instance, | 
|  | } | 
|  |  | 
|  | return results | 
|  |  | 
|  | def run_cmake(self, args=[]): | 
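|  | # Configure the build with CMake: warnings are promoted to errors for the | 
|  | # C compiler, assembler and linker via EXTRA_{C,A,LD}FLAGS, and the | 
|  | # generator comes from get_generator(). | 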
|  |  | 
|  | ldflags = "-Wl,--fatal-warnings" | 
|  | logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name)) | 
|  |  | 
|  | # fixme: add additional cflags based on options | 
|  | cmake_args = [ | 
|  | '-B{}'.format(self.build_dir), | 
|  | '-S{}'.format(self.source_dir), | 
|  | '-DEXTRA_CFLAGS="-Werror ', | 
|  | '-DEXTRA_AFLAGS=-Wa,--fatal-warnings', | 
|  | '-DEXTRA_LDFLAGS="{}'.format(ldflags), | 
|  | '-G{}'.format(get_generator()[1]) | 
|  | ] | 
|  |  | 
|  | if options.cmake_only: | 
|  | cmake_args.append("-DCMAKE_EXPORT_COMPILE_COMMANDS=1") | 
|  |  | 
|  | args = ["-D{}".format(a.replace('"', '')) for a in args] | 
|  | cmake_args.extend(args) | 
|  |  | 
|  | cmake_opts = ['-DBOARD={}'.format(self.platform.name)] | 
|  | cmake_args.extend(cmake_opts) | 
|  |  | 
|  |  | 
|  | logger.debug("Calling cmake with arguments: {}".format(cmake_args)) | 
|  | cmake = shutil.which('cmake') | 
|  | cmd = [cmake] + cmake_args | 
|  | kwargs = dict() | 
|  |  | 
|  | if self.capture_output: | 
|  | kwargs['stdout'] = subprocess.PIPE | 
|  | # CMake sends the output of message() to stderr unless it's STATUS | 
|  | kwargs['stderr'] = subprocess.STDOUT | 
|  |  | 
|  | if self.cwd: | 
|  | kwargs['cwd'] = self.cwd | 
|  |  | 
|  | p = subprocess.Popen(cmd, **kwargs) | 
|  | out, _ = p.communicate() | 
|  |  | 
|  | if p.returncode == 0: | 
|  | filter_results = self.parse_generated() | 
|  | msg = "Finished building %s for %s" % (self.source_dir, self.platform.name) | 
|  | logger.debug(msg) | 
|  | results = {'msg': msg, 'filter': filter_results} | 
|  |  | 
|  | else: | 
|  | self.instance.status = "failed" | 
|  | self.instance.reason = "Cmake build failure" | 
|  | logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name)) | 
|  | results = {"returncode": p.returncode} | 
|  |  | 
|  | if out: | 
|  | with open(os.path.join(self.build_dir, self.log), "a") as log: | 
|  | log_msg = out.decode(sys.getdefaultencoding()) | 
|  | log.write(log_msg) | 
|  |  | 
|  | return results | 
|  |  | 
|  |  | 
|  | class FilterBuilder(CMake): | 
|  |  | 
|  | def __init__(self, testcase, platform, source_dir, build_dir): | 
|  | super().__init__(testcase, platform, source_dir, build_dir) | 
|  |  | 
|  | self.log = "config-sanitycheck.log" | 
|  |  | 
|  | def parse_generated(self): | 
|  |  | 
|  | if self.platform.name == "unit_testing": | 
|  | return {} | 
|  |  | 
|  | cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt") | 
|  | defconfig_path = os.path.join(self.build_dir, "zephyr", ".config") | 
|  |  | 
|  | with open(defconfig_path, "r") as fp: | 
|  | defconfig = {} | 
|  | for line in fp.readlines(): | 
|  | m = self.config_re.match(line) | 
|  | if not m: | 
|  | if line.strip() and not line.startswith("#"): | 
|  | sys.stderr.write("Unrecognized line %s\n" % line) | 
|  | continue | 
|  | defconfig[m.group(1)] = m.group(2).strip() | 
|  |  | 
|  | self.defconfig = defconfig | 
|  |  | 
|  | cmake_conf = {} | 
|  | try: | 
|  | cache = CMakeCache.from_file(cmake_cache_path) | 
|  | except FileNotFoundError: | 
|  | cache = {} | 
|  |  | 
|  | for k in iter(cache): | 
|  | cmake_conf[k.name] = k.value | 
|  |  | 
|  | self.cmake_cache = cmake_conf | 
|  |  | 
|  | filter_data = { | 
|  | "ARCH": self.platform.arch, | 
|  | "PLATFORM": self.platform.name | 
|  | } | 
|  | filter_data.update(os.environ) | 
|  | filter_data.update(self.defconfig) | 
|  | filter_data.update(self.cmake_cache) | 
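|  | # Later updates win on key collisions: Kconfig symbols override environment | 
|  | # variables, and CMake cache entries override both. | 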
|  |  | 
|  | dts_path = os.path.join(self.build_dir, "zephyr", self.platform.name + ".dts.pre.tmp") | 
|  | if self.testcase and self.testcase.tc_filter: | 
|  | try: | 
|  | if os.path.exists(dts_path): | 
|  | edt = edtlib.EDT(dts_path, [os.path.join(ZEPHYR_BASE, "dts", "bindings")]) | 
|  | else: | 
|  | edt = None | 
|  | res = expr_parser.parse(self.testcase.tc_filter, filter_data, edt) | 
|  |  | 
|  | except (ValueError, SyntaxError) as se: | 
|  | sys.stderr.write( | 
|  | "Failed processing %s\n" % self.testcase.yamlfile) | 
|  | raise se | 
|  |  | 
|  | if not res: | 
|  | return {os.path.join(self.platform.name, self.testcase.name): True} | 
|  | else: | 
|  | return {os.path.join(self.platform.name, self.testcase.name): False} | 
|  | else: | 
|  | self.platform.filter_data = filter_data | 
|  | return filter_data | 
|  |  | 
|  |  | 
|  | class ProjectBuilder(FilterBuilder): | 
|  |  | 
|  | def __init__(self, suite, instance, **kwargs): | 
|  | super().__init__(instance.testcase, instance.platform, instance.testcase.source_dir, instance.build_dir) | 
|  |  | 
|  | self.log = "build.log" | 
|  | self.instance = instance | 
|  | self.suite = suite | 
|  |  | 
|  | self.lsan = kwargs.get('lsan', False) | 
|  | self.asan = kwargs.get('asan', False) | 
|  | self.valgrind = kwargs.get('valgrind', False) | 
|  | self.extra_args = kwargs.get('extra_args', []) | 
|  | self.device_testing = kwargs.get('device_testing', False) | 
|  | self.cmake_only = kwargs.get('cmake_only', False) | 
|  | self.cleanup = kwargs.get('cleanup', False) | 
|  | self.coverage = kwargs.get('coverage', False) | 
|  | self.inline_logs = kwargs.get('inline_logs', False) | 
|  |  | 
|  | @staticmethod | 
|  | def log_info(filename, inline_logs): | 
|  | filename = os.path.abspath(os.path.realpath(filename)) | 
|  | if inline_logs: | 
|  | logger.info("{:-^100}".format(filename)) | 
|  |  | 
|  | try: | 
|  | with open(filename) as fp: | 
|  | data = fp.read() | 
|  | except Exception as e: | 
|  | data = "Unable to read log data (%s)\n" % (str(e)) | 
|  |  | 
|  | logger.error(data) | 
|  |  | 
|  | logger.info("{:-^100}".format(filename)) | 
|  | else: | 
|  | logger.error("see: " + Fore.YELLOW + filename + Fore.RESET) | 
|  |  | 
|  | def log_info_file(self, inline_logs): | 
|  | build_dir = self.instance.build_dir | 
|  | h_log = "{}/handler.log".format(build_dir) | 
|  | b_log = "{}/build.log".format(build_dir) | 
|  | v_log = "{}/valgrind.log".format(build_dir) | 
|  | d_log = "{}/device.log".format(build_dir) | 
|  |  | 
|  | if os.path.exists(v_log) and "Valgrind" in self.instance.reason: | 
|  | self.log_info("{}".format(v_log), inline_logs) | 
|  | elif os.path.exists(h_log) and os.path.getsize(h_log) > 0: | 
|  | self.log_info("{}".format(h_log), inline_logs) | 
|  | elif os.path.exists(d_log) and os.path.getsize(d_log) > 0: | 
|  | self.log_info("{}".format(d_log), inline_logs) | 
|  | else: | 
|  | self.log_info("{}".format(b_log), inline_logs) | 
|  |  | 
|  | def setup_handler(self): | 
|  |  | 
|  | instance = self.instance | 
|  | args = [] | 
|  |  | 
|  | # FIXME: Needs simplification | 
|  | if instance.platform.simulation == "qemu": | 
|  | instance.handler = QEMUHandler(instance, "qemu") | 
|  | args.append("QEMU_PIPE=%s" % instance.handler.get_fifo()) | 
|  | instance.handler.call_make_run = True | 
|  | elif instance.testcase.type == "unit": | 
|  | instance.handler = BinaryHandler(instance, "unit") | 
|  | instance.handler.binary = os.path.join(instance.build_dir, "testbinary") | 
|  | elif instance.platform.type == "native": | 
|  | handler = BinaryHandler(instance, "native") | 
|  |  | 
|  | handler.asan = self.asan | 
|  | handler.valgrind = self.valgrind | 
|  | handler.lsan = self.lsan | 
|  | handler.coverage = self.coverage | 
|  |  | 
|  | handler.binary = os.path.join(instance.build_dir, "zephyr", "zephyr.exe") | 
|  | instance.handler = handler | 
|  | elif instance.platform.simulation == "nsim": | 
|  | if find_executable("nsimdrv"): | 
|  | instance.handler = BinaryHandler(instance, "nsim") | 
|  | instance.handler.call_make_run = True | 
|  | elif instance.platform.simulation == "renode": | 
|  | if find_executable("renode"): | 
|  | instance.handler = BinaryHandler(instance, "renode") | 
|  | instance.handler.pid_fn = os.path.join(instance.build_dir, "renode.pid") | 
|  | instance.handler.call_make_run = True | 
|  | elif self.device_testing: | 
|  | instance.handler = DeviceHandler(instance, "device") | 
|  |  | 
|  | if instance.handler: | 
|  | instance.handler.args = args | 
|  |  | 
|  | def process(self, message): | 
|  | op = message.get('op') | 
|  |  | 
|  | if not self.instance.handler: | 
|  | self.setup_handler() | 
|  |  | 
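|  | # Messages move through a simple state machine: | 
|  | #   cmake -> build -> run -> report -> (optional) cleanup | 
|  | # and each stage posts the follow-up operation back onto the global pipeline. | 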
|  | # The build process, call cmake and build with configured generator | 
|  | if op == "cmake": | 
|  | results = self.cmake() | 
|  | if self.instance.status == "failed": | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | elif self.cmake_only: | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | else: | 
|  | if self.instance.name in results['filter'] and results['filter'][self.instance.name]: | 
|  | logger.debug("filtering %s" % self.instance.name) | 
|  | self.instance.status = "skipped" | 
|  | self.instance.reason = "filter" | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | else: | 
|  | pipeline.put({"op": "build", "test": self.instance}) | 
|  |  | 
|  | elif op == "build": | 
|  | logger.debug("build test: %s" % self.instance.name) | 
|  | results = self.build() | 
|  |  | 
|  | if results.get('returncode', 1) > 0: | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | else: | 
|  | if self.instance.run: | 
|  | pipeline.put({"op": "run", "test": self.instance}) | 
|  | else: | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | # Run the generated binary using one of the supported handlers | 
|  | elif op == "run": | 
|  | logger.debug("run test: %s" % self.instance.name) | 
|  | self.run() | 
|  | self.instance.status, _ = self.instance.handler.get_state() | 
|  | pipeline.put({ | 
|  | "op": "report", | 
|  | "test": self.instance, | 
|  | "state": "executed", | 
|  | "status": self.instance.status, | 
|  | "reason": self.instance.reason} | 
|  | ) | 
|  |  | 
|  | # Report results and output progress to screen | 
|  | elif op == "report": | 
|  | with report_lock: | 
|  | self.report_out() | 
|  |  | 
|  | if self.cleanup and self.instance.status == "passed": | 
|  | pipeline.put({ | 
|  | "op": "cleanup", | 
|  | "test": self.instance | 
|  | }) | 
|  |  | 
|  | elif op == "cleanup": | 
|  | self.cleanup_artifacts() | 
|  |  | 
|  | def cleanup_artifacts(self): | 
|  | logger.debug("Cleaning up {}".format(self.instance.build_dir)) | 
|  | whitelist = [ | 
|  | 'zephyr/.config', | 
|  | 'handler.log', | 
|  | 'build.log', | 
|  | 'device.log', | 
|  | ] | 
|  | whitelist = [os.path.join(self.instance.build_dir, file) for file in whitelist] | 
|  |  | 
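|  | # Walk bottom-up (topdown=False) so that directories emptied by the file | 
|  | # removals can themselves be removed in the same pass. | 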
|  | for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False): | 
|  | for name in filenames: | 
|  | path = os.path.join(dirpath, name) | 
|  | if path not in whitelist: | 
|  | os.remove(path) | 
|  | # Remove empty directories and symbolic links to directories | 
|  | for dir in dirnames: | 
|  | path = os.path.join(dirpath, dir) | 
|  | if os.path.islink(path): | 
|  | os.remove(path) | 
|  | elif not os.listdir(path): | 
|  | os.rmdir(path) | 
|  |  | 
|  | def report_out(self): | 
|  | total_tests_width = len(str(self.suite.total_tests)) | 
|  | self.suite.total_done += 1 | 
|  | instance = self.instance | 
|  |  | 
|  | if instance.status in ["failed", "timeout"]: | 
|  | self.suite.total_failed += 1 | 
|  | if VERBOSE: | 
|  | status = Fore.RED + "FAILED " + Fore.RESET + instance.reason | 
|  | else: | 
|  | print("") | 
|  | logger.error( | 
|  | "{:<25} {:<50} {}FAILED{}: {}".format( | 
|  | instance.platform.name, | 
|  | instance.testcase.name, | 
|  | Fore.RED, | 
|  | Fore.RESET, | 
|  | instance.reason)) | 
|  | if not VERBOSE: | 
|  | self.log_info_file(self.inline_logs) | 
|  | elif instance.status == "skipped": | 
|  | self.suite.total_skipped += 1 | 
|  | status = Fore.YELLOW + "SKIPPED" + Fore.RESET | 
|  | else: | 
|  | status = Fore.GREEN + "PASSED" + Fore.RESET | 
|  |  | 
|  | if VERBOSE: | 
|  | if self.cmake_only: | 
|  | more_info = "cmake" | 
|  | elif instance.status == "skipped": | 
|  | more_info = instance.reason | 
|  | else: | 
|  | if instance.handler and instance.run: | 
|  | more_info = instance.handler.type_str | 
|  | htime = instance.handler.duration | 
|  | if htime: | 
|  | more_info += " {:.3f}s".format(htime) | 
|  | else: | 
|  | more_info = "build" | 
|  |  | 
|  | logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format( | 
|  | self.suite.total_done, total_tests_width, self.suite.total_tests, instance.platform.name, | 
|  | instance.testcase.name, status, more_info)) | 
|  |  | 
|  | if instance.status in ["failed", "timeout"]: | 
|  | self.log_info_file(self.inline_logs) | 
|  | else: | 
|  | sys.stdout.write("\rINFO    - Total complete: %s%4d/%4d%s  %2d%%  skipped: %s%4d%s, failed: %s%4d%s" % ( | 
|  | Fore.GREEN, | 
|  | self.suite.total_done, | 
|  | self.suite.total_tests, | 
|  | Fore.RESET, | 
|  | int((float(self.suite.total_done) / self.suite.total_tests) * 100), | 
|  | Fore.YELLOW if self.suite.total_skipped > 0 else Fore.RESET, | 
|  | self.suite.total_skipped, | 
|  | Fore.RESET, | 
|  | Fore.RED if self.suite.total_failed > 0 else Fore.RESET, | 
|  | self.suite.total_failed, | 
|  | Fore.RESET | 
|  | ) | 
|  | ) | 
|  | sys.stdout.flush() | 
|  |  | 
|  | def cmake(self): | 
|  |  | 
|  | instance = self.instance | 
|  | args = self.testcase.extra_args[:] | 
|  | args += self.extra_args | 
|  |  | 
|  | if instance.handler: | 
|  | args += instance.handler.args | 
|  |  | 
|  | # merge overlay files into one variable | 
|  | def extract_overlays(args): | 
|  | re_overlay = re.compile('OVERLAY_CONFIG=(.*)') | 
|  | other_args = [] | 
|  | overlays = [] | 
|  | for arg in args: | 
|  | match = re_overlay.search(arg) | 
|  | if match: | 
|  | overlays.append(match.group(1).strip('\'"')) | 
|  | else: | 
|  | other_args.append(arg) | 
|  |  | 
|  | args[:] = other_args | 
|  | return overlays | 
|  |  | 
|  | overlays = extract_overlays(args) | 
|  |  | 
|  | if (self.testcase.extra_configs or self.coverage or | 
|  | self.asan): | 
|  | overlays.append(os.path.join(instance.build_dir, | 
|  | "sanitycheck", "testcase_extra.conf")) | 
|  |  | 
|  | if overlays: | 
|  | args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays))) | 
|  |  | 
|  | results = self.run_cmake(args) | 
|  | return results | 
|  |  | 
|  | def build(self): | 
|  | results = self.run_build(['--build', self.build_dir]) | 
|  | return results | 
|  |  | 
|  | def run(self): | 
|  |  | 
|  | instance = self.instance | 
|  |  | 
|  | if instance.handler.type_str == "device": | 
|  | instance.handler.suite = self.suite | 
|  |  | 
|  | instance.handler.handle() | 
|  |  | 
|  | sys.stdout.flush() | 
|  |  | 
|  |  | 
|  | pipeline = queue.LifoQueue() | 
|  |  | 
|  |  | 
|  | class BoundedExecutor(concurrent.futures.ThreadPoolExecutor): | 
|  | """BoundedExecutor behaves as a ThreadPoolExecutor which will block on | 
|  | calls to submit() once the limit given as "bound" work items are queued for | 
|  | execution. | 
|  | :param bound: Integer - the maximum number of items in the work queue | 
|  | :param max_workers: Integer - the size of the thread pool | 
|  | """ | 
|  |  | 
|  | def __init__(self, bound, max_workers, **kwargs): | 
|  | super().__init__(max_workers) | 
|  | # self.executor = ThreadPoolExecutor(max_workers=max_workers) | 
|  | self.semaphore = BoundedSemaphore(bound + max_workers) | 
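|  | # The semaphore allows up to max_workers items to execute while at most | 
|  | # "bound" more wait in the queue; submit() blocks once that total is | 
|  | # reached, and every finished future releases one slot again. | 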
|  |  | 
|  | def submit(self, fn, *args, **kwargs): | 
|  | self.semaphore.acquire() | 
|  | try: | 
|  | future = super().submit(fn, *args, **kwargs) | 
|  | except Exception: | 
|  | self.semaphore.release() | 
|  | raise | 
|  | else: | 
|  | future.add_done_callback(lambda x: self.semaphore.release()) | 
|  | return future | 
|  |  | 
|  |  | 
|  | class TestSuite: | 
|  | config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') | 
|  | dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') | 
|  |  | 
|  | tc_schema = scl.yaml_load( | 
|  | os.path.join(ZEPHYR_BASE, | 
|  | "scripts", "sanity_chk", "testcase-schema.yaml")) | 
|  |  | 
|  | def __init__(self, board_root_list, testcase_roots, outdir): | 
|  |  | 
|  | self.roots = testcase_roots | 
|  | if not isinstance(board_root_list, list): | 
|  | self.board_roots = [board_root_list] | 
|  | else: | 
|  | self.board_roots = board_root_list | 
|  |  | 
|  | # Testsuite Options | 
|  | self.coverage_platform = [] | 
|  | self.build_only = False | 
|  | self.cmake_only = False | 
|  | self.cleanup = False | 
|  | self.enable_slow = False | 
|  | self.device_testing = False | 
|  | self.fixture = [] | 
|  | self.enable_coverage = False | 
|  | self.enable_lsan = False | 
|  | self.enable_asan = False | 
|  | self.enable_valgrind = False | 
|  | self.extra_args = [] | 
|  | self.inline_logs = False | 
|  | self.enable_size_report = False | 
|  |  | 
|  | # Keep track of which test cases we've filtered out and why | 
|  | self.testcases = {} | 
|  | self.platforms = [] | 
|  | self.selected_platforms = [] | 
|  | self.default_platforms = [] | 
|  | self.outdir = os.path.abspath(outdir) | 
|  | self.discards = None | 
|  | self.load_errors = 0 | 
|  | self.instances = dict() | 
|  |  | 
|  | self.total_tests = 0  # number of test instances | 
|  | self.total_cases = 0  # number of test cases | 
|  | self.total_done = 0  # tests completed | 
|  | self.total_failed = 0 | 
|  | self.total_skipped = 0 | 
|  |  | 
|  | self.total_platforms = 0 | 
|  | self.start_time = 0 | 
|  | self.duration = 0 | 
|  | self.warnings = 0 | 
|  | self.cv = threading.Condition() | 
|  |  | 
|  | # hardcoded for now | 
|  | self.connected_hardware = [] | 
|  |  | 
|  | def config(self): | 
|  | logger.info("coverage platform: {}".format(self.coverage_platform)) | 
|  |  | 
|  | # Debug Functions | 
|  | @staticmethod | 
|  | def info(what): | 
|  | sys.stdout.write(what + "\n") | 
|  | sys.stdout.flush() | 
|  |  | 
|  | def update(self): | 
|  | self.total_tests = len(self.instances) | 
|  | self.total_cases = len(self.testcases) | 
|  |  | 
|  | def compare_metrics(self, filename): | 
|  | # name, datatype, lower results better | 
|  | interesting_metrics = [("ram_size", int, True), | 
|  | ("rom_size", int, True)] | 
|  |  | 
|  | if not os.path.exists(filename): | 
|  | logger.info("Cannot compare metrics, %s not found" % filename) | 
|  | return [] | 
|  |  | 
|  | results = [] | 
|  | saved_metrics = {} | 
|  | with open(filename) as fp: | 
|  | cr = csv.DictReader(fp) | 
|  | for row in cr: | 
|  | d = {} | 
|  | for m, _, _ in interesting_metrics: | 
|  | d[m] = row[m] | 
|  | saved_metrics[(row["test"], row["platform"])] = d | 
|  |  | 
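|  | # Compare current metrics against the saved baseline keyed by | 
|  | # (test, platform); delta is current minus baseline, so a positive delta | 
|  | # on a "lower is better" metric indicates growth. | 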
|  | for instance in self.instances.values(): | 
|  | mkey = (instance.testcase.name, instance.platform.name) | 
|  | if mkey not in saved_metrics: | 
|  | continue | 
|  | sm = saved_metrics[mkey] | 
|  | for metric, mtype, lower_better in interesting_metrics: | 
|  | if metric not in instance.metrics: | 
|  | continue | 
|  | if sm[metric] == "": | 
|  | continue | 
|  | delta = instance.metrics.get(metric, 0) - mtype(sm[metric]) | 
|  | if delta == 0: | 
|  | continue | 
|  | results.append((instance, metric, instance.metrics.get(metric, 0), delta, | 
|  | lower_better)) | 
|  | return results | 
|  |  | 
|  | def misc_reports(self, report, show_footprint, all_deltas, | 
|  | footprint_threshold, last_metrics): | 
|  |  | 
|  | if not report: | 
|  | return | 
|  |  | 
|  | deltas = self.compare_metrics(report) | 
|  | warnings = 0 | 
|  | if deltas and show_footprint: | 
|  | for i, metric, value, delta, lower_better in deltas: | 
|  | if not all_deltas and ((delta < 0 and lower_better) or | 
|  | (delta > 0 and not lower_better)): | 
|  | continue | 
|  |  | 
|  | percentage = (float(delta) / float(value - delta)) | 
|  | if not all_deltas and (percentage < | 
|  | (footprint_threshold / 100.0)): | 
|  | continue | 
|  |  | 
|  | logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format( | 
|  | i.platform.name, i.testcase.name, Fore.YELLOW, | 
|  | "INFO" if all_deltas else "WARNING", Fore.RESET, | 
|  | metric, delta, value, percentage)) | 
|  | warnings += 1 | 
|  |  | 
|  | if warnings: | 
|  | logger.warning("Deltas based on metrics from last %s" % | 
|  | ("release" if not last_metrics else "run")) | 
|  |  | 
|  | def summary(self, unrecognized_sections): | 
|  | failed = 0 | 
|  | for instance in self.instances.values(): | 
|  | if instance.status == "failed": | 
|  | failed += 1 | 
|  | elif instance.metrics.get("unrecognized") and not unrecognized_sections: | 
|  | logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" % | 
|  | (Fore.RED, Fore.RESET, instance.name, | 
|  | str(instance.metrics.get("unrecognized", [])))) | 
|  | failed += 1 | 
|  |  | 
|  | if self.total_tests and self.total_tests != self.total_skipped: | 
|  | pass_rate = (float(self.total_tests - self.total_failed - self.total_skipped) / float( | 
|  | self.total_tests - self.total_skipped)) | 
|  | else: | 
|  | pass_rate = 0 | 
|  |  | 
|  | logger.info( | 
|  | "{}{} of {}{} tests passed ({:.2%}), {}{}{} failed, {} skipped with {}{}{} warnings in {:.2f} seconds".format( | 
|  | Fore.RED if failed else Fore.GREEN, | 
|  | self.total_tests - self.total_failed - self.total_skipped, | 
|  | self.total_tests - self.total_skipped, | 
|  | Fore.RESET, | 
|  | pass_rate, | 
|  | Fore.RED if self.total_failed else Fore.RESET, | 
|  | self.total_failed, | 
|  | Fore.RESET, | 
|  | self.total_skipped, | 
|  | Fore.YELLOW if self.warnings else Fore.RESET, | 
|  | self.warnings, | 
|  | Fore.RESET, | 
|  | self.duration)) | 
|  |  | 
|  | self.total_platforms = len(self.platforms) | 
|  | if self.platforms: | 
|  | logger.info("In total {} test cases were executed on {} out of total {} platforms ({:02.2f}%)".format( | 
|  | self.total_cases, | 
|  | len(self.selected_platforms), | 
|  | self.total_platforms, | 
|  | (100 * len(self.selected_platforms) / len(self.platforms)) | 
|  | )) | 
|  |  | 
|  | def save_reports(self, name, report_dir, no_update, release, only_failed): | 
|  | if not self.instances: | 
|  | return | 
|  |  | 
|  | if name: | 
|  | report_name = name | 
|  | else: | 
|  | report_name = "sanitycheck" | 
|  |  | 
|  | if report_dir: | 
|  | os.makedirs(report_dir, exist_ok=True) | 
|  | filename = os.path.join(report_dir, report_name) | 
|  | outdir = report_dir | 
|  | else: | 
|  | filename = os.path.join(self.outdir, report_name) | 
|  | outdir = self.outdir | 
|  |  | 
|  | if not no_update: | 
|  | self.xunit_report(filename + ".xml", only_failed) | 
|  | self.csv_report(filename + ".csv") | 
|  | self.target_report(outdir) | 
|  | if self.discards: | 
|  | self.discard_report(filename + "_discard.csv") | 
|  |  | 
|  | if release: | 
|  | self.csv_report(RELEASE_DATA) | 
|  |  | 
|  | def add_configurations(self): | 
|  |  | 
|  | for board_root in self.board_roots: | 
|  | board_root = os.path.abspath(board_root) | 
|  |  | 
|  | logger.debug("Reading platform configuration files under %s..." % | 
|  | board_root) | 
|  |  | 
|  | for file in glob.glob(os.path.join(board_root, "*", "*", "*.yaml")): | 
|  | logger.debug("Found platform configuration " + file) | 
|  | try: | 
|  | platform = Platform() | 
|  | platform.load(file) | 
|  | if platform.sanitycheck: | 
|  | self.platforms.append(platform) | 
|  | if platform.default: | 
|  | self.default_platforms.append(platform.name) | 
|  |  | 
|  | except RuntimeError as e: | 
|  | logger.error("E: %s: can't load: %s" % (file, e)) | 
|  | self.load_errors += 1 | 
|  |  | 
|  | def get_all_tests(self): | 
|  | tests = [] | 
|  | for _, tc in self.testcases.items(): | 
|  | for case in tc.cases: | 
|  | tests.append(case) | 
|  |  | 
|  | return tests | 
|  |  | 
|  | @staticmethod | 
|  | def get_toolchain(): | 
|  | toolchain = os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT", None) or \ | 
|  | os.environ.get("ZEPHYR_GCC_VARIANT", None) | 
|  |  | 
|  | if toolchain == "gccarmemb": | 
|  | # Remove this translation when gccarmemb is no longer supported. | 
|  | toolchain = "gnuarmemb" | 
|  |  | 
|  | try: | 
|  | if not toolchain: | 
|  | raise SanityRuntimeError("E: Variable ZEPHYR_TOOLCHAIN_VARIANT is not defined") | 
|  | except Exception as e: | 
|  | print(str(e)) | 
|  | sys.exit(2) | 
|  |  | 
|  | return toolchain | 
|  |  | 
|  | def add_testcases(self): | 
|  | for root in self.roots: | 
|  | root = os.path.abspath(root) | 
|  |  | 
|  | logger.debug("Reading test case configuration files under %s..." % root) | 
|  |  | 
|  | for dirpath, dirnames, filenames in os.walk(root, topdown=True): | 
|  | logger.debug("scanning %s" % dirpath) | 
|  | if 'sample.yaml' in filenames: | 
|  | filename = 'sample.yaml' | 
|  | elif 'testcase.yaml' in filenames: | 
|  | filename = 'testcase.yaml' | 
|  | else: | 
|  | continue | 
|  |  | 
|  | logger.debug("Found possible test case in " + dirpath) | 
|  |  | 
|  | dirnames[:] = [] | 
|  | tc_path = os.path.join(dirpath, filename) | 
|  | self.add_testcase(tc_path, root) | 
|  |  | 
|  | def add_testcase(self, tc_data_file, root): | 
|  | try: | 
|  | parsed_data = SanityConfigParser(tc_data_file, self.tc_schema) | 
|  | parsed_data.load() | 
|  |  | 
|  | tc_path = os.path.dirname(tc_data_file) | 
|  | workdir = os.path.relpath(tc_path, root) | 
|  |  | 
|  | for name in parsed_data.tests.keys(): | 
|  | tc = TestCase() | 
|  | tc.name = tc.get_unique(root, workdir, name) | 
|  |  | 
|  | tc_dict = parsed_data.get_test(name, testcase_valid_keys) | 
|  |  | 
|  | tc.source_dir = tc_path | 
|  | tc.yamlfile = tc_data_file | 
|  |  | 
|  | tc.id = name | 
|  | tc.type = tc_dict["type"] | 
|  | tc.tags = tc_dict["tags"] | 
|  | tc.extra_args = tc_dict["extra_args"] | 
|  | tc.extra_configs = tc_dict["extra_configs"] | 
|  | tc.arch_whitelist = tc_dict["arch_whitelist"] | 
|  | tc.arch_exclude = tc_dict["arch_exclude"] | 
|  | tc.skip = tc_dict["skip"] | 
|  | tc.platform_exclude = tc_dict["platform_exclude"] | 
|  | tc.platform_whitelist = tc_dict["platform_whitelist"] | 
|  | tc.toolchain_exclude = tc_dict["toolchain_exclude"] | 
|  | tc.toolchain_whitelist = tc_dict["toolchain_whitelist"] | 
|  | tc.tc_filter = tc_dict["filter"] | 
|  | tc.timeout = tc_dict["timeout"] | 
|  | tc.harness = tc_dict["harness"] | 
|  | tc.harness_config = tc_dict["harness_config"] | 
|  | tc.build_only = tc_dict["build_only"] | 
|  | tc.build_on_all = tc_dict["build_on_all"] | 
|  | tc.slow = tc_dict["slow"] | 
|  | tc.min_ram = tc_dict["min_ram"] | 
|  | tc.depends_on = tc_dict["depends_on"] | 
|  | tc.min_flash = tc_dict["min_flash"] | 
|  | tc.extra_sections = tc_dict["extra_sections"] | 
|  |  | 
|  | tc.parse_subcases(tc_path) | 
|  |  | 
|  | if tc.name: | 
|  | self.testcases[tc.name] = tc | 
|  |  | 
|  | except Exception as e: | 
|  | logger.error("%s: can't load (skipping): %s" % (tc_data_file, e)) | 
|  | self.load_errors += 1 | 
|  | return False | 
|  |  | 
|  | return True | 
|  |  | 
|  | def get_platform(self, name): | 
|  | selected_platform = None | 
|  | for platform in self.platforms: | 
|  | if platform.name == name: | 
|  | selected_platform = platform | 
|  | break | 
|  | return selected_platform | 
|  |  | 
|  | def get_last_failed(self): | 
|  | last_run = os.path.join(self.outdir, "sanitycheck.csv") | 
|  | try: | 
|  | if not os.path.exists(last_run): | 
|  | raise SanityRuntimeError("Couldn't find last sanitycheck run.: %s" % last_run) | 
|  | except Exception as e: | 
|  | print(str(e)) | 
|  | sys.exit(2) | 
|  |  | 
|  | total_tests = 0 | 
|  | with open(last_run, "r") as fp: | 
|  | cr = csv.DictReader(fp) | 
|  | instance_list = [] | 
|  | for row in cr: | 
|  | total_tests += 1 | 
|  | if row["passed"] == "True": | 
|  | continue | 
|  | test = row["test"] | 
|  | platform = self.get_platform(row["platform"]) | 
|  | instance = TestInstance(self.testcases[test], platform, self.outdir) | 
|  | instance.check_build_or_run( | 
|  | self.build_only, | 
|  | self.enable_slow, | 
|  | self.device_testing, | 
|  | self.fixture | 
|  | ) | 
|  | instance.create_overlay(platform, self.enable_asan, self.enable_coverage, self.coverage_platform) | 
|  | instance_list.append(instance) | 
|  | self.add_instances(instance_list) | 
|  |  | 
|  | tests_to_run = len(self.instances) | 
|  | logger.info("%d tests passed already, retrying %d tests" % (total_tests - tests_to_run, tests_to_run)) | 
|  |  | 
|  | def load_from_file(self, file): | 
|  | try: | 
|  | if not os.path.exists(file): | 
|  | raise SanityRuntimeError( | 
|  | "Couldn't find input file with list of tests.") | 
|  | except Exception as e: | 
|  | print(str(e)) | 
|  | sys.exit(2) | 
|  |  | 
|  | with open(file, "r") as fp: | 
|  | cr = csv.DictReader(fp) | 
|  | instance_list = [] | 
|  | for row in cr: | 
|  | if row["arch"] == "arch": | 
|  | continue | 
|  | test = row["test"] | 
|  | platform = self.get_platform(row["platform"]) | 
|  | instance = TestInstance(self.testcases[test], platform, self.outdir) | 
|  | instance.check_build_or_run( | 
|  | self.build_only, | 
|  | self.enable_slow, | 
|  | self.device_testing, | 
|  | self.fixture | 
|  | ) | 
|  | instance.create_overlay(platform, self.enable_asan, self.enable_coverage, self.coverage_platform) | 
|  | instance_list.append(instance) | 
|  | self.add_instances(instance_list) | 
|  |  | 
|  | def apply_filters(self, **kwargs): | 
|  |  | 
|  | toolchain = self.get_toolchain() | 
|  |  | 
|  | discards = {} | 
|  | platform_filter = kwargs.get('platform') | 
|  | exclude_platform = kwargs.get('exclude_platform') | 
|  | testcase_filter = kwargs.get('run_individual_tests') | 
|  | arch_filter = kwargs.get('arch') | 
|  | tag_filter = kwargs.get('tag') | 
|  | exclude_tag = kwargs.get('exclude_tag') | 
|  | all_filter = kwargs.get('all') | 
|  | device_testing_filter = kwargs.get('device_testing') | 
|  | force_toolchain = kwargs.get('force_toolchain') | 
|  |  | 
|  | logger.debug("platform filter: " + str(platform_filter)) | 
|  | logger.debug("    arch_filter: " + str(arch_filter)) | 
|  | logger.debug("     tag_filter: " + str(tag_filter)) | 
|  | logger.debug("    exclude_tag: " + str(exclude_tag)) | 
|  |  | 
|  | default_platforms = False | 
|  |  | 
|  | if platform_filter: | 
|  | platforms = list(filter(lambda p: p.name in platform_filter, self.platforms)) | 
|  | else: | 
|  | platforms = self.platforms | 
|  |  | 
|  | if all_filter: | 
|  | logger.info("Selecting all possible platforms per test case") | 
|  | # When --all is used, any --platform arguments are ignored | 
|  | platform_filter = [] | 
|  | elif not platform_filter: | 
|  | logger.info("Selecting default platforms per test case") | 
|  | default_platforms = True | 
|  |  | 
|  | logger.info("Building initial testcase list...") | 
|  |  | 
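|  | # For every (testcase, platform) pair create a tentative TestInstance and | 
|  | # discard it at the first filter that rejects it, recording the reason; | 
|  | # instances that survive all checks are added to the run list below. | 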
|  | for tc_name, tc in self.testcases.items(): | 
|  | # list of instances per testcase, aka configurations. | 
|  | instance_list = [] | 
|  | for plat in platforms: | 
|  | instance = TestInstance(tc, plat, self.outdir) | 
|  | instance.check_build_or_run( | 
|  | self.build_only, | 
|  | self.enable_slow, | 
|  | self.device_testing, | 
|  | self.fixture | 
|  | ) | 
|  | if plat.name in exclude_platform: | 
|  | discards[instance] = "Platform is excluded on command line." | 
|  | continue | 
|  |  | 
|  | if (plat.arch == "unit") != (tc.type == "unit"): | 
|  | # Discard silently | 
|  | continue | 
|  |  | 
|  | if device_testing_filter and instance.build_only: | 
|  | discards[instance] = "Not runnable on device" | 
|  | continue | 
|  |  | 
|  | if tc.skip: | 
|  | discards[instance] = "Skip filter" | 
|  | continue | 
|  |  | 
|  | if tc.build_on_all and not platform_filter: | 
|  | platform_filter = [] | 
|  |  | 
|  | if tag_filter and not tc.tags.intersection(tag_filter): | 
|  | discards[instance] = "Command line testcase tag filter" | 
|  | continue | 
|  |  | 
|  | if exclude_tag and tc.tags.intersection(exclude_tag): | 
|  | discards[instance] = "Command line testcase exclude filter" | 
|  | continue | 
|  |  | 
|  | if testcase_filter and tc_name not in testcase_filter: | 
|  | discards[instance] = "Testcase name filter" | 
|  | continue | 
|  |  | 
|  | if arch_filter and plat.arch not in arch_filter: | 
|  | discards[instance] = "Command line testcase arch filter" | 
|  | continue | 
|  |  | 
|  | if tc.arch_whitelist and plat.arch not in tc.arch_whitelist: | 
|  | discards[instance] = "Not in test case arch whitelist" | 
|  | continue | 
|  |  | 
|  | if tc.arch_exclude and plat.arch in tc.arch_exclude: | 
|  | discards[instance] = "In test case arch exclude" | 
|  | continue | 
|  |  | 
|  | if tc.platform_exclude and plat.name in tc.platform_exclude: | 
|  | discards[instance] = "In test case platform exclude" | 
|  | continue | 
|  |  | 
|  | if tc.toolchain_exclude and toolchain in tc.toolchain_exclude: | 
|  | discards[instance] = "In test case toolchain exclude" | 
|  | continue | 
|  |  | 
|  | if platform_filter and plat.name not in platform_filter: | 
|  | discards[instance] = "Command line platform filter" | 
|  | continue | 
|  |  | 
|  | if tc.platform_whitelist and plat.name not in tc.platform_whitelist: | 
|  | discards[instance] = "Not in testcase platform whitelist" | 
|  | continue | 
|  |  | 
|  | if tc.toolchain_whitelist and toolchain not in tc.toolchain_whitelist: | 
|  | discards[instance] = "Not in testcase toolchain whitelist" | 
|  | continue | 
|  |  | 
|  | if not plat.env_satisfied: | 
|  | discards[instance] = "Environment ({}) not satisfied".format(", ".join(plat.env)) | 
|  | continue | 
|  |  | 
|  | if not force_toolchain \ | 
|  | and toolchain and (toolchain not in plat.supported_toolchains) \ | 
|  | and tc.type != 'unit': | 
|  | discards[instance] = "Not supported by the toolchain" | 
|  | continue | 
|  |  | 
|  | if plat.ram < tc.min_ram: | 
|  | discards[instance] = "Not enough RAM" | 
|  | continue | 
|  |  | 
|  | if tc.depends_on: | 
|  | dep_intersection = tc.depends_on.intersection(set(plat.supported)) | 
|  | if dep_intersection != set(tc.depends_on): | 
|  | discards[instance] = "No hardware support" | 
|  | continue | 
|  |  | 
|  | if plat.flash < tc.min_flash: | 
|  | discards[instance] = "Not enough FLASH" | 
|  | continue | 
|  |  | 
|  | if set(plat.ignore_tags) & tc.tags: | 
|  | discards[instance] = "Excluded tags per platform" | 
|  | continue | 
|  |  | 
|  | # if nothing stopped us until now, it means this configuration | 
|  | # needs to be added. | 
|  | instance_list.append(instance) | 
|  |  | 
|  | # no configurations, so jump to next testcase | 
|  | if not instance_list: | 
|  | continue | 
|  |  | 
|  | # if sanitycheck was launched with no platform options at all, we | 
|  | # take all default platforms | 
|  | if default_platforms and not tc.build_on_all: | 
|  | if tc.platform_whitelist: | 
|  | a = set(self.default_platforms) | 
|  | b = set(tc.platform_whitelist) | 
|  | c = a.intersection(b) | 
|  | if c: | 
|  | aa = list(filter(lambda inst: inst.platform.name in c, instance_list)) | 
|  | self.add_instances(aa) | 
|  | else: | 
|  | self.add_instances(instance_list[:1]) | 
|  | else: | 
|  | instances = list(filter(lambda inst: inst.platform.default, instance_list)) | 
|  | self.add_instances(instances) | 
|  |  | 
|  | for instance in list(filter(lambda inst: not inst.platform.default, instance_list)): | 
|  | discards[instance] = "Not a default test platform" | 
|  |  | 
|  | else: | 
|  | self.add_instances(instance_list) | 
|  |  | 
|  | for _, case in self.instances.items(): | 
|  | case.create_overlay(case.platform, self.enable_asan, self.enable_coverage, self.coverage_platform) | 
|  |  | 
|  | self.discards = discards | 
|  | self.selected_platforms = set(p.platform.name for p in self.instances.values()) | 
|  |  | 
|  | return discards | 
|  |  | 
|  | def add_instances(self, instance_list): | 
|  | for instance in instance_list: | 
|  | self.instances[instance.name] = instance | 
|  |  | 
|  | def add_tasks_to_queue(self, test_only=False): | 
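|  | # Seed the pipeline: in test-only mode, runnable instances are queued | 
|  | # straight to "run"; otherwise every instance that has not already passed | 
|  | # or been skipped is reset and starts at "cmake". | 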
|  | for instance in self.instances.values(): | 
|  | if test_only: | 
|  | if instance.run: | 
|  | pipeline.put({"op": "run", "test": instance, "status": "built"}) | 
|  | else: | 
|  | if instance.status not in ['passed', 'skipped']: | 
|  | instance.status = None | 
|  | pipeline.put({"op": "cmake", "test": instance}) | 
|  |  | 
|  | return "DONE FEEDING" | 
|  |  | 
|  | def execute(self): | 
|  | def calc_one_elf_size(instance): | 
|  | if instance.status not in ["failed", "skipped"]: | 
|  | if instance.platform.type != "native": | 
|  | size_calc = instance.calculate_sizes() | 
|  | instance.metrics["ram_size"] = size_calc.get_ram_size() | 
|  | instance.metrics["rom_size"] = size_calc.get_rom_size() | 
|  | instance.metrics["unrecognized"] = size_calc.unrecognized_sections() | 
|  | else: | 
|  | instance.metrics["ram_size"] = 0 | 
|  | instance.metrics["rom_size"] = 0 | 
|  | instance.metrics["unrecognized"] = [] | 
|  |  | 
|  | instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0 | 
|  |  | 
|  | logger.info("Adding tasks to the queue...") | 
|  | # We can use a with statement to ensure threads are cleaned up promptly | 
|  | with BoundedExecutor(bound=self.jobs, max_workers=self.jobs) as executor: | 
|  |  | 
|  | # start a future for a thread which sends work in through the queue | 
|  | future_to_test = { | 
|  | executor.submit(self.add_tasks_to_queue, self.test_only): 'FEEDER DONE'} | 
|  |  | 
|  | while future_to_test: | 
|  | # check for status of the futures which are currently working | 
|  | done, pending = concurrent.futures.wait(future_to_test, timeout=1, | 
|  | return_when=concurrent.futures.FIRST_COMPLETED) | 
|  |  | 
|  | # if there is incoming work, start a new future | 
|  | while not pipeline.empty(): | 
|  | # fetch a work item (message) from the queue | 
|  | message = pipeline.get() | 
|  | test = message['test'] | 
|  |  | 
|  | pb = ProjectBuilder(self, | 
|  | test, | 
|  | lsan=self.enable_lsan, | 
|  | asan=self.enable_asan, | 
|  | coverage=self.enable_coverage, | 
|  | extra_args=self.extra_args, | 
|  | device_testing=self.device_testing, | 
|  | cmake_only=self.cmake_only, | 
|  | cleanup=self.cleanup, | 
|  | valgrind=self.enable_valgrind, | 
|  | inline_logs=self.inline_logs | 
|  | ) | 
|  | future_to_test[executor.submit(pb.process, message)] = test.name | 
|  |  | 
|  | # process any completed futures | 
|  | for future in done: | 
|  | test = future_to_test[future] | 
|  | try: | 
|  | data = future.result() | 
|  | except Exception as exc: | 
|  | logger.error('%r generated an exception: %s' % (test, exc)) | 
|  | sys.exit('%r generated an exception: %s' % (test, exc)) | 
|  |  | 
|  | else: | 
|  | if data: | 
|  | logger.debug(data) | 
|  |  | 
|  | # remove the now completed future | 
|  | del future_to_test[future] | 
|  |  | 
|  | for future in pending: | 
|  | test = future_to_test[future] | 
|  |  | 
|  | try: | 
|  | future.result(timeout=180) | 
|  | except concurrent.futures.TimeoutError: | 
|  | logger.warning("{} stuck?".format(test)) | 
|  |  | 
|  | if self.enable_size_report and not self.cmake_only: | 
|  | # Parallelize size calculation | 
|  | executor = concurrent.futures.ThreadPoolExecutor(self.jobs) | 
|  | futures = [executor.submit(calc_one_elf_size, instance) | 
|  | for instance in self.instances.values()] | 
|  | concurrent.futures.wait(futures) | 
|  | else: | 
|  | for instance in self.instances.values(): | 
|  | instance.metrics["ram_size"] = 0 | 
|  | instance.metrics["rom_size"] = 0 | 
|  | instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0 | 
|  | instance.metrics["unrecognized"] = [] | 
|  |  | 
|  | def discard_report(self, filename): | 
|  |  | 
|  | try: | 
|  | if self.discards is None: | 
|  | raise SanityRuntimeError("apply_filters() hasn't been run!") | 
|  | except Exception as e: | 
|  | logger.error(str(e)) | 
|  | sys.exit(2) | 
|  |  | 
|  | with open(filename, "wt") as csvfile: | 
|  | fieldnames = ["test", "arch", "platform", "reason"] | 
|  | cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep) | 
|  | cw.writeheader() | 
|  | for instance, reason in sorted(self.discards.items()): | 
|  | rowdict = {"test": instance.testcase.name, | 
|  | "arch": instance.platform.arch, | 
|  | "platform": instance.platform.name, | 
|  | "reason": reason} | 
|  | cw.writerow(rowdict) | 
|  |  | 
|  | def target_report(self, outdir): | 
|  | run = "Sanitycheck" | 
|  | eleTestsuite = None | 
|  |  | 
|  | platforms = {inst.platform.name for _, inst in self.instances.items()} | 
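|  | # One JUnit-style XML file is written per platform (<outdir>/<platform>.xml), | 
|  | # aggregating the per-subcase results recorded in instance.results | 
|  | # (PASS / BLOCK / SKIP, anything else counted as a failure). | 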
|  | for platform in platforms: | 
|  | errors = 0 | 
|  | passes = 0 | 
|  | fails = 0 | 
|  | duration = 0 | 
|  | skips = 0 | 
|  | for _, instance in self.instances.items(): | 
|  | if instance.platform.name != platform: | 
|  | continue | 
|  |  | 
|  | handler_time = instance.metrics.get('handler_time', 0) | 
|  | duration += handler_time | 
|  | for k in instance.results.keys(): | 
|  | if instance.results[k] == 'PASS': | 
|  | passes += 1 | 
|  | elif instance.results[k] == 'BLOCK': | 
|  | errors += 1 | 
|  | elif instance.results[k] == 'SKIP': | 
|  | skips += 1 | 
|  | else: | 
|  | fails += 1 | 
|  |  | 
|  | eleTestsuites = ET.Element('testsuites') | 
|  | eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', | 
|  | name=run, time="%f" % duration, | 
|  | tests="%d" % (errors + passes + fails), | 
|  | failures="%d" % fails, | 
|  | errors="%d" % errors, skipped="%d" % skips) | 
|  |  | 
|  | handler_time = 0 | 
|  |  | 
|  | # print out test results | 
|  | for _, instance in self.instances.items(): | 
|  | if instance.platform.name != platform: | 
|  | continue | 
|  | handler_time = instance.metrics.get('handler_time', 0) | 
|  | for k in instance.results.keys(): | 
|  | eleTestcase = ET.SubElement( | 
|  | eleTestsuite, 'testcase', | 
|  | classname="%s:%s" % (instance.platform.name, os.path.basename(instance.testcase.name)), | 
|  | name="%s" % (k), time="%f" % handler_time) | 
|  | if instance.results[k] in ['FAIL', 'BLOCK']: | 
|  | if instance.results[k] == 'FAIL': | 
|  | el = ET.SubElement( | 
|  | eleTestcase, | 
|  | 'failure', | 
|  | type="failure", | 
|  | message="failed") | 
|  | else: | 
|  | el = ET.SubElement( | 
|  | eleTestcase, | 
|  | 'error', | 
|  | type="failure", | 
|  | message="failed") | 
|  | p = os.path.join(self.outdir, instance.platform.name, instance.testcase.name) | 
|  | log_file = os.path.join(p, "handler.log") | 
|  |  | 
|  | if os.path.exists(log_file): | 
|  | with open(log_file, "rb") as f: | 
|  | log = f.read().decode("utf-8") | 
|  | filtered_string = ''.join(filter(lambda x: x in string.printable, log)) | 
|  | el.text = filtered_string | 
|  |  | 
|  | elif instance.results[k] == 'SKIP': | 
|  | el = ET.SubElement( | 
|  | eleTestcase, | 
|  | 'skipped', | 
|  | type="skipped", | 
|  | message="Skipped") | 
|  |  | 
|  | result = ET.tostring(eleTestsuites) | 
|  | with open(os.path.join(outdir, platform + ".xml"), 'wb') as f: | 
|  | f.write(result) | 
|  |  | 
|  | def xunit_report(self, filename, append=False): | 
|  | fails = 0 | 
|  | passes = 0 | 
|  | errors = 0 | 
|  | skips = 0 | 
|  | duration = 0 | 
|  |  | 
|  | for instance in self.instances.values(): | 
|  | handler_time = instance.metrics.get('handler_time', 0) | 
|  | duration += handler_time | 
|  | if instance.status in ["failed", "timeout"]: | 
|  | if instance.reason in ['build_error', 'handler_crash']: | 
|  | errors += 1 | 
|  | else: | 
|  | fails += 1 | 
|  | elif instance.status == 'skipped': | 
|  | skips += 1 | 
|  | else: | 
|  | passes += 1 | 
|  |  | 
|  | run = "Sanitycheck" | 
|  | eleTestsuite = None | 
|  |  | 
|  | # When we re-run the tests, we re-use the results and update only with | 
|  | # the newly run tests. | 
|  | if os.path.exists(filename) and append: | 
|  | tree = ET.parse(filename) | 
|  | eleTestsuites = tree.getroot() | 
|  | eleTestsuite = tree.findall('testsuite')[0] | 
|  | else: | 
|  | eleTestsuites = ET.Element('testsuites') | 
|  | eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', | 
|  | name=run, time="%f" % duration, | 
|  | tests="%d" % (errors + passes + fails + skips), | 
|  | failures="%d" % fails, | 
|  | errors="%d" % (errors), skip="%s" % (skips)) | 
|  |  | 
|  | for instance in self.instances.values(): | 
|  |  | 
|  | # remove testcases that are a re-run | 
|  | if append: | 
|  | for tc in eleTestsuite.findall('testcase'): | 
|  | if tc.get('classname') == "%s:%s" % ( | 
|  | instance.platform.name, instance.testcase.name): | 
|  | eleTestsuite.remove(tc) | 
|  |  | 
|  | handler_time = 0 | 
|  | if instance.status != "failed" and instance.handler: | 
|  | handler_time = instance.metrics.get("handler_time", 0) | 
|  |  | 
|  |  | 
|  | eleTestcase = ET.SubElement( | 
|  | eleTestsuite, | 
|  | 'testcase', | 
|  | classname="%s:%s" % (instance.platform.name, instance.testcase.name), | 
|  | name="%s" % (instance.testcase.name), | 
|  | time="%f" % handler_time) | 
|  |  | 
|  | if instance.status in ["failed", "timeout"]: | 
|  | failure = ET.SubElement( | 
|  | eleTestcase, | 
|  | 'failure', | 
|  | type="failure", | 
|  | message=instance.reason) | 
|  | p = ("%s/%s/%s" % (self.outdir, instance.platform.name, instance.testcase.name)) | 
|  | bl = os.path.join(p, "build.log") | 
|  | hl = os.path.join(p, "handler.log") | 
|  | log_file = bl | 
|  | if instance.reason != 'Build error': | 
|  | if os.path.exists(hl): | 
|  | log_file = hl | 
|  | else: | 
|  | log_file = bl | 
|  |  | 
|  | if os.path.exists(log_file): | 
|  | with open(log_file, "rb") as f: | 
|  | log = f.read().decode("utf-8") | 
|  | filtered_string = ''.join(filter(lambda x: x in string.printable, log)) | 
|  | failure.text = filtered_string | 
|  | elif instance.status == "skipped": | 
|  | ET.SubElement(eleTestcase, 'skipped', type="skipped", message="Skipped") | 
|  |  | 
|  | result = ET.tostring(eleTestsuites) | 
|  | with open(filename, 'wb') as report: | 
|  | report.write(result) | 
|  |  | 
|  | def csv_report(self, filename): | 
|  | with open(filename, "wt") as csvfile: | 
|  | fieldnames = ["test", "arch", "platform", "passed", "status", | 
|  | "extra_args", "handler", "handler_time", "ram_size", | 
|  | "rom_size"] | 
|  | cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep) | 
|  | cw.writeheader() | 
|  | for instance in self.instances.values(): | 
|  | rowdict = {"test": instance.testcase.name, | 
|  | "arch": instance.platform.arch, | 
|  | "platform": instance.platform.name, | 
|  | "extra_args": " ".join(instance.testcase.extra_args), | 
|  | "handler": instance.platform.simulation} | 
|  |  | 
|  | if instance.status in ["failed", "timeout"]: | 
|  | rowdict["passed"] = False | 
|  | rowdict["status"] = instance.reason | 
|  | else: | 
|  | rowdict["passed"] = True | 
|  | if instance.handler: | 
|  | rowdict["handler_time"] = instance.metrics.get("handler_time", 0) | 
|  | ram_size = instance.metrics.get("ram_size", 0) | 
|  | rom_size = instance.metrics.get("rom_size", 0) | 
|  | rowdict["ram_size"] = ram_size | 
|  | rowdict["rom_size"] = rom_size | 
|  | cw.writerow(rowdict) | 
|  |  | 
|  | def get_testcase(self, identifier): | 
|  | results = [] | 
|  | for _, tc in self.testcases.items(): | 
|  | for case in tc.cases: | 
|  | if case == identifier: | 
|  | results.append(tc) | 
|  | return results | 
|  |  | 
|  |  | 
|  | def parse_arguments(): | 
|  | parser = argparse.ArgumentParser( | 
|  | description=__doc__, | 
|  | formatter_class=argparse.RawDescriptionHelpFormatter) | 
|  | parser.fromfile_prefix_chars = "+" | 
|  |  | 
|  | case_select = parser.add_argument_group("Test case selection", | 
|  | """ | 
|  | Artificially long but functional example: | 
|  | $ ./scripts/sanitycheck -v     \\ | 
|  | --testcase-root tests/ztest/base    \\ | 
|  | --testcase-root tests/kernel   \\ | 
|  | --test      tests/ztest/base/testing.ztest.verbose_0  \\ | 
|  | --test      tests/kernel/fifo/fifo_api/kernel.fifo.poll | 
|  |  | 
|  | "kernel.fifo.poll" is one of the test section names in | 
|  | __/fifo_api/testcase.yaml | 
|  | """) | 
|  |  | 
|  | parser.add_argument("--force-toolchain", action="store_true", | 
|  | help="Do not filter based on toolchain, use the set " | 
|  | " toolchain unconditionally") | 
|  | parser.add_argument( | 
|  | "-p", "--platform", action="append", | 
|  | help="Platform filter for testing. This option may be used multiple " | 
|  | "times. Testcases will only be built/run on the platforms " | 
|  | "specified. If this option is not used, then platforms marked " | 
|  | "as default in the platform metadata file will be chosen " | 
|  | "to build and test. ") | 
|  |  | 
|  | parser.add_argument("-P", "--exclude-platform", action="append", default=[], | 
|  | help="""Exclude platforms and do not build or run any tests | 
|  | on those platforms. This option can be called multiple times. | 
|  | """ | 
|  | ) | 
|  |  | 
|  | parser.add_argument( | 
|  | "-a", "--arch", action="append", | 
|  | help="Arch filter for testing. Takes precedence over --platform. " | 
|  | "If unspecified, test all arches. Multiple invocations " | 
|  | "are treated as a logical 'or' relationship") | 
|  | parser.add_argument( | 
|  | "-t", "--tag", action="append", | 
|  | help="Specify tags to restrict which tests to run by tag value. " | 
|  | "Default is to not do any tag filtering. Multiple invocations " | 
|  | "are treated as a logical 'or' relationship") | 
|  | parser.add_argument("-e", "--exclude-tag", action="append", | 
|  | help="Specify tags of tests that should not run. " | 
|  | "Default is to run all tests with all tags.") | 
|  | case_select.add_argument( | 
|  | "-f", | 
|  | "--only-failed", | 
|  | action="store_true", | 
|  | help="Run only those tests that failed the previous sanity check " | 
|  | "invocation.") | 
|  |  | 
|  | parser.add_argument( | 
|  | "--retry-failed", type=int, default=0, | 
|  | help="Retry failing tests again, up to the number of times specified.") | 
|  |  | 
|  | test_xor_subtest = case_select.add_mutually_exclusive_group() | 
|  |  | 
|  | test_xor_subtest.add_argument( | 
|  | "-s", "--test", action="append", | 
|  | help="Run only the specified test cases. These are named by " | 
|  | "<path/relative/to/Zephyr/base/section.name.in.testcase.yaml>") | 
|  |  | 
|  | test_xor_subtest.add_argument( | 
|  | "--sub-test", action="append", | 
|  | help="""Recursively find sub-test functions and run the entire | 
|  | test section where they were found, including all sibling test | 
|  | functions. Sub-tests are named by: | 
|  | section.name.in.testcase.yaml.function_name_without_test_prefix | 
|  | Example: kernel.fifo.poll.fifo_loop | 
|  | """) | 
|  |  | 
|  | parser.add_argument( | 
|  | "-l", "--all", action="store_true", | 
|  | help="Build/test on all platforms. Any --platform arguments " | 
|  | "ignored.") | 
|  |  | 
|  | parser.add_argument( | 
|  | "-o", "--report-dir", | 
|  | help="""Output reports containing results of the test run into the | 
|  | specified directory. | 
|  | The output will be both in CSV and JUNIT format | 
|  | (sanitycheck.csv and sanitycheck.xml). | 
|  | """) | 
|  |  | 
|  | parser.add_argument( | 
|  | "--report-name", | 
|  | help="""Create a report with a custom name. | 
|  | """) | 
|  |  | 
|  | parser.add_argument("--report-excluded", | 
|  | action="store_true", | 
|  | help="""List all tests that are never run based on current scope and | 
|  | coverage. If you are looking for accurate results, run this with | 
|  | --all, but this will take a while...""") | 
|  |  | 
|  | parser.add_argument("--compare-report", | 
|  | help="Use this report file for size comparison") | 
|  |  | 
|  | parser.add_argument( | 
|  | "-B", "--subset", | 
|  | help="Only run a subset of the tests, 1/4 for running the first 25%%, " | 
|  | "3/5 means run the 3rd fifth of the total. " | 
|  | "This option is useful when running a large number of tests on " | 
|  | "different hosts to speed up execution time.") | 
|  |  | 
|  | parser.add_argument( | 
|  | "-N", "--ninja", action="store_true", | 
|  | help="Use the Ninja generator with CMake") | 
|  |  | 
|  | parser.add_argument( | 
|  | "-y", "--dry-run", action="store_true", | 
|  | help="""Create the filtered list of test cases, but don't actually | 
|  | run them. Useful if you're just interested in the discard report | 
|  | generated for every run and saved in the specified output | 
|  | directory (sanitycheck_discard.csv). | 
|  | """) | 
|  |  | 
|  | parser.add_argument("--list-tags", action="store_true", | 
|  | help="list all tags in selected tests") | 
|  |  | 
|  | case_select.add_argument("--list-tests", action="store_true", | 
|  | help="""List of all sub-test functions recursively found in | 
|  | all --testcase-root arguments. Note different sub-tests can share | 
|  | the same section name and come from different directories. | 
|  | The output is flattened and reports --sub-test names only, | 
|  | not their directories. For instance net.socket.getaddrinfo_ok | 
|  | and net.socket.fd_set belong to different directories. | 
|  | """) | 
|  |  | 
|  | case_select.add_argument("--test-tree", action="store_true", | 
|  | help="""Output the testsuite in a tree form""") | 
|  |  | 
|  | case_select.add_argument("--list-test-duplicates", action="store_true", | 
|  | help="""List tests with duplicate identifiers. | 
|  | """) | 
|  |  | 
|  | parser.add_argument("--export-tests", action="store", | 
|  | metavar="FILENAME", | 
|  | help="Export tests case meta-data to a file in CSV format.") | 
|  |  | 
|  | parser.add_argument("--timestamps", | 
|  | action="store_true", | 
|  | help="Print all messages with time stamps") | 
|  |  | 
|  | parser.add_argument( | 
|  | "-r", "--release", action="store_true", | 
|  | help="Update the benchmark database with the results of this test " | 
|  | "run. Intended to be run by CI when tagging an official " | 
|  | "release. This database is used as a basis for comparison " | 
|  | "when looking for deltas in metrics such as footprint") | 
|  | parser.add_argument("-w", "--warnings-as-errors", action="store_true", | 
|  | help="Treat warning conditions as errors") | 
|  | parser.add_argument( | 
|  | "-v", | 
|  | "--verbose", | 
|  | action="count", | 
|  | default=0, | 
|  | help="Emit debugging information, call multiple times to increase " | 
|  | "verbosity") | 
|  | parser.add_argument( | 
|  | "-i", "--inline-logs", action="store_true", | 
|  | help="Upon test failure, print relevant log data to stdout " | 
|  | "instead of just a path to it") | 
|  | parser.add_argument("--log-file", metavar="FILENAME", action="store", | 
|  | help="log also to file") | 
|  | parser.add_argument( | 
|  | "-m", "--last-metrics", action="store_true", | 
|  | help="Instead of comparing metrics from the last --release, " | 
|  | "compare with the results of the previous sanity check " | 
|  | "invocation") | 
|  | parser.add_argument( | 
|  | "-u", | 
|  | "--no-update", | 
|  | action="store_true", | 
|  | help="do not update the results of the last run of the sanity " | 
|  | "checks") | 
|  | case_select.add_argument( | 
|  | "-F", | 
|  | "--load-tests", | 
|  | metavar="FILENAME", | 
|  | action="store", | 
|  | help="Load list of tests and platforms to be run from file.") | 
|  |  | 
|  | case_select.add_argument( | 
|  | "-E", | 
|  | "--save-tests", | 
|  | metavar="FILENAME", | 
|  | action="store", | 
|  | help="Append list of tests and platforms to be run to file.") | 
|  |  | 
|  | test_or_build = parser.add_mutually_exclusive_group() | 
|  | test_or_build.add_argument( | 
|  | "-b", "--build-only", action="store_true", | 
|  | help="Only build the code, do not execute any of it in QEMU") | 
|  |  | 
|  | test_or_build.add_argument( | 
|  | "--test-only", action="store_true", | 
|  | help="""Only run device tests with current artifacts, do not build | 
|  | the code""") | 
|  | parser.add_argument( | 
|  | "--cmake-only", action="store_true", | 
|  | help="Only run cmake, do not build or run.") | 
|  |  | 
|  | parser.add_argument( | 
|  | "-M", "--runtime-artifact-cleanup", action="store_true", | 
|  | help="Delete artifacts of passing tests.") | 
|  |  | 
|  | parser.add_argument( | 
|  | "-j", "--jobs", type=int, | 
|  | help="Number of jobs for building, defaults to number of CPU threads, " | 
|  | "overcommited by factor 2 when --build-only") | 
|  |  | 
|  | parser.add_argument( | 
|  | "--show-footprint", action="store_true", | 
|  | help="Show footprint statistics and deltas since last release." | 
|  | ) | 
|  | parser.add_argument( | 
|  | "-H", "--footprint-threshold", type=float, default=5, | 
|  | help="When checking test case footprint sizes, warn the user if " | 
|  | "the new app size is greater then the specified percentage " | 
|  | "from the last release. Default is 5. 0 to warn on any " | 
|  | "increase on app size") | 
|  | parser.add_argument( | 
|  | "-D", "--all-deltas", action="store_true", | 
|  | help="Show all footprint deltas, positive or negative. Implies " | 
|  | "--footprint-threshold=0") | 
|  | parser.add_argument( | 
|  | "-O", "--outdir", | 
|  | default=os.path.join(os.getcwd(), "sanity-out"), | 
|  | help="Output directory for logs and binaries. " | 
|  | "Default is 'sanity-out' in the current directory. " | 
|  | "This directory will be cleaned unless '--no-clean' is set. " | 
|  | "The '--clobber-output' option controls what cleaning does.") | 
|  | parser.add_argument( | 
|  | "-c", "--clobber-output", action="store_true", | 
|  | help="Cleaning the output directory will simply delete it instead " | 
|  | "of the default policy of renaming.") | 
|  | parser.add_argument( | 
|  | "-n", "--no-clean", action="store_true", | 
|  | help="Re-use the outdir before building. Will result in " | 
|  | "faster compilation since builds will be incremental.") | 
|  | case_select.add_argument( | 
|  | "-T", "--testcase-root", action="append", default=[], | 
|  | help="Base directory to recursively search for test cases. All " | 
|  | "testcase.yaml files under here will be processed. May be " | 
|  | "called multiple times. Defaults to the 'samples/' and " | 
|  | "'tests/' directories at the base of the Zephyr tree.") | 
|  |  | 
|  | board_root_list = ["%s/boards" % ZEPHYR_BASE, | 
|  | "%s/scripts/sanity_chk/boards" % ZEPHYR_BASE] | 
|  |  | 
|  | parser.add_argument( | 
|  | "-A", "--board-root", action="append", default=board_root_list, | 
|  | help="""Directory to search for board configuration files. All .yaml | 
|  | files in the directory will be processed. The directory should have the same | 
|  | structure as the main Zephyr tree: boards/<arch>/<board_name>/""") |
|  |  | 
|  | parser.add_argument( | 
|  | "-z", "--size", action="append", | 
|  | help="Don't run sanity  checks. Instead, produce a report to " | 
|  | "stdout detailing RAM/ROM sizes on the specified filenames. " | 
|  | "All other command line arguments ignored.") | 
|  | parser.add_argument( | 
|  | "-S", "--enable-slow", action="store_true", | 
|  | help="Execute time-consuming test cases that have been marked " | 
|  | "as 'slow' in testcase.yaml. Normally these are only built.") | 
|  | parser.add_argument( | 
|  | "--disable-unrecognized-section-test", action="store_true", | 
|  | default=False, | 
|  | help="Skip the 'unrecognized section' test.") | 
|  | parser.add_argument("-R", "--enable-asserts", action="store_true", | 
|  | default=True, | 
|  | help="deprecated, left for compatibility") | 
|  | parser.add_argument("--disable-asserts", action="store_false", | 
|  | dest="enable_asserts", | 
|  | help="deprecated, left for compatibility") | 
|  | parser.add_argument("-Q", "--error-on-deprecations", action="store_false", | 
|  | help="Error on deprecation warnings.") | 
|  | parser.add_argument("--enable-size-report", action="store_true", | 
|  | help="Enable expensive computation of RAM/ROM segment sizes.") | 
|  |  | 
|  | parser.add_argument( | 
|  | "-x", "--extra-args", action="append", default=[], | 
|  | help="""Extra CMake cache entries to define when building test cases. | 
|  | May be called multiple times. The key-value entries will be | 
|  | prefixed with -D before being passed to CMake. | 
|  |  | 
|  | E.g. |
|  | "sanitycheck -x=USE_CCACHE=0" | 
|  | will translate to | 
|  | "cmake -DUSE_CCACHE=0" | 
|  |  | 
|  | which will ultimately disable ccache. | 
|  | """ | 
|  | ) | 
|  |  | 
|  | parser.add_argument( | 
|  | "--device-testing", action="store_true", | 
|  | help="Test on device directly. Specify the serial device to " | 
|  | "use with the --device-serial option.") | 
|  |  | 
|  | parser.add_argument( | 
|  | "-X", "--fixture", action="append", default=[], | 
|  | help="Specify a fixture that a board might support") | 
|  | parser.add_argument( | 
|  | "--device-serial", | 
|  | help="Serial device for accessing the board (e.g., /dev/ttyACM0)") | 
|  |  | 
|  | parser.add_argument("--generate-hardware-map", | 
|  | help="""Probe serial devices connected to this platform | 
|  | and create a hardware map file to be used with | 
|  | --device-testing | 
|  | """) | 
|  |  | 
|  | parser.add_argument("--hardware-map", | 
|  | help="""Load hardware map from a file. This will be used | 
|  | for testing on hardware that is listed in the file. | 
|  | """) | 
|  |  | 
|  | parser.add_argument( | 
|  | "--west-flash", nargs='?', const=[], | 
|  | help="""Uses west instead of ninja or make to flash when running with | 
|  | --device-testing. Supports comma-separated argument list. | 
|  |  | 
|  | E.g "sanitycheck --device-testing --device-serial /dev/ttyACM0 | 
|  | --west-flash="--board-id=foobar,--erase" | 
|  | will translate to "west flash -- --board-id=foobar --erase" | 
|  |  | 
|  | NOTE: device-testing must be enabled to use this option. | 
|  | """ | 
|  | ) | 
|  | parser.add_argument( | 
|  | "--west-runner", | 
|  | help="""Uses the specified west runner instead of default when running | 
|  | with --west-flash. | 
|  |  | 
|  | E.g "sanitycheck --device-testing --device-serial /dev/ttyACM0 | 
|  | --west-flash --west-runner=pyocd" | 
|  | will translate to "west flash --runner pyocd" | 
|  |  | 
|  | NOTE: west-flash must be enabled to use this option. | 
|  | """ | 
|  | ) | 
|  |  | 
|  | valgrind_asan_group = parser.add_mutually_exclusive_group() | 
|  |  | 
|  | valgrind_asan_group.add_argument( | 
|  | "--enable-valgrind", action="store_true", | 
|  | help="""Run binary through valgrind and check for several memory access | 
|  | errors. Valgrind needs to be installed on the host. This option only | 
|  | works with host binaries such as those generated for the native_posix | 
|  | configuration and is mutual exclusive with --enable-asan. | 
|  | """) | 
|  |  | 
|  | valgrind_asan_group.add_argument( | 
|  | "--enable-asan", action="store_true", | 
|  | help="""Enable address sanitizer to check for several memory access | 
|  | errors. Libasan needs to be installed on the host. This option only | 
|  | works with host binaries such as those generated for the native_posix | 
|  | configuration and is mutual exclusive with --enable-valgrind. | 
|  | """) | 
|  |  | 
|  | parser.add_argument( | 
|  | "--enable-lsan", action="store_true", | 
|  | help="""Enable leak sanitizer to check for heap memory leaks. | 
|  | Libasan needs to be installed on the host. This option only | 
|  | works with host binaries such as those generated for the native_posix | 
|  | configuration and when --enable-asan is given. | 
|  | """) | 
|  |  | 
|  | parser.add_argument("--enable-coverage", action="store_true", | 
|  | help="Enable code coverage using gcov.") | 
|  |  | 
|  | parser.add_argument("-C", "--coverage", action="store_true", | 
|  | help="Generate coverage reports. Implies " | 
|  | "--enable_coverage.") | 
|  |  | 
|  | parser.add_argument("--coverage-platform", action="append", default=[], | 
|  | help="Plarforms to run coverage reports on. " | 
|  | "This option may be used multiple times. " | 
|  | "Default to what was selected with --platform.") | 
|  |  | 
|  | parser.add_argument("--gcov-tool", default=None, | 
|  | help="Path to the gcov tool to use for code coverage " | 
|  | "reports") | 
|  |  | 
|  | parser.add_argument("--coverage-tool", choices=['lcov', 'gcovr'], default='lcov', | 
|  | help="Tool to use to generate coverage report.") | 
|  |  | 
|  | return parser.parse_args() | 
|  |  | 
|  |  | 
|  | def size_report(sc): | 
|  | logger.info(sc.filename) | 
|  | logger.info("SECTION NAME             VMA        LMA     SIZE  HEX SZ TYPE") | 
|  | for i in range(len(sc.sections)): | 
|  | v = sc.sections[i] | 
|  |  | 
|  | logger.info("%-17s 0x%08x 0x%08x %8d 0x%05x %-7s" % | 
|  | (v["name"], v["virt_addr"], v["load_addr"], v["size"], v["size"], | 
|  | v["type"])) | 
|  |  | 
|  | logger.info("Totals: %d bytes (ROM), %d bytes (RAM)" % | 
|  | (sc.rom_size, sc.ram_size)) | 
|  | logger.info("") | 
|  |  | 
|  |  | 
|  | class CoverageTool: | 
|  | """ Base class for every supported coverage tool | 
|  | """ | 
|  |  | 
|  | def __init__(self): | 
|  | self.gcov_tool = options.gcov_tool | 
|  |  | 
|  | @staticmethod | 
|  | def factory(tool): | 
|  | if tool == 'lcov': | 
|  | return Lcov() | 
|  | if tool == 'gcovr': | 
|  | return Gcovr() | 
|  | logger.error("Unsupported coverage tool specified: {}".format(tool)) | 
|  |  | 
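|  | # Extract coverage data that the target dumps to its console log between |
|  | # the GCOV_COVERAGE_DUMP_START and GCOV_COVERAGE_DUMP_END markers. Each |
|  | # record is expected on its own line in the form "*PATH<HEXDUMP": a |
|  | # leading '*', the gcda file path, a '<' separator, then the hex-encoded |
|  | # gcda payload. Returns a dict with a 'complete' flag and the parsed data. |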
|  | @staticmethod | 
|  | def retrieve_gcov_data(input_file): |
|  | if VERBOSE: | 
|  | logger.debug("Working on %s" % intput_file) | 
|  | extracted_coverage_info = {} | 
|  | capture_data = False | 
|  | capture_complete = False | 
|  | with open(input_file, 'r') as fp: |
|  | for line in fp.readlines(): | 
|  | if re.search("GCOV_COVERAGE_DUMP_START", line): | 
|  | capture_data = True | 
|  | continue | 
|  | if re.search("GCOV_COVERAGE_DUMP_END", line): | 
|  | capture_complete = True | 
|  | break | 
|  | # Loop until the coverage data is found. | 
|  | if not capture_data: | 
|  | continue | 
|  | if line.startswith("*"): | 
|  | sp = line.split("<") | 
|  | if len(sp) > 1: | 
|  | # Remove the leading delimiter "*" | 
|  | file_name = sp[0][1:] | 
|  | # Remove the trailing new line char | 
|  | hex_dump = sp[1][:-1] | 
|  | else: | 
|  | continue | 
|  | else: | 
|  | continue | 
|  | extracted_coverage_info.update({file_name: hex_dump}) | 
|  | if not capture_data: | 
|  | capture_complete = True | 
|  | return {'complete': capture_complete, 'data': extracted_coverage_info} | 
|  |  | 
|  | @staticmethod | 
|  | def create_gcda_files(extracted_coverage_info): | 
|  | if VERBOSE: | 
|  | logger.debug("Generating gcda files") | 
|  | for filename, hexdump_val in extracted_coverage_info.items(): | 
|  | # If kobject_hash is given, gcovr fails to generate coverage, |
|  | # hence skip it. This problem exists only in gcovr v4.1. |
|  | if "kobject_hash" in filename: | 
|  | filename = (filename[:-4]) + "gcno" | 
|  | try: | 
|  | os.remove(filename) | 
|  | except Exception: | 
|  | pass | 
|  | continue | 
|  |  | 
|  | with open(filename, 'wb') as fp: | 
|  | fp.write(bytes.fromhex(hexdump_val)) | 
|  |  | 
|  | def generate(self, outdir): | 
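|  | # Walk every handler.log under outdir, rebuild the .gcda files from the |
|  | # captured coverage dumps, then call the tool-specific _generate() to |
|  | # produce the report under <outdir>/coverage. |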
|  | for filename in glob.glob("%s/**/handler.log" % outdir, recursive=True): | 
|  | gcov_data = self.__class__.retrieve_gcov_data(filename) | 
|  | capture_complete = gcov_data['complete'] | 
|  | extracted_coverage_info = gcov_data['data'] | 
|  | if capture_complete: | 
|  | self.__class__.create_gcda_files(extracted_coverage_info) | 
|  | logger.debug("Gcov data captured: {}".format(filename)) | 
|  | else: | 
|  | logger.error("Gcov data capture incomplete: {}".format(filename)) | 
|  |  | 
|  | with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog: | 
|  | ret = self._generate(outdir, coveragelog) | 
|  | if ret == 0: | 
|  | logger.info("HTML report generated: {}".format( | 
|  | os.path.join(outdir, "coverage", "index.html"))) | 
|  |  | 
|  |  | 
|  | class Lcov(CoverageTool): | 
|  |  | 
|  | def __init__(self): | 
|  | super().__init__() | 
|  | self.ignores = [] | 
|  |  | 
|  | def add_ignore_file(self, pattern): | 
|  | self.ignores.append('*' + pattern + '*') | 
|  |  | 
|  | def add_ignore_directory(self, pattern): | 
|  | self.ignores.append(pattern + '/*') | 
|  |  | 
|  | def _generate(self, outdir, coveragelog): | 
|  | coveragefile = os.path.join(outdir, "coverage.info") | 
|  | ztestfile = os.path.join(outdir, "ztest.info") | 
|  | subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, | 
|  | "--capture", "--directory", outdir, | 
|  | "--rc", "lcov_branch_coverage=1", | 
|  | "--output-file", coveragefile], stdout=coveragelog) | 
|  | # We want to remove tests/* and tests/ztest/test/* but save tests/ztest | 
|  | subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--extract", | 
|  | coveragefile, | 
|  | os.path.join(ZEPHYR_BASE, "tests", "ztest", "*"), | 
|  | "--output-file", ztestfile, | 
|  | "--rc", "lcov_branch_coverage=1"], stdout=coveragelog) | 
|  |  | 
|  | if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0: | 
|  | subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--remove", | 
|  | ztestfile, | 
|  | os.path.join(ZEPHYR_BASE, "tests/ztest/test/*"), | 
|  | "--output-file", ztestfile, | 
|  | "--rc", "lcov_branch_coverage=1"], | 
|  | stdout=coveragelog) | 
|  | files = [coveragefile, ztestfile] | 
|  | else: | 
|  | files = [coveragefile] | 
|  |  | 
|  | for i in self.ignores: | 
|  | subprocess.call( | 
|  | ["lcov", "--gcov-tool", self.gcov_tool, "--remove", | 
|  | coveragefile, i, "--output-file", | 
|  | coveragefile, "--rc", "lcov_branch_coverage=1"], | 
|  | stdout=coveragelog) | 
|  |  | 
|  | # The --ignore-errors source option is added to avoid it exiting due to | 
|  | # samples/application_development/external_lib/ | 
|  | return subprocess.call(["genhtml", "--legend", "--branch-coverage", | 
|  | "--ignore-errors", "source", | 
|  | "-output-directory", | 
|  | os.path.join(outdir, "coverage")] + files, | 
|  | stdout=coveragelog) | 
|  |  | 
|  |  | 
|  | class Gcovr(CoverageTool): | 
|  |  | 
|  | def __init__(self): | 
|  | super().__init__() | 
|  | self.ignores = [] | 
|  |  | 
|  | def add_ignore_file(self, pattern): | 
|  | self.ignores.append('.*' + pattern + '.*') | 
|  |  | 
|  | def add_ignore_directory(self, pattern): | 
|  | self.ignores.append(pattern + '/.*') | 
|  |  | 
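|  | # Interleave a flag with every value of a list, e.g. |
|  | # _interleave_list("-e", ["foo", "bar"]) -> ["-e", "foo", "-e", "bar"]. |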
|  | @staticmethod | 
|  | def _interleave_list(prefix, lst): |
|  | tuple_list = [(prefix, item) for item in lst] |
|  | return [item for sublist in tuple_list for item in sublist] | 
|  |  | 
|  | def _generate(self, outdir, coveragelog): | 
|  | coveragefile = os.path.join(outdir, "coverage.json") | 
|  | ztestfile = os.path.join(outdir, "ztest.json") | 
|  |  | 
|  | excludes = Gcovr._interleave_list("-e", self.ignores) | 
|  |  | 
|  | # We want to remove tests/* and tests/ztest/test/* but save tests/ztest | 
|  | subprocess.call(["gcovr", "-r", ZEPHYR_BASE, "--gcov-executable", | 
|  | self.gcov_tool, "-e", "tests/*"] + excludes + | 
|  | ["--json", "-o", coveragefile, outdir], | 
|  | stdout=coveragelog) | 
|  |  | 
|  | subprocess.call(["gcovr", "-r", ZEPHYR_BASE, "--gcov-executable", | 
|  | self.gcov_tool, "-f", "tests/ztest", "-e", | 
|  | "tests/ztest/test/*", "--json", "-o", ztestfile, | 
|  | outdir], stdout=coveragelog) | 
|  |  | 
|  | if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0: | 
|  | files = [coveragefile, ztestfile] | 
|  | else: | 
|  | files = [coveragefile] | 
|  |  | 
|  | subdir = os.path.join(outdir, "coverage") | 
|  | os.makedirs(subdir, exist_ok=True) | 
|  |  | 
|  | tracefiles = self._interleave_list("--add-tracefile", files) | 
|  |  | 
|  | return subprocess.call(["gcovr", "-r", ZEPHYR_BASE, "--html", | 
|  | "--html-details"] + tracefiles + | 
|  | ["-o", os.path.join(subdir, "index.html")], | 
|  | stdout=coveragelog) | 
|  |  | 
|  |  | 
|  | def get_generator(): | 
|  | if options.ninja: | 
|  | generator_cmd = "ninja" | 
|  | generator = "Ninja" | 
|  | else: | 
|  | generator_cmd = "make" | 
|  | generator = "Unix Makefiles" | 
|  | return generator_cmd, generator | 
|  |  | 
|  |  | 
|  | def export_tests(filename, tests): | 
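|  | # Each exported row is derived from the dotted test identifier: for an id |
|  | # such as "net.socket.getaddrinfo_ok" the section becomes "Net", the |
|  | # subsection "Socket", and the full identifier is used for both the |
|  | # title and reference columns. |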
|  | with open(filename, "wt") as csvfile: | 
|  | fieldnames = ['section', 'subsection', 'title', 'reference'] | 
|  | cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep) | 
|  | for test in tests: | 
|  | data = test.split(".") | 
|  | if len(data) > 1: | 
|  | subsec = " ".join(data[1].split("_")).title() | 
|  | rowdict = { | 
|  | "section": data[0].capitalize(), | 
|  | "subsection": subsec, | 
|  | "title": test, | 
|  | "reference": test | 
|  | } | 
|  | cw.writerow(rowdict) | 
|  | else: | 
|  | logger.info("{} can't be exported".format(test)) | 
|  |  | 
|  |  | 
|  | class HardwareMap: | 
|  |  | 
|  | schema_path = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk", "hwmap-schema.yaml") | 
|  |  | 
|  | manufacturer = [ | 
|  | 'ARM', | 
|  | 'SEGGER', | 
|  | 'MBED', | 
|  | 'STMicroelectronics', | 
|  | 'Atmel Corp.', | 
|  | 'Texas Instruments', | 
|  | 'Silicon Labs', | 
|  | 'NXP Semiconductors', | 
|  | 'Microchip Technology Inc.', | 
|  | 'FTDI', | 
|  | 'Digilent' | 
|  | ] | 
|  |  | 
|  | runner_mapping = { | 
|  | 'pyocd': [ | 
|  | 'DAPLink CMSIS-DAP', | 
|  | 'MBED CMSIS-DAP' | 
|  | ], | 
|  | 'jlink': [ | 
|  | 'J-Link', | 
|  | 'J-Link OB' | 
|  | ], | 
|  | 'openocd': [ | 
|  | 'STM32 STLink', '^XDS110.*' | 
|  | ], | 
|  | 'dediprog': [ | 
|  | 'TTL232R-3V3', | 
|  | 'MCP2200 USB Serial Port Emulator' | 
|  | ] | 
|  | } | 
|  |  | 
|  | def __init__(self): | 
|  | self.detected = [] | 
|  | self.connected_hardware = [] | 
|  |  | 
|  | def load_device_from_cmdline(self, serial, platform): | 
|  | device = { | 
|  | "serial": serial, | 
|  | "platform": platform, | 
|  | "counter": 0, | 
|  | "available": True, | 
|  | "connected": True | 
|  | } | 
|  | self.connected_hardware.append(device) | 
|  |  | 
|  | def load_hardware_map(self, map_file): | 
|  | hwm_schema = scl.yaml_load(self.schema_path) | 
|  | self.connected_hardware = scl.yaml_load_verify(map_file, hwm_schema) | 
|  | for i in self.connected_hardware: | 
|  | i['counter'] = 0 | 
|  |  | 
|  | def scan_hw(self): | 
|  | from serial.tools import list_ports | 
|  |  | 
|  | serial_devices = list_ports.comports() | 
|  | logger.info("Scanning connected hardware...") | 
|  | for d in serial_devices: | 
|  | if d.manufacturer in self.manufacturer: | 
|  |  | 
|  | # TI XDS110 can have multiple serial devices for a single board | 
|  | # assume endpoint 0 is the serial, skip all others | 
|  | if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'): | 
|  | continue | 
|  | s_dev = {} | 
|  | s_dev['platform'] = "unknown" | 
|  | s_dev['id'] = d.serial_number | 
|  | s_dev['serial'] = d.device | 
|  | s_dev['product'] = d.product | 
|  | s_dev['runner'] = 'unknown' | 
|  | for runner, products in self.runner_mapping.items(): |
|  | if d.product in products: | 
|  | s_dev['runner'] = runner | 
|  | continue | 
|  | # Try regex matching | 
|  | for p in products: | 
|  | if re.match(p, d.product): | 
|  | s_dev['runner'] = runner | 
|  |  | 
|  | s_dev['available'] = True | 
|  | s_dev['connected'] = True | 
|  | self.detected.append(s_dev) | 
|  | else: | 
|  | logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d)) | 
|  |  | 
|  | def write_map(self, hwm_file): | 
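|  | # The hardware map file is a YAML list of device entries as produced by |
|  | # scan_hw(); the entry below is a sketch with purely illustrative values: |
|  | # |
|  | #   - available: true |
|  | #     connected: true |
|  | #     id: '0123456789' |
|  | #     platform: unknown |
|  | #     product: J-Link |
|  | #     runner: jlink |
|  | #     serial: /dev/ttyACM0 |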
|  | # use existing map | 
|  | if os.path.exists(hwm_file): | 
|  | with open(hwm_file, 'r') as yaml_file: | 
|  | hwm = yaml.load(yaml_file, Loader=yaml.FullLoader) | 
|  | # disconnect everything | 
|  | for h in hwm: | 
|  | h['connected'] = False | 
|  | h['serial'] = None | 
|  |  | 
|  | for d in self.detected: | 
|  | for h in hwm: | 
|  | if d['id'] == h['id'] and d['product'] == h['product']: | 
|  | h['connected'] = True | 
|  | h['serial'] = d['serial'] | 
|  | d['match'] = True | 
|  |  | 
|  | new = list(filter(lambda n: not n.get('match', False), self.detected)) | 
|  | hwm = hwm + new | 
|  |  | 
|  | logger.info("Registered devices:") | 
|  | self.dump(hwm) | 
|  |  | 
|  | with open(hwm_file, 'w') as yaml_file: | 
|  | yaml.dump(hwm, yaml_file, default_flow_style=False) | 
|  |  | 
|  | else: | 
|  | # create new file | 
|  | with open(hwm_file, 'w') as yaml_file: | 
|  | yaml.dump(self.detected, yaml_file, default_flow_style=False) | 
|  | logger.info("Detected devices:") | 
|  | self.dump(self.detected) | 
|  |  | 
|  | @staticmethod | 
|  | def dump(hwmap=[], filtered=[], header=[], connected_only=False): | 
|  | print("") | 
|  | table = [] | 
|  | if not header: | 
|  | header = ["Platform", "ID", "Serial device"] | 
|  | for p in sorted(hwmap, key=lambda i: i['platform']): | 
|  | platform = p.get('platform') | 
|  | connected = p.get('connected', False) | 
|  | if filtered and platform not in filtered: | 
|  | continue | 
|  |  | 
|  | if not connected_only or connected: | 
|  | table.append([platform, p.get('id', None), p.get('serial')]) | 
|  |  | 
|  | print(tabulate(table, headers=header, tablefmt="github")) | 
|  |  | 
|  | options = None | 
|  |  | 
|  |  | 
|  | def main(): | 
|  | start_time = time.time() | 
|  | global VERBOSE | 
|  | global options | 
|  |  | 
|  | options = parse_arguments() | 
|  |  | 
|  | # Cleanup | 
|  | if options.no_clean or options.only_failed or options.test_only: | 
|  | if os.path.exists(options.outdir): | 
|  | print("Keeping artifacts untouched") | 
|  | elif os.path.exists(options.outdir): | 
|  | if options.clobber_output: | 
|  | print("Deleting output directory {}".format(options.outdir)) | 
|  | shutil.rmtree(options.outdir) | 
|  | else: | 
|  | for i in range(1, 100): | 
|  | new_out = options.outdir + ".{}".format(i) | 
|  | if not os.path.exists(new_out): | 
|  | print("Renaming output directory to {}".format(new_out)) | 
|  | shutil.move(options.outdir, new_out) | 
|  | break | 
|  |  | 
|  | os.makedirs(options.outdir, exist_ok=True) | 
|  |  | 
|  | # create file handler which logs even debug messages | 
|  | if options.log_file: | 
|  | fh = logging.FileHandler(options.log_file) | 
|  | else: | 
|  | fh = logging.FileHandler(os.path.join(options.outdir, "sanitycheck.log")) | 
|  |  | 
|  | fh.setLevel(logging.DEBUG) | 
|  |  | 
|  | # create console handler with a higher log level | 
|  | ch = logging.StreamHandler() | 
|  |  | 
|  | VERBOSE += options.verbose | 
|  | if VERBOSE > 1: | 
|  | ch.setLevel(logging.DEBUG) | 
|  | else: | 
|  | ch.setLevel(logging.INFO) | 
|  |  | 
|  | # create formatter and add it to the handlers | 
|  | if options.timestamps: | 
|  | formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') | 
|  | else: | 
|  | formatter = logging.Formatter('%(levelname)-7s - %(message)s') | 
|  |  | 
|  | formatter_file = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | 
|  | ch.setFormatter(formatter) | 
|  | fh.setFormatter(formatter_file) | 
|  |  | 
|  | # add the handlers to logger | 
|  | logger.addHandler(ch) | 
|  | logger.addHandler(fh) | 
|  |  | 
|  | hwm = HardwareMap() | 
|  | if options.generate_hardware_map: | 
|  | hwm.scan_hw() | 
|  | hwm.write_map(options.generate_hardware_map) | 
|  | return | 
|  |  | 
|  | if not options.device_testing and options.hardware_map: | 
|  | hwm.load_hardware_map(options.hardware_map) | 
|  |  | 
|  | logger.info("Available devices:") | 
|  | table = [] | 
|  | hwm.dump(hwmap=hwm.connected_hardware, connected_only=True) | 
|  | return | 
|  |  | 
|  | if options.west_runner and not options.west_flash: | 
|  | logger.error("west-runner requires west-flash to be enabled") | 
|  | sys.exit(1) | 
|  |  | 
|  | if options.west_flash and not options.device_testing: | 
|  | logger.error("west-flash requires device-testing to be enabled") | 
|  | sys.exit(1) | 
|  |  | 
|  | if options.coverage: | 
|  | options.enable_coverage = True | 
|  |  | 
|  | if not options.coverage_platform: | 
|  | options.coverage_platform = options.platform | 
|  |  | 
|  | if options.size: | 
|  | for fn in options.size: | 
|  | size_report(SizeCalculator(fn, [])) | 
|  | sys.exit(0) | 
|  |  | 
|  | if options.subset: | 
|  | subset, sets = options.subset.split("/") | 
|  | if int(subset) > 0 and int(sets) >= int(subset): | 
|  | logger.info("Running only a subset: %s/%s" % (subset, sets)) | 
|  | else: | 
|  | logger.error("You have provided a wrong subset value: %s." % options.subset) | 
|  | return | 
|  |  | 
|  | if not options.testcase_root: | 
|  | options.testcase_root = [os.path.join(ZEPHYR_BASE, "tests"), | 
|  | os.path.join(ZEPHYR_BASE, "samples")] | 
|  |  | 
|  | if options.show_footprint or options.compare_report or options.release: | 
|  | options.enable_size_report = True | 
|  |  | 
|  | suite = TestSuite(options.board_root, options.testcase_root, options.outdir) | 
|  |  | 
|  | # Set testsuite options from command line. | 
|  | suite.build_only = options.build_only | 
|  | suite.cmake_only = options.cmake_only | 
|  | suite.cleanup = options.runtime_artifact_cleanup | 
|  | suite.test_only = options.test_only | 
|  | suite.enable_slow = options.enable_slow | 
|  | suite.device_testing = options.device_testing | 
|  | suite.fixture = options.fixture | 
|  | suite.enable_asan = options.enable_asan | 
|  | suite.enable_lsan = options.enable_lsan | 
|  | suite.enable_coverage = options.enable_coverage | 
|  | suite.enable_valgrind = options.enable_valgrind | 
|  | suite.coverage_platform = options.coverage_platform | 
|  | suite.inline_logs = options.inline_logs | 
|  | suite.enable_size_report = options.enable_size_report | 
|  | suite.extra_args = options.extra_args | 
|  |  | 
|  | # Set number of jobs | 
|  | if options.jobs: | 
|  | suite.jobs = options.jobs | 
|  | elif options.build_only: | 
|  | suite.jobs = multiprocessing.cpu_count() * 2 | 
|  | else: | 
|  | suite.jobs = multiprocessing.cpu_count() | 
|  | logger.info("JOBS: %d" % suite.jobs) | 
|  |  | 
|  | suite.add_testcases() | 
|  | suite.add_configurations() | 
|  |  | 
|  | if options.device_testing: | 
|  | if options.hardware_map: | 
|  | hwm.load_hardware_map(options.hardware_map) | 
|  | suite.connected_hardware = hwm.connected_hardware | 
|  | if not options.platform: | 
|  | options.platform = [] | 
|  | for platform in hwm.connected_hardware: | 
|  | if platform['connected']: | 
|  | options.platform.append(platform['platform']) | 
|  |  | 
|  | elif options.device_serial:  # backward compatibility |
|  | if options.platform and len(options.platform) == 1: | 
|  | hwm.load_device_from_cmdline(options.device_serial, options.platform[0]) | 
|  | suite.connected_hardware = hwm.connected_hardware | 
|  | else: | 
|  | logger.error("""When --device-testing is used with --device-serial, only one | 
|  | platform is allowed""") | 
|  |  | 
|  | if suite.load_errors: | 
|  | sys.exit(1) | 
|  |  | 
|  | if options.list_tags: | 
|  | tags = set() | 
|  | for _, tc in suite.testcases.items(): | 
|  | tags = tags.union(tc.tags) | 
|  |  | 
|  | for t in tags: | 
|  | print("- {}".format(t)) | 
|  |  | 
|  | return | 
|  |  | 
|  | if options.export_tests: | 
|  | cnt = 0 | 
|  | tests = suite.get_all_tests() | 
|  | export_tests(options.export_tests, tests) | 
|  | return | 
|  |  | 
|  | run_individual_tests = [] | 
|  |  | 
|  | if options.test: | 
|  | run_individual_tests = options.test | 
|  |  | 
|  | if options.list_tests or options.test_tree or options.list_test_duplicates or options.sub_test: | 
|  | cnt = 0 | 
|  | all_tests = suite.get_all_tests() | 
|  |  | 
|  | if options.list_test_duplicates: | 
|  | import collections | 
|  | dupes = [item for item, count in collections.Counter(all_tests).items() if count > 1] | 
|  | if dupes: | 
|  | print("Tests with duplicate identifiers:") | 
|  | for dupe in dupes: | 
|  | print("- {}".format(dupe)) | 
|  | for dc in suite.get_testcase(dupe): | 
|  | print("  - {}".format(dc)) | 
|  | else: | 
|  | print("No duplicates found.") | 
|  | return | 
|  |  | 
|  | if options.sub_test: | 
|  | for st in options.sub_test: | 
|  | subtests = suite.get_testcase(st) | 
|  | for sti in subtests: | 
|  | run_individual_tests.append(sti.name) | 
|  |  | 
|  | if run_individual_tests: | 
|  | logger.info("Running the following tests:") | 
|  | for test in run_individual_tests: | 
|  | print(" - {}".format(test)) | 
|  | else: | 
|  | logger.info("Tests not found") | 
|  | return | 
|  |  | 
|  | elif options.list_tests or options.test_tree: | 
|  | if options.test_tree: | 
|  | testsuite = Node("Testsuite") | 
|  | samples = Node("Samples", parent=testsuite) | 
|  | tests = Node("Tests", parent=testsuite) | 
|  |  | 
|  | for test in sorted(all_tests): | 
|  | cnt = cnt + 1 | 
|  | if options.list_tests: | 
|  | print(" - {}".format(test)) | 
|  |  | 
|  | if options.test_tree: | 
|  | if test.startswith("sample."): | 
|  | sec = test.split(".") | 
|  | area = find(samples, lambda node: node.name == sec[1] and node.parent == samples) | 
|  | if not area: | 
|  | area = Node(sec[1], parent=samples) | 
|  |  | 
|  | t = Node(test, parent=area) | 
|  | else: | 
|  | sec = test.split(".") | 
|  | area = find(tests, lambda node: node.name == sec[0] and node.parent == tests) | 
|  | if not area: | 
|  | area = Node(sec[0], parent=tests) | 
|  |  | 
|  | if area and len(sec) > 2: | 
|  | subarea = find(area, lambda node: node.name == sec[1] and node.parent == area) | 
|  | if not subarea: | 
|  | subarea = Node(sec[1], parent=area) | 
|  |  | 
|  | t = Node(test, parent=subarea) | 
|  |  | 
|  | if options.list_tests: | 
|  | print("{} total.".format(cnt)) | 
|  |  | 
|  | if options.test_tree: | 
|  | for pre, _, node in RenderTree(testsuite): | 
|  | print("%s%s" % (pre, node.name)) | 
|  |  | 
|  | return | 
|  |  | 
|  | discards = [] | 
|  |  | 
|  | if options.only_failed: | 
|  | suite.get_last_failed() | 
|  | suite.selected_platforms = set(p.platform.name for p in suite.instances.values()) | 
|  | elif options.load_tests: | 
|  | suite.load_from_file(options.load_tests) | 
|  | elif options.test_only: | 
|  | last_run = os.path.join(options.outdir, "sanitycheck.csv") | 
|  | suite.load_from_file(last_run) | 
|  | else: | 
|  | discards = suite.apply_filters( | 
|  | build_only=options.build_only, | 
|  | enable_slow=options.enable_slow, | 
|  | platform=options.platform, | 
|  | exclude_platform=options.exclude_platform, | 
|  | arch=options.arch, | 
|  | tag=options.tag, | 
|  | exclude_tag=options.exclude_tag, | 
|  | force_toolchain=options.force_toolchain, | 
|  | all=options.all, | 
|  | run_individual_tests=run_individual_tests, | 
|  | device_testing=options.device_testing | 
|  |  | 
|  | ) | 
|  |  | 
|  | if VERBOSE > 1 and discards: | 
|  | # If we are using a command line platform filter, there is no need to |
|  | # list every other platform as excluded; we know that already. |
|  | # Show only the discards that apply to the selected platforms on the | 
|  | # command line | 
|  |  | 
|  | for i, reason in discards.items(): | 
|  | if options.platform and i.platform.name not in options.platform: | 
|  | continue | 
|  | logger.debug( | 
|  | "{:<25} {:<50} {}SKIPPED{}: {}".format( | 
|  | i.platform.name, | 
|  | i.testcase.name, | 
|  | Fore.YELLOW, | 
|  | Fore.RESET, | 
|  | reason)) | 
|  |  | 
|  | if options.report_excluded: | 
|  | all_tests = suite.get_all_tests() | 
|  | to_be_run = set() | 
|  | for i, p in suite.instances.items(): | 
|  | to_be_run.update(p.testcase.cases) | 
|  |  | 
|  | if all_tests - to_be_run: | 
|  | print("Tests that never build or run:") | 
|  | for not_run in all_tests - to_be_run: | 
|  | print("- {}".format(not_run)) | 
|  |  | 
|  | return | 
|  |  | 
|  | if options.subset: | 
|  | suite.instances = OrderedDict(sorted(suite.instances.items(), | 
|  | key=lambda x: x[0][x[0].find("/") + 1:])) | 
|  |  | 
|  | subset, sets = options.subset.split("/") | 
|  | total = len(suite.instances) | 
|  | per_set = round(total / int(sets)) | 
|  | start = (int(subset) - 1) * per_set | 
|  | if subset == sets: | 
|  | end = total | 
|  | else: | 
|  | end = start + per_set | 
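|  | # For example, with 100 selected instances and --subset 3/5: |
|  | # per_set = 20, start = 40, end = 60, i.e. this host runs the 41st |
|  | # through 60th instances of the sorted list. |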
|  |  | 
|  | sliced_instances = islice(suite.instances.items(), start, end) | 
|  | suite.instances = OrderedDict(sliced_instances) | 
|  |  | 
|  | if options.save_tests: | 
|  | suite.csv_report(options.save_tests) | 
|  | return | 
|  |  | 
|  | logger.info("%d test configurations selected, %d configurations discarded due to filters." % | 
|  | (len(suite.instances), len(discards))) | 
|  |  | 
|  | if options.device_testing: | 
|  | print("\nDevice testing on:") | 
|  | hwm.dump(suite.connected_hardware, suite.selected_platforms) | 
|  | print("") | 
|  |  | 
|  | if options.dry_run: | 
|  | duration = time.time() - start_time | 
|  | logger.info("Completed in %d seconds" % (duration)) | 
|  | return | 
|  |  | 
|  | retries = options.retry_failed + 1 | 
|  | completed = 0 | 
|  |  | 
|  | suite.update() | 
|  | suite.start_time = start_time | 
|  |  | 
|  | while True: | 
|  | completed += 1 | 
|  |  | 
|  | if completed > 1: | 
|  | logger.info("%d Iteration:" % (completed)) | 
|  | time.sleep(60)  # waiting for the system to settle down | 
|  | suite.total_done = suite.total_tests - suite.total_failed | 
|  | suite.total_failed = 0 | 
|  |  | 
|  | suite.execute() | 
|  | print("") | 
|  |  | 
|  | retries = retries - 1 | 
|  | if retries == 0 or suite.total_failed == 0: | 
|  | break | 
|  |  | 
|  | suite.misc_reports(options.compare_report, options.show_footprint, | 
|  | options.all_deltas, options.footprint_threshold, options.last_metrics) | 
|  |  | 
|  | suite.duration = time.time() - start_time | 
|  | suite.summary(options.disable_unrecognized_section_test) | 
|  |  | 
|  | if options.coverage: | 
|  | if options.gcov_tool is None: | 
|  | use_system_gcov = False | 
|  |  | 
|  | for plat in options.coverage_platform: | 
|  | ts_plat = suite.get_platform(plat) | 
|  | if ts_plat and (ts_plat.type in {"native", "unit"}): | 
|  | use_system_gcov = True | 
|  |  | 
|  | if use_system_gcov or "ZEPHYR_SDK_INSTALL_DIR" not in os.environ: | 
|  | options.gcov_tool = "gcov" | 
|  | else: | 
|  | options.gcov_tool = os.path.join(os.environ["ZEPHYR_SDK_INSTALL_DIR"], | 
|  | "i586-zephyr-elf/bin/i586-zephyr-elf-gcov") | 
|  |  | 
|  | logger.info("Generating coverage files...") | 
|  | coverage_tool = CoverageTool.factory(options.coverage_tool) | 
|  | coverage_tool.add_ignore_file('generated') | 
|  | coverage_tool.add_ignore_directory('tests') | 
|  | coverage_tool.add_ignore_directory('samples') | 
|  | coverage_tool.generate(options.outdir) | 
|  |  | 
|  | if options.device_testing: | 
|  | print("\nHardware distribution summary:\n") | 
|  | table = [] | 
|  | header = ['Board', 'ID', 'Counter'] | 
|  | for p in hwm.connected_hardware: | 
|  | if p['connected'] and p['platform'] in suite.selected_platforms: | 
|  | row = [p['platform'], p.get('id', None), p['counter']] | 
|  | table.append(row) | 
|  | print(tabulate(table, headers=header, tablefmt="github")) | 
|  |  | 
|  | suite.save_reports(options.report_name, | 
|  | options.report_dir, | 
|  | options.no_update, | 
|  | options.release, | 
|  | options.only_failed) | 
|  |  | 
|  | if suite.total_failed or (suite.warnings and options.warnings_as_errors): | 
|  | sys.exit(1) | 
|  |  | 
|  |  | 
|  | if __name__ == "__main__": | 
|  | try: | 
|  | main() | 
|  | finally: | 
|  | if os.isatty(1): # stdout is interactive | 
|  | os.system("stty sane") |