| #!/usr/bin/env python3 |
| # vim: set syntax=python ts=4 : |
| # SPDX-License-Identifier: Apache-2.0 |
| """Zephyr Sanity Tests |
| |
| Also check the "User and Developer Guides" at https://docs.zephyrproject.org/ |
| |
| This script scans for the set of unit test applications in the git |
| repository and attempts to execute them. By default, it tries to |
| build each test case on one platform per architecture, using a precedence |
| list defined in an architecture configuration file, and if possible |
| run the tests in any available emulators or simulators on the system. |
| |
| Test cases are detected by the presence of a 'testcase.yaml' or a 'sample.yaml' |
| file in the application's project directory. This file may contain one or more |
| blocks, each identifying a test scenario. The title of the block is a name for |
| the test case, which only needs to be unique for the test cases specified in |
| that testcase meta-data. The full canonical name for each test case is <path to |
| test case>/<block>. |
| |
| Each test block in the testcase meta data can define the following key/value |
| pairs: |
| |
| tags: <list of tags> (required) |
| A set of string tags for the testcase. Usually pertains to |
| functional domains but can be anything. Command line invocations |
| of this script can filter the set of tests to run based on tag. |
| |
| skip: <True|False> (default False) |
| skip testcase unconditionally. This can be used for broken tests. |
| |
| slow: <True|False> (default False) |
| Don't build or run this test case unless --enable-slow was passed |
| in on the command line. Intended for time-consuming test cases |
| that are only run under certain circumstances, like daily |
| builds. |
| |
| extra_args: <list of extra arguments> |
| Extra cache entries to pass to CMake when building or running the |
| test case. |
| |
| extra_configs: <list of extra configurations> |
| Extra configuration options to be merged with a master prj.conf |
| when building or running the test case. |
| |
| build_only: <True|False> (default False) |
| If true, don't try to run the test even if the selected platform |
| supports it. |
| |
| build_on_all: <True|False> (default False) |
| If true, attempt to build test on all available platforms. |
| |
| depends_on: <list of features> |
| A board or platform can announce what features it supports; this option |
| will enable the test only on those platforms that provide this feature. |
| |
| min_ram: <integer> |
| minimum amount of RAM needed for this test to build and run. This is |
| compared with information provided by the board metadata. |
| |
| min_flash: <integer> |
| minimum amount of ROM needed for this test to build and run. This is |
| compared with information provided by the board metadata. |
| |
| timeout: <number of seconds> |
| Length of time to run test in emulator before automatically killing it. |
| Default to 60 seconds. |
| |
| arch_whitelist: <list of arches, such as x86, arm, arc> |
| Set of architectures that this test case should only be run for. |
| |
| arch_exclude: <list of arches, such as x86, arm, arc> |
| Set of architectures that this test case should not run on. |
| |
| platform_whitelist: <list of platforms> |
| Set of platforms that this test case should only be run for. |
| |
| platform_exclude: <list of platforms> |
| Set of platforms that this test case should not run on. |
| |
| extra_sections: <list of extra binary sections> |
| When computing sizes, sanitycheck will report errors if it finds |
| extra, unexpected sections in the Zephyr binary unless they are named |
| here. They will not be included in the size calculation. |
| |
| filter: <expression> |
| Filter whether the testcase should be run by evaluating an expression |
| against an environment containing the following values: |
| |
| { ARCH : <architecture>, |
| PLATFORM : <platform>, |
| <all CONFIG_* key/value pairs in the test's generated defconfig>, |
| <all DT_* key/value pairs in the test's generated device tree file>, |
| <all CMake key/value pairs in the test's generated CMakeCache.txt file>, |
| *<env>: any environment variable available |
| } |
| |
| The grammar for the expression language is as follows: |
| |
| expression ::= expression "and" expression |
| | expression "or" expression |
| | "not" expression |
| | "(" expression ")" |
| | symbol "==" constant |
| | symbol "!=" constant |
| | symbol "<" number |
| | symbol ">" number |
| | symbol ">=" number |
| | symbol "<=" number |
| | symbol "in" list |
| | symbol ":" string |
| | symbol |
| |
| list ::= "[" list_contents "]" |
| |
| list_contents ::= constant |
| | list_contents "," constant |
| |
| constant ::= number |
| | string |
| |
| |
| For the case where expression ::= symbol, it evaluates to true |
| if the symbol is defined to a non-empty string. |
| |
| Operator precedence, starting from lowest to highest: |
| |
| or (left associative) |
| and (left associative) |
| not (right associative) |
| all comparison operators (non-associative) |
| |
| arch_whitelist, arch_exclude, platform_whitelist, platform_exclude |
| are all syntactic sugar for these expressions. For instance |
| |
| arch_exclude = x86 arc |
| |
| Is the same as: |
| |
| filter = not ARCH in ["x86", "arc"] |
| |
| The ':' operator compiles the string argument as a regular expression, |
| and then returns a true value only if the symbol's value in the environment |
| matches. For example, if CONFIG_SOC="stm32f107xc" then |
| |
| filter = CONFIG_SOC : "stm.*" |
| |
| Would match it. |
| |
| The set of test cases that actually run depends on directives in the testcase |
| file and options passed in on the command line. If there is any confusion, |
| running with -v or examining the discard report (sanitycheck_discard.csv) |
| can help show why particular test cases were skipped. |
| |
| Metrics (such as pass/fail state and binary size) for the last code |
| release are stored in scripts/sanity_chk/sanity_last_release.csv. |
| To update this, pass the --all --release options. |
| |
| To load arguments from a file, write '+' before the file name, e.g., |
| +file_name. File content must be one or more valid arguments separated by |
| line break instead of white spaces. |
| |
| Most everyday users will run with no arguments. |
| |
| """ |
| |
| import os |
| import contextlib |
| import string |
| import mmap |
| import argparse |
| import sys |
| import re |
| import subprocess |
| import multiprocessing |
| import select |
| import shutil |
| import shlex |
| import signal |
| import threading |
| import concurrent.futures |
| from threading import BoundedSemaphore |
| import queue |
| import time |
| import csv |
| import yaml |
| import glob |
| import serial |
| import concurrent |
| import xml.etree.ElementTree as ET |
| import logging |
| from colorama import Fore |
| from collections import OrderedDict |
| from itertools import islice |
| from pathlib import Path |
| from distutils.spawn import find_executable |
| |
# anytree is optional; a missing module only produces a hint on stdout.
try:
    from anytree import Node, RenderTree, find
except ImportError:
    print("Install the anytree module to use the --test-tree option")

# tabulate is optional; a missing module only produces a hint on stdout.
try:
    from tabulate import tabulate
except ImportError:
    print("Install tabulate python module with pip to use --device-testing option.")

# The script cannot locate any of its helpers without ZEPHYR_BASE, so bail
# out immediately when the environment variable is missing or empty.
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
if not ZEPHYR_BASE:
    sys.exit("$ZEPHYR_BASE environment variable undefined")

# edtlib is imported from $ZEPHYR_BASE/scripts/dts, so that directory has to
# be on sys.path before the import statement runs.
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts"))
import edtlib

# Module-level locks shared between threads.
hw_map_local = threading.Lock()
report_lock = threading.Lock()

# Use this for internal comparisons; that's what canonicalization is
# for. Don't use it when invoking other components of the build system
# to avoid confusing and hard to trace inconsistencies in error messages
# and logs, generated Makefiles, etc. compared to when users invoke these
# components directly.
# Note "normalization" is different from canonicalization, see os.path.
canonical_zephyr_base = os.path.realpath(ZEPHYR_BASE)

# The sanity_chk package is imported from $ZEPHYR_BASE/scripts.
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))

from sanity_chk import scl
from sanity_chk import expr_parser

VERBOSE = 0

# CSV file holding the metrics of the last release (see the module docstring).
RELEASE_DATA = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk",
                            "sanity_last_release.csv")


# The logger itself accepts everything from DEBUG upwards.
logger = logging.getLogger('sanitycheck')
logger.setLevel(logging.DEBUG)
| |
| |
class CMakeCacheEntry:
    '''Represents a CMake cache entry.

    This class understands the type system in a CMakeCache.txt, and
    converts the following cache types to Python types:

    Cache Type    Python type
    ----------    -------------------------------------------
    FILEPATH      str
    PATH          str
    STRING        str OR list of str (if ';' is in the value)
    BOOL          bool
    INTERNAL      str OR list of str (if ';' is in the value)
    ----------    -------------------------------------------
    '''

    # Regular expression for a cache entry.
    #
    # CMake variable names can include escape characters, allowing a
    # wider set of names than is easy to match with a regular
    # expression. To be permissive here, use a non-greedy match up to
    # the first colon (':'). This breaks if the variable name has a
    # colon inside, but it's good enough.
    CACHE_ENTRY = re.compile(
        r'''(?P<name>.*?)                               # name
         :(?P<type>FILEPATH|PATH|STRING|BOOL|INTERNAL)  # type
         =(?P<value>.*)                                 # value
        ''', re.X)

    @classmethod
    def _to_bool(cls, val):
        '''Convert a CMake BOOL string into a Python bool.

        "True if the constant is 1, ON, YES, TRUE, Y, or a
        non-zero number. False if the constant is 0, OFF, NO,
        FALSE, N, IGNORE, NOTFOUND, the empty string, or ends in
        the suffix -NOTFOUND. Named boolean constants are
        case-insensitive. If the argument is not one of these
        constants, it is treated as a variable."

        https://cmake.org/cmake/help/v3.0/command/if.html

        @param val String value of a BOOL cache entry
        @return True or False (always a real bool, never an int)
        @raises ValueError if val is neither a named constant nor an integer
        '''
        val = val.upper()
        if val in ('ON', 'YES', 'TRUE', 'Y'):
            return True
        if val in ('OFF', 'NO', 'FALSE', 'N', 'IGNORE', 'NOTFOUND', ''):
            return False
        if val.endswith('-NOTFOUND'):
            return False
        try:
            return int(val) != 0
        except ValueError as exc:
            raise ValueError('invalid bool {}'.format(val)) from exc

    @classmethod
    def from_line(cls, line, line_no):
        '''Parse one line of a CMakeCache.txt file.

        @param line Text of the line
        @param line_no Line number, only used in error messages
        @return A CMakeCacheEntry, or None when the line is a comment,
                blank, or does not look like a cache entry
        @raises ValueError if a BOOL entry has an invalid value
        '''
        # Comments can only occur at the beginning of a line.
        # (The value of an entry could contain a comment character).
        if line.startswith(('//', '#')):
            return None

        # Whitespace-only lines do not contain cache entries.
        if not line.strip():
            return None

        m = cls.CACHE_ENTRY.match(line)
        if not m:
            return None

        name, type_, value = (m.group(g) for g in ('name', 'type', 'value'))
        if type_ == 'BOOL':
            try:
                value = cls._to_bool(value)
            except ValueError as exc:
                args = exc.args + ('on line {}: {}'.format(line_no, line),)
                raise ValueError(args) from exc
        elif type_ in ('STRING', 'INTERNAL'):
            # If the value is a CMake list (i.e. is a string which
            # contains a ';'), convert to a Python list.
            if ';' in value:
                value = value.split(';')

        return CMakeCacheEntry(name, value)

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __str__(self):
        fmt = 'CMakeCacheEntry(name={}, value={})'
        return fmt.format(self.name, self.value)
| |
| |
class CMakeCache:
    '''Parses and represents a CMake cache file.

    Entries are kept in file order, keyed by name. Item access
    (cache[name]) returns the entry's value; iteration yields the
    CMakeCacheEntry objects themselves.
    '''

    @staticmethod
    def from_file(cache_file):
        '''Build a CMakeCache from the path of a CMakeCache.txt file.'''
        return CMakeCache(cache_file)

    def __init__(self, cache_file):
        self.cache_file = cache_file
        self.load(cache_file)

    def load(self, cache_file):
        '''(Re)load all entries from cache_file, replacing any held so far.

        @raises ValueError if a BOOL entry has an invalid value
        '''
        entries = []
        with open(cache_file, 'r') as cache:
            # Number lines from 1 so the "on line N" text of parse errors
            # matches what an editor shows.
            for line_no, line in enumerate(cache, 1):
                entry = CMakeCacheEntry.from_line(line, line_no)
                if entry:
                    entries.append(entry)
        self._entries = OrderedDict((e.name, e) for e in entries)

    def get(self, name, default=None):
        '''Return the value of entry name, or default if it is not present.'''
        entry = self._entries.get(name)
        if entry is None:
            return default
        return entry.value

    def get_list(self, name, default=None):
        '''Return the value of entry name as a list.

        A str value becomes a one-element list (empty list for ''); a
        list value is returned as is.

        @param default Returned when the entry is missing (defaults to [])
        @raises RuntimeError if the value is neither a str nor a list
        '''
        if default is None:
            default = []
        entry = self._entries.get(name)
        if entry is None:
            return default

        value = entry.value
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            return [value] if value else []
        msg = 'invalid value {} type {}'
        raise RuntimeError(msg.format(value, type(value)))

    def __contains__(self, name):
        return name in self._entries

    def __getitem__(self, name):
        return self._entries[name].value

    def __setitem__(self, name, entry):
        if not isinstance(entry, CMakeCacheEntry):
            msg = 'improper type {} for value {}, expecting CMakeCacheEntry'
            raise TypeError(msg.format(type(entry), entry))
        self._entries[name] = entry

    def __delitem__(self, name):
        del self._entries[name]

    def __iter__(self):
        return iter(self._entries.values())
| |
| |
class SanityCheckException(Exception):
    """Root of the exception hierarchy used by this script."""


class SanityRuntimeError(SanityCheckException):
    """SanityCheckException subclass for runtime errors."""


class ConfigurationError(SanityCheckException):
    """SanityCheckException whose message is prefixed with a file name."""

    def __init__(self, cfile, message):
        # The resulting message has the form "<cfile>: <message>".
        super().__init__(": ".join((cfile, message)))


class BuildError(SanityCheckException):
    """SanityCheckException subclass for build errors."""


class ExecutionError(SanityCheckException):
    """SanityCheckException subclass for execution errors."""
| |
| |
class HarnessImporter:
    """Imports the 'harness' module and instantiates one of its classes.

    The created object is available as the 'instance' attribute.
    """

    def __init__(self, name):
        """Constructor

        @param name Name of the class to instantiate from the harness
            module; a false value selects the "Test" class
        """
        # The harness module lives in $ZEPHYR_BASE/scripts/sanity_chk, so
        # that directory has to be importable first.
        harness_dir = os.path.join(ZEPHYR_BASE, "scripts/sanity_chk")
        sys.path.insert(0, harness_dir)
        harness_module = __import__("harness")

        class_name = name if name else "Test"
        harness_class = getattr(harness_module, class_name)
        self.instance = harness_class()
| |
| |
class Handler:
    """Base class of the objects that run a built test instance.

    Holds the state shared by all handlers: the (state, duration) pair,
    which is guarded by a lock, and paths derived from the test instance.
    """

    def __init__(self, instance, type_str="build"):
        """Constructor

        @param instance Test instance; its name, build_dir and
            testcase.timeout / testcase.source_dir attributes are read
        @param type_str Label stored as self.type_str
        """
        self.lock = threading.Lock()

        self.state = "waiting"
        self.run = False
        self.duration = 0
        self.type_str = type_str

        self.binary = None
        self.pid_fn = None
        self.call_make_run = False

        self.name = instance.name
        self.instance = instance
        self.timeout = instance.testcase.timeout
        self.sourcedir = instance.testcase.source_dir
        self.build_dir = instance.build_dir
        self.log = os.path.join(self.build_dir, "handler.log")
        self.returncode = 0
        self.set_state("running", self.duration)

        self.args = []

    def set_state(self, state, duration):
        """Store state and duration together while holding the lock."""
        with self.lock:
            self.state = state
            self.duration = duration

    def get_state(self):
        """Return the current (state, duration) tuple, read under the lock."""
        with self.lock:
            return (self.state, self.duration)

    def record(self, harness):
        """Append the harness recording to <build_dir>/recording.csv.

        Nothing is written when harness.recording is empty. Otherwise a
        header row (harness.fieldnames) followed by one row per recorded
        item is appended.
        """
        if not harness.recording:
            return

        filename = os.path.join(self.build_dir, "recording.csv")
        # newline="" leaves line endings to the csv module, as its
        # documentation requires; otherwise text mode would translate the
        # explicit os.linesep terminator a second time on some platforms.
        with open(filename, "at", newline="") as csvfile:
            # The second positional parameter of csv.writer is the
            # dialect, so the field names must not be passed there.
            cw = csv.writer(csvfile, lineterminator=os.linesep)
            cw.writerow(harness.fieldnames)
            for instance in harness.recording:
                cw.writerow(instance)
| |
| |
class BinaryHandler(Handler):
    """Handler that spawns the test as a local process and feeds its
    standard output, line by line, to the test harness.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Label passed on to Handler
        """
        super().__init__(instance, type_str)

        self.terminated = False

        # Tool options
        self.valgrind = False
        self.lsan = False
        self.asan = False
        self.coverage = False

    def try_kill_process_by_pid(self):
        """Send SIGTERM to the process whose pid is stored in self.pid_fn.

        Does nothing when self.pid_fn is not set. The pid file is removed
        and self.pid_fn cleared before the signal is sent; a process that
        no longer exists is silently ignored.
        """
        if not self.pid_fn:
            return

        with open(self.pid_fn) as pid_file:
            pid = int(pid_file.read())
        os.unlink(self.pid_fn)
        self.pid_fn = None  # clear so we don't try to kill the binary twice
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            pass

    def terminate(self, proc):
        """Terminate proc and remember that we did so in self.terminated."""
        # encapsulate terminate functionality so we do it consistently where ever
        # we might want to terminate the proc. We need try_kill_process_by_pid
        # because of both how newer ninja (1.6.0 or greater) and .NET / renode
        # work. Newer ninja's don't seem to pass SIGTERM down to the children
        # so we need to use try_kill_process_by_pid.
        self.try_kill_process_by_pid()
        proc.terminate()
        self.terminated = True

    def _output_reader(self, proc, harness):
        """Copy proc's stdout to the handler log and to the harness.

        Stops at end of output, or as soon as the harness reports a state.
        """
        with open(self.log, "wt") as log_out_fp:
            for raw_line in iter(proc.stdout.readline, b''):
                line = raw_line.decode('utf-8')
                logger.debug("OUTPUT: {0}".format(line.rstrip()))
                log_out_fp.write(line)
                log_out_fp.flush()
                harness.handle(line.rstrip())
                if harness.state:
                    try:
                        # POSIX arch based ztests end on their own,
                        # so let's give it up to 100ms to do so
                        proc.wait(0.1)
                    except subprocess.TimeoutExpired:
                        self.terminate(proc)
                    break

    def handle(self):
        """Run the test and record the outcome.

        Sets the handler state to "failed", "timeout" or the state
        reported by the harness, and stores the harness results on the
        test instance.
        """
        harness_name = self.instance.testcase.harness.capitalize()
        harness_import = HarnessImporter(harness_name)
        harness = harness_import.instance
        harness.configure(self.instance)

        if self.call_make_run:
            command = [get_generator()[0], "run"]
        else:
            command = [self.binary]

        run_valgrind = False
        if self.valgrind and shutil.which("valgrind"):
            command = ["valgrind", "--error-exitcode=2",
                       "--leak-check=full",
                       "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
                       "--log-file=" + self.build_dir + "/valgrind.log"
                       ] + command
            run_valgrind = True

        logger.debug("Spawning process: " +
                     " ".join(shlex.quote(word) for word in command) + os.linesep +
                     "in directory: " + self.build_dir)

        start_time = time.time()

        env = os.environ.copy()
        if self.asan:
            # Join the pieces with ':' so that a pre-existing ASAN_OPTIONS
            # value does not run straight into "detect_leaks=0".
            asan_options = ["log_path=stdout"]
            inherited = env.get("ASAN_OPTIONS", "")
            if inherited:
                asan_options.append(inherited)
            if not self.lsan:
                asan_options.append("detect_leaks=0")
            env["ASAN_OPTIONS"] = ":".join(asan_options)

        # NOTE(review): stderr is piped but never read; a child that
        # writes a lot to stderr could block on a full pipe.
        with subprocess.Popen(command, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc:
            logger.debug("Spawning BinaryHandler Thread for %s" % self.name)
            t = threading.Thread(target=self._output_reader, args=(proc, harness,), daemon=True)
            t.start()
            t.join(self.timeout)
            if t.is_alive():
                self.terminate(proc)
                t.join()
            proc.wait()
            self.returncode = proc.returncode

        handler_time = time.time() - start_time

        if self.coverage:
            # NOTE(review): with shell=True and a list, only the first item
            # is the shell command; the rest become arguments of the shell
            # itself. This therefore only runs a variable assignment and
            # gcov is never started - confirm the intent before changing.
            subprocess.call(["GCOV_PREFIX=" + self.build_dir,
                             "gcov", self.sourcedir, "-b", "-s", self.build_dir], shell=True)

        self.try_kill_process_by_pid()

        # FIXME: This is needed when killing the simulator, the console is
        # garbled and needs to be reset. Did not find a better way to do that.

        subprocess.call(["stty", "sane"])
        self.instance.results = harness.tests

        if not self.terminated and self.returncode != 0:
            # When a process is killed, the default handler returns 128 + SIGTERM
            # so in that case the return code itself is not meaningful
            self.set_state("failed", handler_time)
            self.instance.reason = "Failed"
        elif run_valgrind and self.returncode == 2:
            self.set_state("failed", handler_time)
            self.instance.reason = "Valgrind error"
        elif harness.state:
            self.set_state(harness.state, handler_time)
        else:
            self.set_state("timeout", handler_time)
            self.instance.reason = "Timeout"

        self.record(harness)
| |
| |
class DeviceHandler(Handler):
    """Handler that flashes the test onto connected hardware and monitors
    the board's serial port for the test output.

    self.suite must be set before handle() is called; its
    connected_hardware list describes the available boards.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        @param type_str Label passed on to Handler
        """
        super().__init__(instance, type_str)

        self.suite = None

    def monitor_serial(self, ser, halt_fileno, harness):
        """Feed lines read from the serial port to the log and the harness.

        Returns when the harness reports a state, when halt_fileno becomes
        readable, or when the serial port fails or is closed.

        @param ser Open serial port object
        @param halt_fileno File descriptor that becomes readable to
            request a stop
        @param harness Harness that consumes the output
        """
        with open(self.log, "wt") as log_out_fp:
            ser_fileno = ser.fileno()
            readlist = [halt_fileno, ser_fileno]

            while ser.isOpen():
                readable, _, _ = select.select(readlist, [], [], self.timeout)

                if halt_fileno in readable:
                    logger.debug('halted')
                    ser.close()
                    break
                if ser_fileno not in readable:
                    continue  # Timeout.

                serial_line = None
                try:
                    serial_line = ser.readline()
                except TypeError:
                    pass
                except serial.SerialException:
                    ser.close()
                    break

                # Just because ser_fileno has data doesn't mean an entire line
                # is available yet.
                if serial_line:
                    sl = serial_line.decode('utf-8', 'ignore')
                    logger.debug("DEVICE: {0}".format(sl.rstrip()))

                    log_out_fp.write(sl)
                    log_out_fp.flush()
                    harness.handle(sl.rstrip())

                if harness.state:
                    ser.close()
                    break

    def device_is_available(self, device):
        """Return True if a free board with a serial port exists for the
        platform name given in device.
        """
        for i in self.suite.connected_hardware:
            if i['platform'] == device and i['available'] and i['serial']:
                return True

        return False

    def get_available_device(self, device):
        """Reserve a free board for platform name device.

        The lookup and the reservation happen under hw_map_local, so two
        threads can never be handed the same board.

        @return the hardware entry (marked unavailable, with its counter
            incremented), or None if no board is free
        """
        with hw_map_local:
            for i in self.suite.connected_hardware:
                if i['platform'] == device and i['available'] and i['serial']:
                    i['available'] = False
                    i['counter'] += 1
                    return i

        return None

    def make_device_available(self, serial):
        """Mark every board using serial port 'serial' as available again."""
        with hw_map_local:
            for i in self.suite.connected_hardware:
                if i['serial'] == serial:
                    i['available'] = True

    def handle(self):
        """Flash the test, monitor the serial output and record the outcome."""
        out_state = "failed"

        # A bare --west-flash yields [] (see below), which is falsy, so it
        # has to be accepted explicitly.
        if options.west_flash or options.west_flash == []:
            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
            if options.west_runner:
                command.append("--runner")
                command.append(options.west_runner)
            # There are three ways this option is used.
            # 1) bare: --west-flash
            #    This results in options.west_flash == []
            # 2) with a value: --west-flash="--board-id=42"
            #    This results in options.west_flash == "--board-id=42"
            # 3) Multiple values: --west-flash="--board-id=42,--erase"
            #    This results in options.west_flash == "--board-id=42,--erase",
            #    which is split on ',' into separate arguments below
            if options.west_flash != []:
                command.append('--')
                command.extend(options.west_flash.split(','))
        else:
            command = [get_generator()[0], "-C", self.build_dir, "flash"]

        # Reserving is atomic; retry until a board is handed to us instead
        # of checking first and reserving later, which lets two threads
        # race for the same board.
        platform_name = self.instance.platform.name
        hardware = self.get_available_device(platform_name)
        while hardware is None:
            time.sleep(1)
            hardware = self.get_available_device(platform_name)

        runner = hardware.get('runner', None)
        if runner:
            # A runner in the hardware entry replaces the command built above.
            board_id = hardware.get("probe_id", hardware.get("id", None))
            product = hardware.get("product", None)
            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
            command.append("--runner")
            command.append(runner)
            if runner == "pyocd":
                command.append("--board-id")
                command.append(board_id)
            elif runner == "nrfjprog":
                command.append('--')
                command.append("--snr")
                command.append(board_id)
            elif runner == "openocd" and product == "STM32 STLink":
                command.append('--')
                command.append("--cmd-pre-init")
                command.append("hla_serial %s" % (board_id))
            elif runner == "openocd" and product == "EDBG CMSIS-DAP":
                command.append('--')
                command.append("--cmd-pre-init")
                command.append("cmsis_dap_serial %s" % (board_id))
            elif runner == "jlink":
                command.append("--tool-opt=-SelectEmuBySN %s" % (board_id))

        serial_device = hardware['serial']

        try:
            ser = serial.Serial(
                serial_device,
                baudrate=115200,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                timeout=self.timeout
            )
        except serial.SerialException as e:
            self.set_state("failed", 0)
            self.instance.reason = "Failed"
            logger.error("Serial device error: %s" % (str(e)))
            self.make_device_available(serial_device)
            return

        ser.flush()

        harness_name = self.instance.testcase.harness.capitalize()
        harness_import = HarnessImporter(harness_name)
        harness = harness_import.instance
        harness.configure(self.instance)
        # Writing to write_pipe makes read_pipe readable, which tells the
        # monitor thread to stop.
        read_pipe, write_pipe = os.pipe()
        start_time = time.time()

        t = threading.Thread(target=self.monitor_serial, daemon=True,
                             args=(ser, read_pipe, harness))
        t.start()

        d_log = "{}/device.log".format(self.instance.build_dir)
        logger.debug('Flash command: %s', command)
        try:
            stdout = stderr = None
            with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
                try:
                    (stdout, stderr) = proc.communicate(timeout=30)
                    logger.debug(stdout.decode())

                    if proc.returncode != 0:
                        self.instance.reason = "Device issue (Flash?)"
                        # Nothing useful can arrive on the serial port after
                        # a failed flash, so stop the monitor right away
                        # instead of waiting for the test timeout.
                        os.write(write_pipe, b'x')  # halt the thread
                except subprocess.TimeoutExpired:
                    proc.kill()
                    (stdout, stderr) = proc.communicate()
                    self.instance.reason = "Device issue (Timeout)"

            with open(d_log, "w") as dlog_fp:
                dlog_fp.write(stderr.decode())

        except subprocess.CalledProcessError:
            os.write(write_pipe, b'x')  # halt the thread

        t.join(self.timeout)
        if t.is_alive():
            out_state = "timeout"

        if ser.isOpen():
            ser.close()

        # Release the pipe; otherwise every run leaks two file descriptors.
        os.close(write_pipe)
        os.close(read_pipe)

        handler_time = time.time() - start_time

        if out_state == "timeout":
            for c in self.instance.testcase.cases:
                if c not in harness.tests:
                    harness.tests[c] = "BLOCK"

            self.instance.reason = "Timeout"

        self.instance.results = harness.tests

        if harness.state:
            self.set_state(harness.state, handler_time)
            if harness.state == "failed":
                self.instance.reason = "Failed"
        else:
            self.set_state(out_state, handler_time)

        self.make_device_available(serial_device)

        self.record(harness)
| |
| |
class QEMUHandler(Handler):
    """Spawns a thread to monitor QEMU output from pipes

    We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
    We need to do this as once qemu starts, it runs forever until killed.
    Test cases emit special messages to the console as they run, we check
    for these to collect whether the test passed or failed.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test instance
        @param type_str Label passed on to Handler
        """

        super().__init__(instance, type_str)
        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")

        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")

    @staticmethod
    def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results, harness):
        """Monitor thread: read QEMU's console output from a FIFO.

        Each complete line is written to logfile and passed to the
        harness. When monitoring ends the handler state is set, the
        process named in pid_fn (if any) is sent SIGTERM and the FIFOs are
        removed.

        @param handler The QEMUHandler that owns this thread
        @param timeout Seconds to wait for output before giving up
        @param outdir Unused
        @param logfile Path of the log file to write
        @param fifo_fn Common prefix of the ".in" / ".out" FIFO paths
        @param pid_fn Path of the file holding QEMU's pid
        @param results Unused
        @param harness Harness that consumes the output
        """
        fifo_in = fifo_fn + ".in"
        fifo_out = fifo_fn + ".out"

        # These in/out nodes are named from QEMU's perspective, not ours
        for fifo in (fifo_in, fifo_out):
            if os.path.exists(fifo):
                os.unlink(fifo)
            os.mkfifo(fifo)

        # We don't do anything with out_fp but we need to open it for
        # writing so that QEMU doesn't block, due to the way pipes work.
        # in_fp has internal buffering disabled, we don't want read() or
        # poll() to ever block if there is data in there.
        with open(fifo_in, "wb") as out_fp, \
                open(fifo_out, "rb", buffering=0) as in_fp, \
                open(logfile, "wt") as log_out_fp:

            start_time = time.time()
            timeout_time = start_time + timeout
            p = select.poll()
            p.register(in_fp, select.POLLIN)
            out_state = None

            line = ""
            timeout_extended = False
            while True:
                this_timeout = int((timeout_time - time.time()) * 1000)
                if this_timeout < 0 or not p.poll(this_timeout):
                    if not out_state:
                        out_state = "timeout"
                    break

                try:
                    c = in_fp.read(1).decode("utf-8")
                except UnicodeDecodeError:
                    # Test is writing something weird, fail
                    out_state = "unexpected byte"
                    break

                if c == "":
                    # EOF, this shouldn't happen unless QEMU crashes
                    out_state = "unexpected eof"
                    break
                line = line + c
                if c != "\n":
                    continue

                # line contains a full line of data output from QEMU
                log_out_fp.write(line)
                log_out_fp.flush()
                line = line.strip()
                logger.debug("QEMU: %s" % line)

                harness.handle(line)
                if harness.state:
                    # if we have registered a fail make sure the state is not
                    # overridden by a false success message coming from the
                    # testsuite
                    if out_state != 'failed':
                        out_state = harness.state

                    # if we get some state, that means test is doing well, we reset
                    # the timeout and wait for 2 more seconds to catch anything
                    # printed late. We wait much longer if code
                    # coverage is enabled since dumping this information can
                    # take some time.
                    if not timeout_extended or harness.capture_coverage:
                        timeout_extended = True
                        if harness.capture_coverage:
                            timeout_time = time.time() + 30
                        else:
                            timeout_time = time.time() + 2
                line = ""

            handler.record(harness)

            handler_time = time.time() - start_time
            logger.debug("QEMU complete (%s) after %f seconds" %
                         (out_state, handler_time))
            handler.set_state(out_state, handler_time)
            if out_state == "timeout":
                handler.instance.reason = "Timeout"
            elif out_state == "failed":
                handler.instance.reason = "Failed"

        # pid stays 0 when QEMU never wrote its pid file; without this
        # default the check below raised UnboundLocalError and the FIFOs
        # were left behind.
        pid = 0
        if os.path.exists(pid_fn):
            with open(pid_fn) as pid_file:
                pid = int(pid_file.read())
            os.unlink(pid_fn)

        try:
            if pid:
                os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # Oh well, as long as it's dead! User probably sent Ctrl-C
            pass

        os.unlink(fifo_in)
        os.unlink(fifo_out)

    def handle(self):
        """Start the monitor thread, then run the 'run' build target.

        The handler state is set by the monitor thread; a non-zero exit
        code of the build tool additionally marks the test as failed.
        """
        self.results = {}
        self.run = True

        # We pass this to QEMU which looks for fifos with .in and .out
        # suffixes.
        self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")

        self.pid_fn = os.path.join(self.instance.build_dir, "qemu.pid")
        if os.path.exists(self.pid_fn):
            os.unlink(self.pid_fn)

        self.log_fn = self.log

        harness_import = HarnessImporter(self.instance.testcase.harness.capitalize())
        harness = harness_import.instance
        harness.configure(self.instance)
        self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
                                       args=(self, self.timeout, self.build_dir,
                                             self.log_fn, self.fifo_fn,
                                             self.pid_fn, self.results, harness))

        self.instance.results = harness.tests
        self.thread.daemon = True
        logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
        self.thread.start()
        subprocess.call(["stty", "sane"])

        logger.debug("Running %s (%s)" % (self.name, self.type_str))
        command = [get_generator()[0]]
        command += ["-C", self.build_dir, "run"]

        with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.build_dir) as proc:
            logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
            # communicate() drains both pipes while waiting; a plain wait()
            # can deadlock once the child fills a pipe nobody reads.
            proc.communicate()
            self.returncode = proc.returncode

        if self.returncode != 0:
            self.set_state("failed", 0)
            self.instance.reason = "Exited with {}".format(self.returncode)

    def get_fifo(self):
        """Return the common prefix of the QEMU FIFO paths."""
        return self.fifo_fn
| |
| |
class SizeCalculator:
    """Estimate the RAM and ROM footprint of a built binary.

    The section table printed by ``objdump -h`` is matched against the
    section-name lists below; the size of every recognized section is added
    to the RAM total, the ROM total, or both.
    """

    # Counted towards RAM only
    alloc_sections = [
        "bss",
        "noinit",
        "app_bss",
        "app_noinit",
        "ccm_bss",
        "ccm_noinit"
    ]

    # Counted towards both RAM and ROM
    rw_sections = [
        "datas",
        "initlevel",
        "exceptions",
        "initshell",
        "_static_thread_area",
        "_k_timer_area",
        "_k_mem_slab_area",
        "_k_mem_pool_area",
        "sw_isr_table",
        "_k_sem_area",
        "_k_mutex_area",
        "app_shmem_regions",
        "_k_fifo_area",
        "_k_lifo_area",
        "_k_stack_area",
        "_k_msgq_area",
        "_k_mbox_area",
        "_k_pipe_area",
        "net_if",
        "net_if_dev",
        "net_stack",
        "net_l2_data",
        "_k_queue_area",
        "_net_buf_pool_area",
        "app_datas",
        "kobject_data",
        "mmu_tables",
        "app_pad",
        "priv_stacks",
        "ccm_data",
        "usb_descriptor",
        "usb_data", "usb_bos_desc",
        'log_backends_sections',
        'log_dynamic_sections',
        'log_const_sections',
        "app_smem",
        'shell_root_cmds_sections',
        "font_entry_sections",
        "priv_stacks_noinit",
        "_TEXT_SECTION_NAME_2",
        "_GCOV_BSS_SECTION_NAME",
        "gcov",
        "nocache"
    ]

    # These get copied into RAM only on non-XIP
    # NOTE: "sw_isr_table" is also listed in rw_sections, which is checked
    # first, so that section is always accounted as "rw".
    ro_sections = [
        "text",
        "ctors",
        "init_array",
        "reset",
        "object_access",
        "rodata",
        "devconfig",
        "net_l2",
        "vector",
        "sw_isr_table",
        "_settings_handlers_area",
        "_bt_channels_area",
        "_bt_br_channels_area",
        "_bt_services_area",
        "vectors",
        "net_socket_register",
        "net_ppp_proto"
    ]

    def __init__(self, filename, extra_sections):
        """Constructor

        Prints a message and exits the process with status 2 when the file
        is not an ELF binary or carries no symbol information.

        @param filename Path to the output binary
            The <filename> is parsed by objdump to determine section sizes
        @param extra_sections Section names that are reported as recognized
            even though they are in none of the lists above; their size is
            not added to any total
        """
        # Make sure this is an ELF binary
        with open(filename, "rb") as f:
            magic = f.read(4)

        if magic != b'\x7fELF':
            print("%s is not an ELF binary" % filename)
            sys.exit(2)

        # Search for CONFIG_XIP in the ELF's list of symbols. nm is invoked
        # with an argument list (no shell), so a path containing spaces or
        # shell metacharacters is passed through untouched.
        nm = subprocess.Popen(["nm", filename],
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        nm_out, nm_err = nm.communicate()

        nm_errors = nm_err.decode("utf-8", errors="replace").strip()
        if nm_errors.endswith("no symbols"):
            print("%s has no symbol information" % filename)
            sys.exit(2)

        # Same test the former awk filter performed: some symbol line
        # mentions CONFIG_XIP and has a third column.
        symbol_lines = nm_out.decode("utf-8", errors="replace").splitlines()
        self.is_xip = any("CONFIG_XIP" in line and len(line.split()) >= 3
                          for line in symbol_lines)

        self.filename = filename
        self.sections = []
        self.rom_size = 0
        self.ram_size = 0
        self.extra_sections = extra_sections

        self._calculate_sizes()

    def get_ram_size(self):
        """Get the amount of RAM the application will use up on the device

        @return amount of RAM, in bytes
        """
        return self.ram_size

    def get_rom_size(self):
        """Get the size of the data that this application uses on device's flash

        @return amount of ROM, in bytes
        """
        return self.rom_size

    def unrecognized_sections(self):
        """Get a list of sections inside the binary that weren't recognized

        @return list of unrecognized section names
        """
        return [v["name"] for v in self.sections if not v["recognized"]]

    def _calculate_sizes(self):
        """ Calculate RAM and ROM usage by section """
        objdump_output = subprocess.check_output(
            ["objdump", "-h", self.filename]).decode("utf-8").splitlines()
        self._account_sections(objdump_output)

    def _account_sections(self, objdump_lines):
        """Add every section row found in objdump_lines to the totals.

        @param objdump_lines Lines of ``objdump -h`` output
        """
        for line in objdump_lines:
            words = line.split()

            # A section row reads
            #   <idx> <name> <size> <vma> <lma> <file off> <algn>
            # Requiring a fully numeric index (and enough columns) also
            # rejects the "<file>: file format ..." header when the file
            # name happens to start with a digit.
            if len(words) < 5 or not words[0].isdigit():
                continue

            name = words[1]                  # Skip lines with section names
            if name[0] == '.':               # starting with '.'
                continue

            # TODO this doesn't actually reflect the size in flash or RAM as
            # it doesn't include linker-imposed padding between sections.
            # It is close though.
            size = int(words[2], 16)
            if size == 0:
                continue

            load_addr = int(words[4], 16)
            virt_addr = int(words[3], 16)

            # Add section to memory use totals (for both non-XIP and XIP scenarios)
            # Unrecognized section names are not included in the calculations.
            recognized = True
            if name in SizeCalculator.alloc_sections:
                self.ram_size += size
                stype = "alloc"
            elif name in SizeCalculator.rw_sections:
                self.ram_size += size
                self.rom_size += size
                stype = "rw"
            elif name in SizeCalculator.ro_sections:
                self.rom_size += size
                if not self.is_xip:
                    self.ram_size += size
                stype = "ro"
            else:
                stype = "unknown"
                if name not in self.extra_sections:
                    recognized = False

            self.sections.append({"name": name, "load_addr": load_addr,
                                  "size": size, "virt_addr": virt_addr,
                                  "type": stype, "recognized": recognized})
| |
| |
# Type names understood by SanityConfigParser._cast_value():
#
#   "list"        - List of strings
#   "list:<type>" - List of <type>
#   "set"         - Set of unordered, unique strings
#   "set:<type>"  - Set of <type>
#   "float"       - Floating point
#   "int"         - Integer
#   "bool"        - Boolean
#   "str"         - String
#   "map"         - Mapping, passed through unchanged

# XXX Be sure to update __doc__ if you change any of this!!

platform_valid_keys = {
    "supported_toolchains": {"type": "list", "default": []},
    "env": {"type": "list", "default": []},
}

testcase_valid_keys = {
    "tags": {"type": "set", "required": False},
    "type": {"type": "str", "default": "integration"},
    "extra_args": {"type": "list"},
    "extra_configs": {"type": "list"},
    "build_only": {"type": "bool", "default": False},
    "build_on_all": {"type": "bool", "default": False},
    "skip": {"type": "bool", "default": False},
    "slow": {"type": "bool", "default": False},
    "timeout": {"type": "int", "default": 60},
    "min_ram": {"type": "int", "default": 8},
    "depends_on": {"type": "set"},
    "min_flash": {"type": "int", "default": 32},
    "arch_whitelist": {"type": "set"},
    "arch_exclude": {"type": "set"},
    "extra_sections": {"type": "list", "default": []},
    "platform_exclude": {"type": "set"},
    "platform_whitelist": {"type": "set"},
    "toolchain_exclude": {"type": "set"},
    "toolchain_whitelist": {"type": "set"},
    "filter": {"type": "str"},
    "harness": {"type": "str"},
    "harness_config": {"type": "map", "default": {}},
}
| |
| |
class SanityConfigParser:
    """Class to read test case files with semantic checking
    """

    def __init__(self, filename, schema):
        """Instantiate a new SanityConfigParser object

        @param filename Source .yaml file to read
        @param schema Schema handed to scl.yaml_load_verify() by load()
        """
        self.data = {}
        self.schema = schema
        self.filename = filename
        self.tests = {}
        self.common = {}

    def load(self):
        """Load the file and cache its 'tests' and 'common' sections."""
        self.data = scl.yaml_load_verify(self.filename, self.schema)

        if 'tests' in self.data:
            self.tests = self.data['tests']
        if 'common' in self.data:
            self.common = self.data['common']

    def _cast_items(self, items, typestr, prefix_len):
        """Cast items to the element type named after "list:" / "set:".

        @param items Iterable of raw elements
        @param typestr Full type string, e.g. "list", "set:int"
        @param prefix_len Length of the container name ("list" -> 4)
        @return A list; elements are unchanged when no element type is given
        """
        if typestr[prefix_len:prefix_len + 1] == ":":
            subtype = typestr[prefix_len + 1:]
            return [self._cast_value(item, subtype) for item in items]
        return list(items)

    def _cast_value(self, value, typestr):
        """Convert a raw value to the type named by typestr.

        @param value Raw value; string values are stripped / split on
            whitespace where the target type calls for it
        @param typestr One of "str", "float", "int", "bool", "list",
            "list:<type>", "set", "set:<type>" or "map"
        @return The converted value
        @raise ValueError If a numeric conversion fails
        @raise ConfigurationError If typestr is unknown, or the value cannot
            be turned into the requested container
        """
        if typestr == "str":
            # Non-string values are passed through rather than tripping over
            # an unbound local as the previous implementation did.
            return value.strip() if isinstance(value, str) else value

        if typestr == "float":
            return float(value)

        if typestr == "int":
            return int(value)

        if typestr == "bool":
            return value

        if typestr.startswith("list"):
            if isinstance(value, list):
                # Already a sequence: returned as-is, elements are not cast
                return value
            if isinstance(value, str):
                return self._cast_items(value.split(), typestr, 4)

        elif typestr.startswith("set"):
            if isinstance(value, str):
                return set(self._cast_items(value.split(), typestr, 3))
            if isinstance(value, (list, tuple, set, frozenset)):
                return set(self._cast_items(value, typestr, 3))

        elif typestr.startswith("map"):
            return value

        else:
            raise ConfigurationError(
                self.filename, "unknown type '%s'" % typestr)

        raise ConfigurationError(
            self.filename,
            "cannot convert value '%s' to type '%s'" % (value, typestr))

    def get_test(self, name, valid_keys):
        """Get a dictionary representing the keys/values within a test

        @param name The test in the .yaml file to retrieve data from
        @param valid_keys A dictionary representing the intended semantics
            for this test. Each key in this dictionary is a key that could
            be specified, if a key is given in the .yaml file which isn't in
            here, it will generate an error. Each value in this dictionary
            is another dictionary containing metadata:

                "default" - Default value if not given
                "type" - Data type to convert the text value to. Simple types
                    supported are "str", "float", "int", "bool" which will get
                    converted to respective Python data types. "set" and "list"
                    may also be specified which will split the value by
                    whitespace (but keep the elements as strings). finally,
                    "list:<type>" and "set:<type>" may be given which will
                    perform a type conversion after splitting the value up.
                "required" - If true, raise an error if not defined. If false
                    and "default" isn't specified, a type conversion will be
                    done on an empty string
        @return A dictionary containing the test key-value pairs with
            type conversion and default values filled in per valid_keys
        @raise ConfigurationError On an unknown key, a missing required key
            or a value that cannot be converted
        """

        d = dict(self.common)

        for k, v in self.tests[name].items():
            if k not in valid_keys:
                raise ConfigurationError(
                    self.filename,
                    "Unknown config key '%s' in definition for '%s'" %
                    (k, name))

            if k in d:
                if isinstance(d[k], str):
                    # By default, we just concatenate string values of keys
                    # which appear both in "common" and per-test sections,
                    # but some keys are handled in adhoc way based on their
                    # semantics.
                    if k == "filter":
                        d[k] = "(%s) and (%s)" % (d[k], v)
                    else:
                        d[k] += " " + v
                # NOTE(review): a non-string value from "common" is kept and
                # the per-test value is dropped - confirm this is intended.
            else:
                d[k] = v

        for k, kinfo in valid_keys.items():
            if k not in d:
                if kinfo.get("required", False):
                    raise ConfigurationError(
                        self.filename,
                        "missing required value for '%s' in test '%s'" %
                        (k, name))

                if "default" in kinfo:
                    d[k] = kinfo["default"]
                else:
                    d[k] = self._cast_value("", kinfo["type"])
            else:
                try:
                    d[k] = self._cast_value(d[k], kinfo["type"])
                except ValueError as err:
                    raise ConfigurationError(
                        self.filename, "bad %s value '%s' for key '%s' in name '%s'" %
                        (kinfo["type"], d[k], k, name)) from err

        return d
| |
| |
class Platform:
    """Class representing metadata for a particular platform

    Maps directly to BOARD when building"""

    platform_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk", "platform-schema.yaml"))

    def __init__(self):
        """Create a platform description filled with default values."""
        self.name = ""
        self.sanitycheck = True

        # Memory sizes used when the board does not specify them:
        # 128K of RAM and 512K of flash
        self.ram = 128
        self.flash = 512

        self.ignore_tags = []
        self.default = False
        self.supported = set()

        self.arch = ""
        self.type = "na"
        self.simulation = "na"
        self.supported_toolchains = []
        self.env = []
        self.env_satisfied = True
        self.filter_data = dict()

    def load(self, platform_file):
        """Populate this object from a platform description file.

        @param platform_file Path of the platform .yaml file; it is read
            through SanityConfigParser using platform_schema
        """
        parser = SanityConfigParser(platform_file, self.platform_schema)
        parser.load()
        board = parser.data

        self.name = board['identifier']
        self.sanitycheck = board.get("sanitycheck", True)
        # if no RAM size is specified by the board, take a default of 128K
        self.ram = board.get("ram", 128)

        testing_opts = board.get("testing", {})
        self.ignore_tags = testing_opts.get("ignore_tags", [])
        self.default = testing_opts.get("default", False)

        # if no flash size is specified by the board, take a default of 512K
        self.flash = board.get("flash", 512)

        # A "supported" entry may bundle several features separated by ':'
        self.supported = set()
        for feature in board.get("supported", []):
            self.supported.update(feature.split(":"))

        self.arch = board['arch']
        self.type = board.get('type', "na")
        self.simulation = board.get('simulation', "na")
        self.supported_toolchains = board.get("toolchain", [])
        self.env = board.get("env", [])

        # Every listed environment variable must be set to a non-empty value
        self.env_satisfied = all(os.environ.get(var, None) for var in self.env)

    def __repr__(self):
        return "<%s on %s>" % (self.name, self.arch)
| |
| |
class TestCase(object):
    """Class representing a test application
    """

    # Patterns used by scan_file(); compiled once instead of once per file.
    _suite_regex = re.compile(
        # do not match until end-of-line, otherwise we won't allow
        # stc_regex below to catch the ones that are declared in the same
        # line--as we only search starting the end of this match
        br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,",
        re.MULTILINE)
    _stc_regex = re.compile(
        br"^\s*"  # empty space at the beginning is ok
        # catch the case where it is declared in the same sentence, e.g:
        #
        # ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME));
        br"(?:ztest_test_suite\([a-zA-Z0-9_]+,\s*)?"
        # Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME)
        br"ztest_(?:1cpu_)?(?:user_)?unit_test(?:_setup_teardown)?"
        # Consume the argument that becomes the extra testcase
        br"\(\s*"
        br"(?P<stc_name>[a-zA-Z0-9_]+)"
        # _setup_teardown() variant has two extra arguments that we ignore
        br"(?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?"
        br"\s*\)",
        # We don't check how it finishes; we don't care
        re.MULTILINE)
    _suite_run_regex = re.compile(
        br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)",
        re.MULTILINE)
    _achtung_regex = re.compile(
        br"(#ifdef|#endif)",
        re.MULTILINE)

    def __init__(self):
        """TestCase constructor.

        This gets called by TestSuite as it finds and reads test yaml files.
        Multiple TestCase instances may be generated from a single testcase.yaml,
        each one corresponds to an entry within that file.

        The constructor takes no arguments; every attribute is initialised
        to a placeholder here and is presumably filled in by the caller
        afterwards.
        """

        self.id = ""
        self.source_dir = ""
        self.yamlfile = ""
        self.cases = []
        self.name = ""

        self.type = None
        self.tags = None
        self.extra_args = None
        self.extra_configs = None
        self.arch_whitelist = None
        self.arch_exclude = None
        self.skip = None
        self.platform_exclude = None
        self.platform_whitelist = None
        self.toolchain_exclude = None
        self.toolchain_whitelist = None
        self.tc_filter = None
        self.timeout = 60
        self.harness = ""
        self.harness_config = {}
        self.build_only = True
        self.build_on_all = False
        self.slow = False
        self.min_ram = None
        self.depends_on = None
        self.min_flash = None
        self.extra_sections = None

    @staticmethod
    def get_unique(testcase_root, workdir, name):
        """Build the canonical name of a test case.

        We need to have a unique name for every single test case. Since
        a testcase.yaml can define multiple tests, the canonical name for
        the test case is <workdir>/<name>, prefixed with the location of
        testcase_root relative to ZEPHYR_BASE when it lies inside it.

        @param testcase_root One of the --testcase-root directories
        @param workdir Sub-directory of testcase_root where the
            .yaml test configuration file was found
        @param name Name of this test case, corresponding to the entry name
            in the test case configuration file. For many test cases that just
            define one test, can be anything and is usually "test". This is
            really only used to distinguish between different cases when
            the testcase.yaml defines multiple tests
        @return The normalized unique name
        """

        canonical_testcase_root = os.path.realpath(testcase_root)
        if Path(canonical_zephyr_base) in Path(canonical_testcase_root).parents:
            # This is in ZEPHYR_BASE, so include path in name for uniqueness
            # FIXME: We should not depend on path of test for unique names.
            relative_tc_root = os.path.relpath(canonical_testcase_root,
                                               start=canonical_zephyr_base)
        else:
            relative_tc_root = ""

        # workdir can be "."
        return os.path.normpath(os.path.join(relative_tc_root, workdir, name))

    @staticmethod
    def scan_file(inf_name):
        """Extract the sub test cases declared in a C source file.

        @param inf_name Path of the source file to scan
        @return A tuple (matches, warnings). matches is the list of test
            names found between ztest_test_suite() and
            ztest_run_test_suite(), each with its first "test_" removed;
            warnings is None or a message about #ifdef/#endif found there.
            (None, None) is returned when the file declares no suite.
        @raise ValueError If a suite is declared but never run
        """
        if os.path.getsize(inf_name) == 0:
            # mmap refuses to map an empty file; such a file cannot declare
            # a test suite anyway.
            return None, None

        warnings = None

        with open(inf_name) as inf:
            if os.name == 'nt':
                mmap_args = {'fileno': inf.fileno(), 'length': 0, 'access': mmap.ACCESS_READ}
            else:
                mmap_args = {'fileno': inf.fileno(), 'length': 0, 'flags': mmap.MAP_PRIVATE,
                             'prot': mmap.PROT_READ, 'offset': 0}

            with contextlib.closing(mmap.mmap(**mmap_args)) as main_c:
                # contextlib makes pylint think main_c isn't subscriptable
                # pylint: disable=unsubscriptable-object

                suite_regex_match = TestCase._suite_regex.search(main_c)
                if not suite_regex_match:
                    # can't find ztest_test_suite, maybe a client, because
                    # it includes ztest.h
                    return None, None

                suite_run_match = TestCase._suite_run_regex.search(main_c)
                if not suite_run_match:
                    raise ValueError("can't find ztest_run_test_suite")

                # Everything between the suite declaration and its run call
                suite_body = main_c[suite_regex_match.end():suite_run_match.start()]

                achtung_matches = TestCase._achtung_regex.findall(suite_body)
                if achtung_matches:
                    # sorted() keeps the message stable from run to run
                    warnings = "found invalid %s in ztest_test_suite()" \
                               % ", ".join(sorted({match.decode() for match in achtung_matches}))

                # Only the first "test_" is dropped so that names containing
                # it more than once keep the rest of their identifier.
                matches = [match.decode().replace("test_", "", 1)
                           for match in TestCase._stc_regex.findall(suite_body)]
                return matches, warnings

    def scan_path(self, path):
        """Collect the sub test cases of every C file in path/src and path.

        Problems found in a file are logged and do not stop the scan.

        @param path Directory of the test application
        @return List of sub test case names
        """
        subcases = []
        patterns = (os.path.join(path, "src", "*.c"), os.path.join(path, "*.c"))
        for pattern in patterns:
            for filename in glob.glob(pattern):
                try:
                    _subcases, warnings = self.scan_file(filename)
                    if warnings:
                        logger.error("%s: %s" % (filename, warnings))
                    if _subcases:
                        subcases += _subcases
                except ValueError as e:
                    logger.error("%s: can't find: %s" % (filename, e))
        return subcases

    def parse_subcases(self, test_path):
        """Fill self.cases with "<id>.<subcase>" names found under test_path.

        When no sub test case is found, the test id itself is recorded.

        @param test_path Directory of the test application
        """
        results = self.scan_path(test_path)
        for sub in results:
            self.cases.append("{}.{}".format(self.id, sub))

        if not results:
            self.cases.append(self.id)

    def __str__(self):
        return self.name
| |
| |
class TestInstance:
    """Class representing the execution of a particular TestCase on a platform

    @param testcase The TestCase object we want to build/execute
    @param platform Platform object that we want to build and run against
    @param outdir Base directory for all test results. The actual
        out directory used is <outdir>/<platform>/<test case name>
    """

    def __init__(self, testcase, platform, outdir):

        self.testcase = testcase
        self.platform = platform

        self.status = None
        self.reason = "N/A"
        self.metrics = dict()
        self.handler = None
        self.outdir = outdir

        self.name = os.path.join(platform.name, testcase.name)
        self.build_dir = os.path.join(outdir, platform.name, testcase.name)

        self.build_only = True
        self.run = False

        self.results = {}

    def __lt__(self, other):
        return self.name < other.name

    def check_build_or_run(self, build_only=False, enable_slow=False, device_testing=False, fixture=None):
        """Decide whether this instance is only built or also run.

        The outcome is stored in self.build_only and self.run.

        @param build_only True when build-only was requested on the command line
        @param enable_slow True to allow running test cases marked as slow
        @param device_testing True when tests are run on real devices
        @param fixture Fixtures supplied on the command line (default: none)
        """
        if fixture is None:
            fixture = []

        # right now we only support building on windows. running is still work
        # in progress; build-only may also have been asked for on the command
        # line or by the test case, and slow tests only run when enabled.
        if (os.name == 'nt'
                or build_only or self.testcase.build_only
                or (self.testcase.slow and not enable_slow)):
            self.build_only = True
            self.run = False
            return

        runnable = bool(self.testcase.type == "unit" or
                        self.platform.type == "native" or
                        self.platform.simulation in ["nsim", "renode", "qemu"] or
                        device_testing)

        # Simulators that are not installed cannot run anything
        if self.platform.simulation == "nsim":
            if not find_executable("nsimdrv"):
                runnable = False

        if self.platform.simulation == "renode":
            if not find_executable("renode"):
                runnable = False

        harness_config = self.testcase.harness_config
        # console harness allows us to run the test and capture data.
        if self.testcase.harness == 'console':
            # if we have a fixture that is also being supplied on the
            # command-line, then we need to run the test, not just build it.
            if "fixture" in harness_config:
                harness_build_only = harness_config['fixture'] not in fixture
            else:
                harness_build_only = False
        elif self.testcase.harness:
            harness_build_only = True
        else:
            harness_build_only = False

        self.build_only = harness_build_only or not runnable
        self.run = not self.build_only

    def create_overlay(self, platform, enable_asan=False, enable_coverage=False, coverage_platform=None):
        """Write the extra configuration overlay for this instance.

        The file is <build_dir>/sanitycheck/testcase_extra.conf.

        @param platform Platform the overlay is generated for
        @param enable_asan Add CONFIG_ASAN=y on native platforms
        @param enable_coverage Add CONFIG_COVERAGE=y when the platform name
            is listed in coverage_platform
        @param coverage_platform Names of platforms to collect coverage on
            (default: none)
        """
        if coverage_platform is None:
            coverage_platform = []

        # Create this in a "sanitycheck/" subdirectory otherwise this
        # will pass this overlay to kconfig.py *twice* and kconfig.cmake
        # will silently give that second time precedence over any
        # --extra-args=CONFIG_*
        subdir = os.path.join(self.build_dir, "sanitycheck")
        os.makedirs(subdir, exist_ok=True)
        overlay_path = os.path.join(subdir, "testcase_extra.conf")

        content = ""
        if self.testcase.extra_configs:
            content = "\n".join(self.testcase.extra_configs)

        if enable_coverage and platform.name in coverage_platform:
            content = content + "\nCONFIG_COVERAGE=y"

        if enable_asan and platform.type == "native":
            content = content + "\nCONFIG_ASAN=y"

        with open(overlay_path, "w") as f:
            f.write(content)

    def calculate_sizes(self):
        """Get the RAM/ROM sizes of a test case.

        This can only be run after the instance has been executed by
        MakeGenerator, otherwise there won't be any binaries to measure.

        @return A SizeCalculator object
        @raise BuildError If not exactly one output binary is found
        """
        fns = glob.glob(os.path.join(self.build_dir, "zephyr", "*.elf"))
        fns.extend(glob.glob(os.path.join(self.build_dir, "zephyr", "*.exe")))
        fns = [x for x in fns if not x.endswith('_prebuilt.elf')]
        if len(fns) != 1:
            raise BuildError("Missing/multiple output ELF binary")

        return SizeCalculator(fns[0], self.testcase.extra_sections)

    def __repr__(self):
        return "<TestCase %s on %s>" % (self.testcase.name, self.platform.name)
| |
| |
class CMake():
    """Runs cmake to configure and build one test case for one platform."""

    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')

    def __init__(self, testcase, platform, source_dir, build_dir):
        """
        @param testcase Test case being built
        @param platform Platform being built for; its name is passed to
            cmake as BOARD
        @param source_dir Source directory handed to cmake (-S)
        @param build_dir Build directory handed to cmake (-B); the log file
            is written there
        """

        self.cwd = None
        self.capture_output = True

        self.defconfig = {}
        self.cmake_cache = {}

        self.instance = None
        self.testcase = testcase
        self.platform = platform
        self.source_dir = source_dir
        self.build_dir = build_dir
        self.log = "build.log"

    def parse_generated(self):
        """Reset the parsed configuration; this base version reads nothing."""
        self.defconfig = {}
        return {}

    def _popen_kwargs(self):
        """Build the keyword arguments used to spawn cmake."""
        kwargs = dict()

        if self.capture_output:
            kwargs['stdout'] = subprocess.PIPE
            # CMake sends the output of message() to stderr unless it's STATUS
            kwargs['stderr'] = subprocess.STDOUT

        if self.cwd:
            kwargs['cwd'] = self.cwd

        return kwargs

    def _append_log(self, out):
        """Append captured output to the log file in the build directory.

        @param out Raw bytes captured from the process, or None
        @return The decoded text, or "" when nothing was captured
        """
        if not out:
            return ""

        log_msg = out.decode(sys.getdefaultencoding())
        with open(os.path.join(self.build_dir, self.log), "a") as log:
            log.write(log_msg)
        return log_msg

    def _mark_build_failure(self, log_msg):
        """Record on the instance why the build failed.

        A FLASH/RAM overflow reported by the linker marks the instance as
        skipped; anything else, including a failure without any output,
        marks it as failed.

        @param log_msg Decoded build output, possibly empty
        """
        overflow_flash = "region `FLASH' overflowed by"
        overflow_ram = "region `RAM' overflowed by"

        if overflow_flash in log_msg or overflow_ram in log_msg:
            logger.debug("RAM/ROM Overflow")
            self.instance.status = "skipped"
            self.instance.reason = "overflow"
        else:
            self.instance.status = "failed"
            self.instance.reason = "Build failure"

    def run_build(self, args=None):
        """Run cmake with the given arguments to build the test.

        @param args Arguments appended to the cmake command (default: none)
        @return A dictionary with "returncode" and "instance", plus "msg"
            when the build succeeded
        """

        logger.debug("Building %s for %s" % (self.source_dir, self.platform.name))

        cmd = [shutil.which('cmake')]
        cmd.extend(args or [])

        p = subprocess.Popen(cmd, **self._popen_kwargs())
        out, _ = p.communicate()
        log_msg = self._append_log(out)

        if p.returncode == 0:
            msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)

            self.instance.status = "passed"
            # A successful build is reported even when it printed nothing;
            # callers call .get() on the returned dictionary.
            return {'msg': msg, "returncode": p.returncode, "instance": self.instance}

        # A real error occurred
        self._mark_build_failure(log_msg)
        return {
            "returncode": p.returncode,
            "instance": self.instance,
        }

    def run_cmake(self, args=None):
        """Run the cmake configuration step.

        @param args Cache entries; each one is passed as -D<entry> with
            double quotes removed (default: none)
        @return On success a dictionary with "msg" and "filter" (the result
            of parse_generated()); on failure a dictionary with "returncode"
        """

        ldflags = "-Wl,--fatal-warnings"
        logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name))

        # fixme: add additional cflags based on options
        # NOTE(review): the double quotes opened in EXTRA_CFLAGS and
        # EXTRA_LDFLAGS are never closed; kept as-is - confirm how the build
        # system consumes them before changing.
        cmake_args = [
            '-B{}'.format(self.build_dir),
            '-S{}'.format(self.source_dir),
            '-DEXTRA_CFLAGS="-Werror ',
            '-DEXTRA_AFLAGS=-Wa,--fatal-warnings',
            '-DEXTRA_LDFLAGS="{}'.format(ldflags),
            '-G{}'.format(get_generator()[1])
        ]

        if options.cmake_only:
            cmake_args.append("-DCMAKE_EXPORT_COMPILE_COMMANDS=1")

        cmake_args.extend("-D{}".format(a.replace('"', '')) for a in (args or []))
        cmake_args.append('-DBOARD={}'.format(self.platform.name))

        cmd = [shutil.which('cmake')] + cmake_args

        p = subprocess.Popen(cmd, **self._popen_kwargs())
        out, _ = p.communicate()

        if p.returncode == 0:
            filter_results = self.parse_generated()
            msg = "Finished building %s for %s" % (self.source_dir, self.platform.name)

            results = {'msg': msg, 'filter': filter_results}

        else:
            self.instance.status = "failed"
            self.instance.reason = "Cmake build failure"
            results = {"returncode": p.returncode}

        self._append_log(out)

        return results
| |
| |
class FilterBuilder(CMake):
    """CMake runner that evaluates a test case filter after configuration."""

    def __init__(self, testcase, platform, source_dir, build_dir):
        super().__init__(testcase, platform, source_dir, build_dir)

        self.log = "config-sanitycheck.log"

    def parse_generated(self):
        """Read the generated configuration and evaluate the test filter.

        The Kconfig output (zephyr/.config) and CMakeCache.txt of the build
        directory are loaded into self.defconfig and self.cmake_cache.

        @return {} for the "unit_testing" platform. When the test case has
            a filter, a one-entry dictionary mapping "<platform>/<test
            name>" to True if the filter expression evaluated false,
            False otherwise. Without a filter, the collected filter data,
            which is also stored on the platform.
        """

        if self.platform.name == "unit_testing":
            return {}

        cache_file = os.path.join(self.build_dir, "CMakeCache.txt")
        dotconfig_file = os.path.join(self.build_dir, "zephyr", ".config")

        kconfig = {}
        with open(dotconfig_file, "r") as fp:
            for line in fp.readlines():
                parsed = self.config_re.match(line)
                if parsed:
                    kconfig[parsed.group(1)] = parsed.group(2).strip()
                elif line.strip() and not line.startswith("#"):
                    # Neither an assignment, a comment nor a blank line
                    sys.stderr.write("Unrecognized line %s\n" % line)

        self.defconfig = kconfig

        try:
            cache = CMakeCache.from_file(cache_file)
        except FileNotFoundError:
            cache = {}

        self.cmake_cache = {entry.name: entry.value for entry in cache}

        # Later updates take precedence over earlier ones
        filter_data = {
            "ARCH": self.platform.arch,
            "PLATFORM": self.platform.name
        }
        for source in (os.environ, self.defconfig, self.cmake_cache):
            filter_data.update(source)

        dts_path = os.path.join(self.build_dir, "zephyr", self.platform.name + ".dts.pre.tmp")

        if not (self.testcase and self.testcase.tc_filter):
            self.platform.filter_data = filter_data
            return filter_data

        try:
            edt = None
            if os.path.exists(dts_path):
                edt = edtlib.EDT(dts_path, [os.path.join(ZEPHYR_BASE, "dts", "bindings")])
            res = expr_parser.parse(self.testcase.tc_filter, filter_data, edt)

        except (ValueError, SyntaxError):
            sys.stderr.write(
                "Failed processing %s\n" % self.testcase.yamlfile)
            raise

        # True means "filtered out"
        instance_name = os.path.join(self.platform.name, self.testcase.name)
        return {instance_name: not res}
| |
| |
| class ProjectBuilder(FilterBuilder): |
| |
def __init__(self, suite, instance, **kwargs):
    """Set up a builder for one test instance.

    @param suite Stored as self.suite
    @param instance Test instance to process; provides the test case, the
        platform, the source directory and the build directory
    @param kwargs Optional settings: lsan, asan, valgrind, device_testing,
        cmake_only, coverage, inline_logs (all default to False) and
        extra_args (defaults to an empty list)
    """
    testcase = instance.testcase
    super().__init__(testcase, instance.platform, testcase.source_dir, instance.build_dir)

    self.log = "build.log"
    self.instance = instance
    self.suite = suite

    option = kwargs.get
    self.lsan = option('lsan', False)
    self.asan = option('asan', False)
    self.valgrind = option('valgrind', False)
    self.extra_args = option('extra_args', [])
    self.device_testing = option('device_testing', False)
    self.cmake_only = option('cmake_only', False)
    self.coverage = option('coverage', False)
    self.inline_logs = option('inline_logs', False)
| |
@staticmethod
def log_info(filename, inline_logs):
    """Point the user at a log file, or dump its content.

    @param filename Path of the log file
    @param inline_logs When true the file content is logged between two
        banner lines; otherwise only a "see:" hint with the path is logged
    """
    filename = os.path.relpath(os.path.realpath(filename))

    if not inline_logs:
        logger.error("see: " + Fore.YELLOW + filename + Fore.RESET)
        return

    banner = "{:-^100}".format(filename)
    logger.info(banner)

    try:
        with open(filename) as fp:
            data = fp.read()
    except Exception as e:
        # Best effort: report the problem in place of the log content
        data = "Unable to read log data (%s)\n" % (str(e))

    logger.error(data)

    logger.info(banner)
| |
def log_info_file(self, inline_logs):
    """Report the most relevant log file of this instance via log_info().

    Order of preference: valgrind.log when it exists and the failure
    reason mentions Valgrind, then a non-empty device.log, then a
    non-empty handler.log, and build.log otherwise.

    @param inline_logs Passed through to log_info()
    """
    build_dir = self.instance.build_dir

    def log_path(basename):
        return "{}/{}".format(build_dir, basename)

    valgrind_log = log_path("valgrind.log")
    if os.path.exists(valgrind_log) and "Valgrind" in self.instance.reason:
        chosen = valgrind_log
    else:
        chosen = log_path("build.log")
        for basename in ("device.log", "handler.log"):
            candidate = log_path(basename)
            if os.path.exists(candidate) and os.path.getsize(candidate) > 0:
                chosen = candidate
                break

    self.log_info(chosen, inline_logs)
| |
def setup_handler(self):
    """Attach the handler that will run this instance, if there is one.

    The handler is chosen from the platform's simulation, the test case
    type, the platform type and the device_testing setting, in that order.
    No handler is attached when none applies or when the required
    simulator executable is not found.
    """
    inst = self.instance
    simulation = inst.platform.simulation
    handler_args = []

    # FIXME: Needs simplification
    if simulation == "qemu":
        inst.handler = QEMUHandler(inst, "qemu")
        handler_args.append("QEMU_PIPE=%s" % inst.handler.get_fifo())
        inst.handler.call_make_run = True
    elif inst.testcase.type == "unit":
        inst.handler = BinaryHandler(inst, "unit")
        inst.handler.binary = os.path.join(inst.build_dir, "testbinary")
    elif inst.platform.type == "native":
        native = BinaryHandler(inst, "native")

        native.asan = self.asan
        native.valgrind = self.valgrind
        native.lsan = self.lsan
        native.coverage = self.coverage

        native.binary = os.path.join(inst.build_dir, "zephyr", "zephyr.exe")
        inst.handler = native
    elif simulation == "nsim":
        if find_executable("nsimdrv"):
            inst.handler = BinaryHandler(inst, "nsim")
            inst.handler.call_make_run = True
    elif simulation == "renode":
        if find_executable("renode"):
            inst.handler = BinaryHandler(inst, "renode")
            inst.handler.pid_fn = os.path.join(inst.build_dir, "renode.pid")
            inst.handler.call_make_run = True
    elif self.device_testing:
        inst.handler = DeviceHandler(inst, "device")

    if inst.handler:
        inst.handler.args = handler_args
| |
def process(self, message):
    """Execute one pipeline step for this instance and queue the next one.

    @param message Dictionary whose 'op' entry selects the step: "cmake",
        "build", "run" or "report". Follow-up steps are put on the
        pipeline queue.
    """
    op = message.get('op')

    if not self.instance.handler:
        self.setup_handler()

    # The build process, call cmake and build with configured generator
    if op == "cmake":
        results = self.cmake()
        if self.instance.status == "failed" or self.cmake_only:
            pipeline.put({"op": "report", "test": self.instance})
        elif self.instance.name in results['filter'] and results['filter'][self.instance.name]:
            logger.debug("filtering %s" % self.instance.name)
            self.instance.status = "skipped"
            self.instance.reason = "filter"
            pipeline.put({"op": "report", "test": self.instance})
        else:
            pipeline.put({"op": "build", "test": self.instance})

    elif op == "build":
        logger.debug("build test: %s" % self.instance.name)
        results = self.build()

        # Any non-zero return code is a failure: a process killed by a
        # signal reports a negative value, which "> 0" would let through.
        if results.get('returncode', 1) != 0:
            pipeline.put({"op": "report", "test": self.instance})
        elif self.instance.run:
            pipeline.put({"op": "run", "test": self.instance})
        else:
            pipeline.put({"op": "report", "test": self.instance})

    # Run the generated binary using one of the supported handlers
    elif op == "run":
        logger.debug("run test: %s" % self.instance.name)
        self.run()
        self.instance.status, _ = self.instance.handler.get_state()
        pipeline.put({
            "op": "report",
            "test": self.instance,
            "state": "executed",
            "status": self.instance.status,
            "reason": self.instance.reason}
        )

    # Report results and output progress to screen
    elif op == "report":
        with report_lock:
            self.report_out()
| |
def report_out(self):
    """Update the suite counters and print progress for this instance.

    In verbose mode one log line is emitted per instance; otherwise a single
    in-place progress line is rewritten on stdout.  Logs of failed or
    timed-out instances are passed to ``log_info_file``.
    """
    suite = self.suite
    inst = self.instance
    width = len(str(suite.total_tests))
    suite.total_done += 1
    is_failure = inst.status in ["failed", "timeout"]

    if is_failure:
        suite.total_failed += 1
        if VERBOSE:
            status = Fore.RED + "FAILED " + Fore.RESET + inst.reason
        else:
            # Leave the in-place progress line before printing the error.
            print("")
            logger.error(
                "{:<25} {:<50} {}FAILED{}: {}".format(
                    inst.platform.name,
                    inst.testcase.name,
                    Fore.RED,
                    Fore.RESET,
                    inst.reason))
            self.log_info_file(self.inline_logs)
    elif inst.status == "skipped":
        suite.total_skipped += 1
        status = Fore.YELLOW + "SKIPPED" + Fore.RESET
    else:
        status = Fore.GREEN + "PASSED" + Fore.RESET

    if VERBOSE:
        # Describe how far the instance got (cmake / build / handler + time).
        if self.cmake_only:
            more_info = "cmake"
        elif inst.status == "skipped":
            more_info = inst.reason
        elif inst.handler and inst.run:
            more_info = inst.handler.type_str
            elapsed = inst.handler.duration
            if elapsed:
                more_info += " {:.3f}s".format(elapsed)
        else:
            more_info = "build"

        logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format(
            suite.total_done, width, suite.total_tests,
            inst.platform.name, inst.testcase.name, status, more_info))

        if is_failure:
            self.log_info_file(self.inline_logs)
    else:
        percent_done = int((float(suite.total_done) / suite.total_tests) * 100)
        skipped_colour = Fore.YELLOW if suite.total_skipped > 0 else Fore.RESET
        failed_colour = Fore.RED if suite.total_failed > 0 else Fore.RESET
        progress = "\rINFO - Total complete: %s%4d/%4d%s %2d%% skipped: %s%4d%s, failed: %s%4d%s" % (
            Fore.GREEN,
            suite.total_done,
            suite.total_tests,
            Fore.RESET,
            percent_done,
            skipped_colour,
            suite.total_skipped,
            Fore.RESET,
            failed_colour,
            suite.total_failed,
            Fore.RESET,
        )
        sys.stdout.write(progress)
    sys.stdout.flush()
| |
def cmake(self):
    """Assemble the CMake arguments for this instance and run CMake.

    Arguments come from the test case, the command line and, when present,
    the handler.  All ``OVERLAY_CONFIG="..."`` arguments are merged into a
    single space-separated ``OVERLAY_CONFIG`` argument.  The generated
    ``sanitycheck/testcase_extra.conf`` overlay is added when the test case
    has extra configs or coverage/ASAN is enabled.

    Returns whatever ``self.run_cmake()`` returns.
    """
    instance = self.instance
    # Copy: the test case's own list must not be modified.
    args = self.testcase.extra_args[:]
    args += self.extra_args

    if instance.handler:
        args += instance.handler.args

    # Split the arguments into overlay values and everything else.  A new
    # list is built instead of deleting while iterating, which used to skip
    # the argument following each match.
    overlay_re = re.compile('OVERLAY_CONFIG="(.*)"')
    overlays = []
    other_args = []
    for arg in args:
        match = overlay_re.search(arg)
        if match:
            if match.group(1):
                overlays.append(match.group(1))
        else:
            other_args.append(arg)
    args = other_args

    if (self.testcase.extra_configs or self.coverage or
            self.asan):
        overlays.append(os.path.join(instance.build_dir,
                                     "sanitycheck", "testcase_extra.conf"))

    # Re-emit the overlays even when no generated overlay is needed, so
    # user-supplied OVERLAY_CONFIG values are never lost.
    if overlays:
        args.append("OVERLAY_CONFIG=\"%s\"" % " ".join(overlays))

    return self.run_cmake(args)
| |
def build(self):
    """Build the already-configured tree in ``self.build_dir``.

    Returns the result of ``self.run_build()``.
    """
    build_args = ['--build', self.build_dir]
    return self.run_build(build_args)
| |
def run(self):
    """Run the instance through its handler and flush stdout."""
    handler = self.instance.handler

    # Device handlers are given a reference to the suite before running.
    if handler.type_str == "device":
        handler.suite = self.suite

    handler.handle()
    sys.stdout.flush()
| |
| |
# Work queue shared by TestSuite.add_tasks_to_queue() (which seeds it),
# ProjectBuilder.process() (which queues follow-up stages) and
# TestSuite.execute() (which drains it).  It is LIFO, so the most recently
# queued operation is the next one to be picked up.
pipeline = queue.LifoQueue()
| |
| |
class BoundedExecutor(concurrent.futures.ThreadPoolExecutor):
    """BoundedExecutor behaves as a ThreadPoolExecutor which will block on
    calls to submit() once the limit given as "bound" work items are queued for
    execution.
    :param bound: Integer - the maximum number of items in the work queue
    :param max_workers: Integer - the size of the thread pool
    :param kwargs: any further keyword arguments are forwarded unchanged to
        ThreadPoolExecutor (for example ``thread_name_prefix``)
    """

    def __init__(self, bound, max_workers, **kwargs):
        # Forward the extra keyword arguments instead of silently dropping
        # them.
        super().__init__(max_workers, **kwargs)
        # One slot per running worker plus "bound" queued items.
        self.semaphore = BoundedSemaphore(bound + max_workers)

    def submit(self, fn, *args, **kwargs):
        """Submit ``fn`` for execution, blocking while the executor is full.

        Returns the Future produced by ThreadPoolExecutor.submit().
        """
        self.semaphore.acquire()
        try:
            future = super().submit(fn, *args, **kwargs)
        except Exception:
            # Nothing was queued, so give the slot back before re-raising.
            self.semaphore.release()
            raise

        # The slot is returned once the work item finishes (or is cancelled).
        future.add_done_callback(lambda _: self.semaphore.release())
        return future
| |
| |
class TestSuite:
    """Holds the discovered test cases, platforms and test instances.

    The methods of this class load platform and test case definitions,
    filter them into instances, execute them and write the reports.
    """

    # "CONFIG_<NAME>=<value>" with an optionally double-quoted value;
    # group 1 is the option name, group 2 the value.
    config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')
    # Same shape as config_re, without the CONFIG_ prefix requirement.
    dt_re = re.compile('([A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$')

    # Schema handed to SanityConfigParser when loading test case YAML files.
    # It is loaded once, when the class body is executed.
    tc_schema = scl.yaml_load(
        os.path.join(ZEPHYR_BASE,
                     "scripts", "sanity_chk", "testcase-schema.yaml"))
| |
def __init__(self, board_root_list, testcase_roots, outdir):
    """Create an empty suite.

    :param board_root_list: a board root directory, or a list of them,
        searched by add_configurations()
    :param testcase_roots: directories walked by add_testcases()
    :param outdir: output directory; stored as an absolute path
    """

    self.roots = testcase_roots
    if not isinstance(board_root_list, list):
        self.board_roots = [board_root_list]
    else:
        self.board_roots = board_root_list

    # Testsuite Options
    self.coverage_platform = []
    self.build_only = False
    self.cmake_only = False
    self.enable_slow = False
    self.device_testing = False
    self.fixture = []
    self.enable_coverage = False
    self.enable_lsan = False
    self.enable_asan = False
    self.enable_valgrind = False
    self.extra_args = []
    self.inline_logs = False
    # Historical (misspelled) name, kept so existing readers do not break.
    self.enable_sizes_report = False
    # Attributes read by execute(); given defaults so execute() does not
    # depend on the caller having assigned them first.
    self.enable_size_report = False
    self.jobs = 1
    self.test_only = False

    # Keep track of which test cases we've filtered out and why
    self.testcases = {}
    self.platforms = []
    self.selected_platforms = []
    self.default_platforms = []
    self.outdir = os.path.abspath(outdir)
    # None until apply_filters() has run; discard_report() checks for this.
    self.discards = None
    self.load_errors = 0
    self.instances = dict()

    self.total_tests = 0  # number of test instances
    self.total_cases = 0  # number of test cases
    self.total_done = 0  # tests completed
    self.total_failed = 0
    self.total_skipped = 0

    self.total_platforms = 0
    self.start_time = 0
    self.duration = 0
    self.warnings = 0
    self.cv = threading.Condition()

    # hardcoded for now
    self.connected_hardware = []
| |
def config(self):
    """Log the configured coverage platform list."""
    message = "coverage platform: {}".format(self.coverage_platform)
    logger.info(message)
| |
| # Debug Functions |
| @staticmethod |
| def info(what): |
| sys.stdout.write(what + "\n") |
| sys.stdout.flush() |
| |
def update(self):
    """Refresh the instance and test case totals from the current dicts."""
    instance_count = len(self.instances)
    self.total_tests = instance_count
    case_count = len(self.testcases)
    self.total_cases = case_count
| |
def compare_metrics(self, filename):
    """Compare the instances' size metrics with those saved in a CSV file.

    The CSV must have ``test``, ``platform``, ``ram_size`` and ``rom_size``
    columns.  Returns a list of
    ``(instance, metric, current_value, delta, lower_is_better)`` tuples,
    one per metric whose value changed.  Returns an empty list when
    ``filename`` does not exist.
    """
    # (metric name, converter for the saved value, lower results better)
    interesting_metrics = [("ram_size", int, True),
                           ("rom_size", int, True)]

    if not os.path.exists(filename):
        logger.info("Cannot compare metrics, %s not found" % filename)
        return []

    # Saved values keyed by (test name, platform name); kept as strings.
    with open(filename) as fp:
        saved_metrics = {
            (row["test"], row["platform"]):
                {m: row[m] for m, _, _ in interesting_metrics}
            for row in csv.DictReader(fp)
        }

    results = []
    for instance in self.instances.values():
        saved = saved_metrics.get(
            (instance.testcase.name, instance.platform.name))
        if saved is None:
            continue
        for metric, mtype, lower_better in interesting_metrics:
            # Skip metrics missing on either side of the comparison.
            if metric not in instance.metrics or saved[metric] == "":
                continue
            current = instance.metrics.get(metric, 0)
            delta = current - mtype(saved[metric])
            if delta != 0:
                results.append((instance, metric, current, delta,
                                lower_better))
    return results
| |
def misc_reports(self, report, show_footprint, all_deltas,
                 footprint_threshold, last_metrics):
    """Log footprint changes relative to a previously saved metrics file.

    :param report: path of the saved metrics CSV; nothing is done when
        falsy
    :param show_footprint: log the individual deltas only when true
    :param all_deltas: when true, log every delta (as INFO); otherwise only
        regressions at or above ``footprint_threshold`` percent (as WARNING)
    :param footprint_threshold: threshold in percent
    :param last_metrics: selects the wording ("run" or "release") of the
        closing message
    """

    if not report:
        return

    deltas = self.compare_metrics(report)
    warnings = 0
    if deltas and show_footprint:
        for i, metric, value, delta, lower_better in deltas:
            # Unless all deltas are wanted, improvements are not reported.
            if not all_deltas and ((delta < 0 and lower_better) or
                                   (delta > 0 and not lower_better)):
                continue

            # The saved value can be 0 (e.g. the earlier run recorded no
            # sizes); treat that as an infinite relative change instead of
            # dividing by zero.
            previous = value - delta
            if previous:
                percentage = float(delta) / float(previous)
            else:
                percentage = float("inf") if delta > 0 else float("-inf")

            if not all_deltas and (percentage <
                                   (footprint_threshold / 100.0)):
                continue

            logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format(
                i.platform.name, i.testcase.name, Fore.YELLOW,
                "INFO" if all_deltas else "WARNING", Fore.RESET,
                metric, delta, value, percentage))
            warnings += 1

    # NOTE(review): this count is local; self.warnings (printed by
    # summary()) is not updated here - confirm whether that is intended.
    if warnings:
        logger.warning("Deltas based on metrics from last %s" %
                       ("release" if not last_metrics else "run"))
| |
def summary(self, unrecognized_sections):
    """Log the final pass/fail/skip totals and the platform coverage.

    Instances with unrecognized binary sections are logged as failures
    unless ``unrecognized_sections`` is true.
    """
    failed = 0
    for instance in self.instances.values():
        if instance.status == "failed":
            failed += 1
            continue
        if not instance.metrics.get("unrecognized") or unrecognized_sections:
            continue
        logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" %
                     (Fore.RED, Fore.RESET, instance.name,
                      str(instance.metrics.get("unrecognized", []))))
        failed += 1

    # The pass rate is relative to the tests that were not skipped.
    passed = self.total_tests - self.total_failed - self.total_skipped
    if self.total_tests and self.total_tests != self.total_skipped:
        pass_rate = float(passed) / float(self.total_tests - self.total_skipped)
    else:
        pass_rate = 0

    headline_colour = Fore.RED if failed else Fore.GREEN
    failed_colour = Fore.RED if self.total_failed else Fore.RESET
    warning_colour = Fore.YELLOW if self.warnings else Fore.RESET
    logger.info(
        "{}{} of {}{} tests passed ({:.2%}), {}{}{} failed, {} skipped with {}{}{} warnings in {:.2f} seconds".format(
            headline_colour,
            passed,
            self.total_tests,
            Fore.RESET,
            pass_rate,
            failed_colour,
            self.total_failed,
            Fore.RESET,
            self.total_skipped,
            warning_colour,
            self.warnings,
            Fore.RESET,
            self.duration))

    self.total_platforms = len(self.platforms)
    if self.platforms:
        selected = len(self.selected_platforms)
        logger.info("In total {} test cases were executed on {} out of total {} platforms ({:02.2f}%)".format(
            self.total_cases,
            selected,
            self.total_platforms,
            (100 * selected / len(self.platforms))
        ))
| |
def save_reports(self, name, report_dir, no_update, release, only_failed):
    """Write the xunit, CSV, per-platform and discard reports.

    :param name: base file name of the reports ("sanitycheck" when falsy)
    :param report_dir: target directory, created if needed; ``self.outdir``
        is used when falsy
    :param no_update: when true, none of the regular reports are written
    :param release: when true, additionally write the CSV report to
        ``RELEASE_DATA``
    :param only_failed: passed to xunit_report() as its ``append`` argument
    """
    if not self.instances:
        return

    report_name = name if name else "sanitycheck"

    if report_dir:
        os.makedirs(report_dir, exist_ok=True)
        outdir = report_dir
    else:
        outdir = self.outdir
    filename = os.path.join(outdir, report_name)

    if not no_update:
        self.xunit_report(filename + ".xml", only_failed)
        self.csv_report(filename + ".csv")
        self.target_report(outdir)
        if self.discards:
            self.discard_report(filename + "_discard.csv")

    if release:
        self.csv_report(RELEASE_DATA)
| |
def add_configurations(self):
    """Load platform definitions from every board root.

    Each ``<board_root>/*/*/*.yaml`` file is loaded into a Platform.
    Platforms flagged ``sanitycheck`` are added to ``self.platforms`` and,
    when also flagged ``default``, their name to ``self.default_platforms``.
    Files that fail to load with RuntimeError are counted in
    ``self.load_errors``.
    """

    for root in self.board_roots:
        root = os.path.abspath(root)

        logger.debug("Reading platform configuration files under %s..." %
                     root)

        pattern = os.path.join(root, "*", "*", "*.yaml")
        for yaml_path in glob.glob(pattern):
            logger.debug("Found plaform configuration " + yaml_path)
            try:
                platform = Platform()
                platform.load(yaml_path)
                if not platform.sanitycheck:
                    continue
                self.platforms.append(platform)
                if platform.default:
                    self.default_platforms.append(platform.name)

            except RuntimeError as e:
                logger.error("E: %s: can't load: %s" % (yaml_path, e))
                self.load_errors += 1
| |
def get_all_tests(self):
    """Return the sub-cases of every loaded test case as one flat list."""
    return [case
            for tc in self.testcases.values()
            for case in tc.cases]
| |
| @staticmethod |
| def get_toolchain(): |
| toolchain = os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT", None) or \ |
| os.environ.get("ZEPHYR_GCC_VARIANT", None) |
| |
| if toolchain == "gccarmemb": |
| # Remove this translation when gccarmemb is no longer supported. |
| toolchain = "gnuarmemb" |
| |
| try: |
| if not toolchain: |
| raise SanityRuntimeError("E: Variable ZEPHYR_TOOLCHAIN_VARIANT is not defined") |
| except Exception as e: |
| print(str(e)) |
| sys.exit(2) |
| |
| return toolchain |
| |
def add_testcases(self):
    """Walk the test case roots and load every test definition found.

    A directory containing ``sample.yaml`` (preferred) or ``testcase.yaml``
    is passed to add_testcase(); its sub-directories are not scanned.
    """
    candidates = ('sample.yaml', 'testcase.yaml')

    for root in self.roots:
        root = os.path.abspath(root)

        logger.debug("Reading test case configuration files under %s..." % root)

        for dirpath, dirnames, filenames in os.walk(root, topdown=True):
            logger.debug("scanning %s" % dirpath)
            filename = next((c for c in candidates if c in filenames), None)
            if filename is None:
                continue

            logger.debug("Found possible test case in " + dirpath)

            # Emptying dirnames in place stops os.walk() from descending.
            del dirnames[:]
            self.add_testcase(os.path.join(dirpath, filename), root)
| |
def add_testcase(self, tc_data_file, root):
    """Load every test block of one YAML file into ``self.testcases``.

    :param tc_data_file: path of the ``testcase.yaml`` / ``sample.yaml``
    :param root: test case root the file was found under
    :returns: True on success; False when anything raised, in which case
        the error is logged and ``self.load_errors`` is incremented
    """
    # Keys of the parsed block copied onto the TestCase, in this order.
    # Each lands on the attribute of the same name, except "filter",
    # which is stored as ``tc_filter``.
    copied_keys = (
        "type", "tags", "extra_args", "extra_configs", "arch_whitelist",
        "arch_exclude", "skip", "platform_exclude", "platform_whitelist",
        "toolchain_exclude", "toolchain_whitelist", "filter", "timeout",
        "harness", "harness_config", "build_only", "build_on_all", "slow",
        "min_ram", "depends_on", "min_flash", "extra_sections",
    )

    try:
        parsed_data = SanityConfigParser(tc_data_file, self.tc_schema)
        parsed_data.load()

        tc_path = os.path.dirname(tc_data_file)
        workdir = os.path.relpath(tc_path, root)

        for name in parsed_data.tests.keys():
            tc = TestCase()
            tc.name = tc.get_unique(root, workdir, name)

            tc_dict = parsed_data.get_test(name, testcase_valid_keys)

            tc.source_dir = tc_path
            tc.yamlfile = tc_data_file
            tc.id = name

            for key in copied_keys:
                attr = "tc_filter" if key == "filter" else key
                setattr(tc, attr, tc_dict[key])

            tc.parse_subcases(tc_path)

            if tc.name:
                self.testcases[tc.name] = tc

    except Exception as e:
        logger.error("%s: can't load (skipping): %s" % (tc_data_file, e))
        self.load_errors += 1
        return False

    return True
| |
def get_platform(self, name):
    """Return the first loaded platform called ``name``, or None."""
    matches = (candidate for candidate in self.platforms
               if candidate.name == name)
    return next(matches, None)
| |
def get_last_failed(self):
    """Re-create the instances that did not pass in the previous run.

    Reads ``<outdir>/sanitycheck.csv``; every row whose ``passed`` column is
    not ``"True"`` becomes a TestInstance again.  Prints an error and exits
    with status 2 when the file does not exist.
    """
    last_run = os.path.join(self.outdir, "sanitycheck.csv")
    try:
        if not os.path.exists(last_run):
            raise SanityRuntimeError("Couldn't find last sanitycheck run.: %s" % last_run)
    except Exception as e:
        print(str(e))
        sys.exit(2)

    total_tests = 0
    retry_list = []
    with open(last_run, "r") as fp:
        for total_tests, row in enumerate(csv.DictReader(fp), start=1):
            if row["passed"] != "True":
                platform = self.get_platform(row["platform"])
                instance = TestInstance(self.testcases[row["test"]], platform, self.outdir)
                instance.check_build_or_run(
                    self.build_only,
                    self.enable_slow,
                    self.device_testing,
                    self.fixture
                )
                instance.create_overlay(platform, self.enable_asan, self.enable_coverage, self.coverage_platform)
                retry_list.append(instance)
        self.add_instances(retry_list)

    tests_to_run = len(self.instances)
    logger.info("%d tests passed already, retrying %d tests" % (total_tests - tests_to_run, tests_to_run))
| |
def load_from_file(self, file):
    """Create the instances listed in a CSV file.

    The CSV needs ``arch``, ``test`` and ``platform`` columns; rows whose
    ``arch`` value is the literal ``"arch"`` are ignored.  Prints an error
    and exits with status 2 when ``file`` does not exist.
    """
    try:
        if not os.path.exists(file):
            raise SanityRuntimeError(
                "Couldn't find input file with list of tests.")
    except Exception as e:
        print(str(e))
        sys.exit(2)

    with open(file, "r") as fp:
        loaded = []
        for row in csv.DictReader(fp):
            # A repeated header line shows up as a row with arch == "arch".
            if row["arch"] != "arch":
                platform = self.get_platform(row["platform"])
                instance = TestInstance(self.testcases[row["test"]], platform, self.outdir)
                instance.check_build_or_run(
                    self.build_only,
                    self.enable_slow,
                    self.device_testing,
                    self.fixture
                )
                instance.create_overlay(platform, self.enable_asan, self.enable_coverage, self.coverage_platform)
                loaded.append(instance)
        self.add_instances(loaded)
| |
def apply_filters(self, **kwargs):
    """Build the set of test instances to process and record the rejects.

    For every (test case, platform) pair a TestInstance is created and run
    through a chain of filters; the first filter that rejects it decides the
    reason stored in the discards dict.  Accepted instances are added to
    ``self.instances`` and get their overlay created.

    Recognised keyword arguments: ``platform``, ``run_individual_tests``,
    ``arch``, ``tag``, ``exclude_tag``, ``all``, ``device_testing`` and
    ``force_toolchain``.

    :returns: dict mapping each discarded instance to its reason string
        (also stored in ``self.discards``)
    """

    toolchain = self.get_toolchain()

    discards = {}
    platform_filter = kwargs.get('platform')
    testcase_filter = kwargs.get('run_individual_tests')
    arch_filter = kwargs.get('arch')
    tag_filter = kwargs.get('tag')
    exclude_tag = kwargs.get('exclude_tag')
    all_filter = kwargs.get('all')
    device_testing_filter = kwargs.get('device_testing')
    force_toolchain = kwargs.get('force_toolchain')

    logger.debug("platform filter: " + str(platform_filter))
    logger.debug(" arch_filter: " + str(arch_filter))
    logger.debug(" tag_filter: " + str(tag_filter))
    logger.debug(" exclude_tag: " + str(exclude_tag))

    default_platforms = False

    # Candidate platforms: the named ones if a platform filter was given,
    # otherwise every loaded platform.
    if platform_filter:
        platforms = list(filter(lambda p: p.name in platform_filter, self.platforms))
    else:
        platforms = self.platforms

    if all_filter:
        logger.info("Selecting all possible platforms per test case")
        # When --all used, any --platform arguments ignored
        # NOTE(review): "platforms" above was already narrowed by the
        # platform filter before it is cleared here - confirm intent.
        platform_filter = []
    elif not platform_filter:
        logger.info("Selecting default platforms per test case")
        default_platforms = True

    logger.info("Building initial testcase list...")

    for tc_name, tc in self.testcases.items():
        # list of instances per testcase, aka configurations.
        instance_list = []
        for plat in platforms:
            instance = TestInstance(tc, plat, self.outdir)
            instance.check_build_or_run(
                self.build_only,
                self.enable_slow,
                self.device_testing,
                self.fixture
            )

            # Unit tests only pair with the "unit" arch and vice versa.
            if (plat.arch == "unit") != (tc.type == "unit"):
                # Discard silently
                continue

            if device_testing_filter and instance.build_only:
                discards[instance] = "Not runnable on device"
                continue

            if tc.skip:
                discards[instance] = "Skip filter"
                continue

            # Only turns a falsy platform_filter (e.g. None) into an empty
            # list; the filter stays falsy either way.
            if tc.build_on_all and not platform_filter:
                platform_filter = []

            if tag_filter and not tc.tags.intersection(tag_filter):
                discards[instance] = "Command line testcase tag filter"
                continue

            if exclude_tag and tc.tags.intersection(exclude_tag):
                discards[instance] = "Command line testcase exclude filter"
                continue

            if testcase_filter and tc_name not in testcase_filter:
                discards[instance] = "Testcase name filter"
                continue

            if arch_filter and plat.arch not in arch_filter:
                discards[instance] = "Command line testcase arch filter"
                continue

            if tc.arch_whitelist and plat.arch not in tc.arch_whitelist:
                discards[instance] = "Not in test case arch whitelist"
                continue

            if tc.arch_exclude and plat.arch in tc.arch_exclude:
                discards[instance] = "In test case arch exclude"
                continue

            if tc.platform_exclude and plat.name in tc.platform_exclude:
                discards[instance] = "In test case platform exclude"
                continue

            if tc.toolchain_exclude and toolchain in tc.toolchain_exclude:
                discards[instance] = "In test case toolchain exclude"
                continue

            if platform_filter and plat.name not in platform_filter:
                discards[instance] = "Command line platform filter"
                continue

            if tc.platform_whitelist and plat.name not in tc.platform_whitelist:
                discards[instance] = "Not in testcase platform whitelist"
                continue

            if tc.toolchain_whitelist and toolchain not in tc.toolchain_whitelist:
                discards[instance] = "Not in testcase toolchain whitelist"
                continue

            if not plat.env_satisfied:
                discards[instance] = "Environment ({}) not satisfied".format(", ".join(plat.env))
                continue

            # Unit tests are exempt from the platform toolchain check.
            if not force_toolchain \
                    and toolchain and (toolchain not in plat.supported_toolchains) \
                    and tc.type != 'unit':
                discards[instance] = "Not supported by the toolchain"
                continue

            if plat.ram < tc.min_ram:
                discards[instance] = "Not enough RAM"
                continue

            # Every dependency must be listed in the platform's "supported".
            if tc.depends_on:
                dep_intersection = tc.depends_on.intersection(set(plat.supported))
                if dep_intersection != set(tc.depends_on):
                    discards[instance] = "No hardware support"
                    continue

            if plat.flash < tc.min_flash:
                discards[instance] = "Not enough FLASH"
                continue

            if set(plat.ignore_tags) & tc.tags:
                discards[instance] = "Excluded tags per platform"
                continue

            # if nothing stopped us until now, it means this configuration
            # needs to be added.
            instance_list.append(instance)

        # no configurations, so jump to next testcase
        if not instance_list:
            continue

        # if sanitycheck was launched with no platform options at all, we
        # take all default platforms
        if default_platforms and not tc.build_on_all:
            if tc.platform_whitelist:
                # Prefer whitelisted platforms that are also default ones;
                # fall back to the first surviving instance otherwise.
                a = set(self.default_platforms)
                b = set(tc.platform_whitelist)
                c = a.intersection(b)
                if c:
                    aa = list(filter(lambda tc: tc.platform.name in c, instance_list))
                    self.add_instances(aa)
                else:
                    self.add_instances(instance_list[:1])
            else:
                instances = list(filter(lambda tc: tc.platform.default, instance_list))
                self.add_instances(instances)

            # Every non-default instance is recorded as discarded here,
            # regardless of which branch above ran.
            for instance in list(filter(lambda tc: not tc.platform.default, instance_list)):
                discards[instance] = "Not a default test platform"

        else:
            self.add_instances(instance_list)

    for _, case in self.instances.items():
        case.create_overlay(case.platform, self.enable_asan, self.enable_coverage, self.coverage_platform)

    self.discards = discards
    self.selected_platforms = set(p.platform.name for p in self.instances.values())

    return discards
| |
def add_instances(self, instance_list):
    """Register the given instances in ``self.instances``, keyed by name.

    An instance whose name is already present replaces the earlier entry.
    """
    self.instances.update((entry.name, entry) for entry in instance_list)
| |
def add_tasks_to_queue(self, test_only=False):
    """Seed the global pipeline with the first operation of each instance.

    With ``test_only`` only runnable instances are queued, directly for the
    "run" stage.  Otherwise every instance that is not already passed or
    skipped has its status reset and is queued for "cmake".

    :returns: the string "DONE FEEDING"
    """
    for instance in self.instances.values():
        if test_only:
            if not instance.run:
                continue
            pipeline.put({"op": "run", "test": instance, "status": "built"})
        elif instance.status not in ['passed', 'skipped']:
            instance.status = None
            pipeline.put({"op": "cmake", "test": instance})

    return "DONE FEEDING"
| |
def execute(self):
    """Process every queued instance, then fill in the size metrics.

    A feeder task seeds the global pipeline; each message taken from the
    pipeline is handled by a fresh ProjectBuilder on a worker thread, which
    may queue follow-up messages.  The loop ends once no futures are
    outstanding.  An exception raised by a worker terminates the program.
    """
    def calc_one_elf_size(instance):
        # Record the footprint metrics of one instance.
        if instance.status not in ["failed", "skipped"]:
            if instance.platform.type != "native":
                size_calc = instance.calculate_sizes()
                instance.metrics["ram_size"] = size_calc.get_ram_size()
                instance.metrics["rom_size"] = size_calc.get_rom_size()
                instance.metrics["unrecognized"] = size_calc.unrecognized_sections()
            else:
                instance.metrics["ram_size"] = 0
                instance.metrics["rom_size"] = 0
                instance.metrics["unrecognized"] = []

        instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0

    logger.info("Adding tasks to the queue...")
    # We can use a with statement to ensure threads are cleaned up promptly
    with BoundedExecutor(bound=self.jobs, max_workers=self.jobs) as executor:

        # start a future for a thread which sends work in through the queue
        future_to_test = {
            executor.submit(self.add_tasks_to_queue, self.test_only): 'FEEDER DONE'}

        while future_to_test:
            # check for status of the futures which are currently working
            done, _ = concurrent.futures.wait(
                future_to_test, timeout=0.25,
                return_when=concurrent.futures.FIRST_COMPLETED)

            # if there is incoming work, start a new future
            while not pipeline.empty():
                # fetch the next operation from the queue
                message = pipeline.get()
                test = message['test']

                # Hand the operation to a builder and remember which test
                # the resulting future belongs to
                pb = ProjectBuilder(self,
                                    test,
                                    lsan=self.enable_lsan,
                                    asan=self.enable_asan,
                                    coverage=self.enable_coverage,
                                    extra_args=self.extra_args,
                                    device_testing=self.device_testing,
                                    cmake_only=self.cmake_only,
                                    valgrind=self.enable_valgrind,
                                    inline_logs=self.inline_logs
                                    )
                future_to_test[executor.submit(pb.process, message)] = test.name

            # process any completed futures
            for future in done:
                test = future_to_test[future]
                try:
                    data = future.result()
                except Exception as exc:
                    sys.exit('%r generated an exception: %s' % (test, exc))

                else:
                    if data:
                        logger.debug(data)

                    # remove the now completed future
                    del future_to_test[future]

    if self.enable_size_report and not self.cmake_only:
        # Parallelize size calculation.  The pool is used as a context
        # manager so its worker threads are shut down afterwards.
        with concurrent.futures.ThreadPoolExecutor(self.jobs) as size_executor:
            futures = {size_executor.submit(calc_one_elf_size, instance): instance
                       for instance in self.instances.values()}
            concurrent.futures.wait(futures)

        # A failed size calculation stays non-fatal, but is no longer silent.
        for future, instance in futures.items():
            exc = future.exception()
            if exc is not None:
                logger.error("%s: size calculation failed: %s" % (instance.name, exc))
    else:
        for instance in self.instances.values():
            instance.metrics["ram_size"] = 0
            instance.metrics["rom_size"] = 0
            instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0
            instance.metrics["unrecognized"] = []
| |
def discard_report(self, filename):
    """Write the discarded instances and their reasons to a CSV file.

    Columns: test, arch, platform, reason.  Logs an error and exits with
    status 2 when apply_filters() has not populated ``self.discards``.
    """

    try:
        if self.discards is None:
            raise SanityRuntimeError("apply_filters() hasn't been run!")
    except Exception as e:
        logger.error(str(e))
        sys.exit(2)

    # newline="" stops the text layer from translating the line terminator
    # written by the csv module (which would give "\r\r\n" on Windows).
    with open(filename, "wt", newline="") as csvfile:
        fieldnames = ["test", "arch", "platform", "reason"]
        cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
        cw.writeheader()
        for instance, reason in sorted(self.discards.items()):
            rowdict = {"test": instance.testcase.name,
                       "arch": instance.platform.arch,
                       "platform": instance.platform.name,
                       "reason": reason}
            cw.writerow(rowdict)
| |
def target_report(self, outdir):
    """Write one xunit-style XML file per platform into ``outdir``.

    Each ``<platform>.xml`` lists the sub-case results of every instance on
    that platform.  For failed (``FAIL``) and blocked (``BLOCK``) sub-cases
    the printable text of the instance's ``handler.log`` is attached when
    that file exists.
    """
    run = "Sanitycheck"
    eleTestsuite = None

    platforms = {inst.platform.name for _, inst in self.instances.items()}
    for platform in platforms:
        errors = 0
        passes = 0
        fails = 0
        duration = 0
        skips = 0

        # First pass: totals for the <testsuite> element.
        for _, instance in self.instances.items():
            if instance.platform.name != platform:
                continue

            handler_time = instance.metrics.get('handler_time', 0)
            duration += handler_time
            for k in instance.results.keys():
                if instance.results[k] == 'PASS':
                    passes += 1
                elif instance.results[k] == 'BLOCK':
                    errors += 1
                elif instance.results[k] == 'SKIP':
                    skips += 1
                else:
                    fails += 1

        eleTestsuites = ET.Element('testsuites')
        eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
                                     name=run, time="%f" % duration,
                                     tests="%d" % (errors + passes + fails),
                                     failures="%d" % fails,
                                     errors="%d" % errors, skipped="%d" % skips)

        handler_time = 0

        # Second pass: one <testcase> element per sub-case.
        for _, instance in self.instances.items():
            if instance.platform.name != platform:
                continue
            handler_time = instance.metrics.get('handler_time', 0)
            for k in instance.results.keys():
                eleTestcase = ET.SubElement(
                    eleTestsuite, 'testcase',
                    classname="%s:%s" % (instance.platform.name, os.path.basename(instance.testcase.name)),
                    name="%s" % (k), time="%f" % handler_time)
                if instance.results[k] in ['FAIL', 'BLOCK']:
                    el = None

                    if instance.results[k] == 'FAIL':
                        el = ET.SubElement(
                            eleTestcase,
                            'failure',
                            type="failure",
                            message="failed")
                    elif instance.results[k] == 'BLOCK':
                        el = ET.SubElement(
                            eleTestcase,
                            'error',
                            type="failure",
                            message="failed")
                    p = os.path.join(self.outdir, instance.platform.name, instance.testcase.name)
                    log_file = os.path.join(p, "handler.log")

                    if os.path.exists(log_file):
                        with open(log_file, "rb") as f:
                            # Logs may contain bytes that are not valid
                            # UTF-8; decode leniently so one bad byte does
                            # not abort the report.  Non-printable
                            # characters are removed below anyway.
                            log = f.read().decode("utf-8", errors="replace")
                        filtered_string = ''.join(filter(lambda x: x in string.printable, log))
                        el.text = filtered_string

                elif instance.results[k] == 'SKIP':
                    el = ET.SubElement(
                        eleTestcase,
                        'skipped',
                        type="skipped",
                        message="Skipped")

        result = ET.tostring(eleTestsuites)
        with open(os.path.join(outdir, platform + ".xml"), 'wb') as f:
            f.write(result)
| |
def xunit_report(self, filename, append=False):
    """Write an xunit-style XML report with one test case per instance.

    :param filename: path of the XML file to write
    :param append: when true and ``filename`` exists, the existing report
        is loaded and only the entries of the current instances are
        replaced

    Instances whose status is "failed" or "timeout" are reported as
    failures, matching the console and CSV reports.
    """
    failed_states = ["failed", "timeout"]

    fails = 0
    passes = 0
    errors = 0
    skips = 0
    duration = 0

    for instance in self.instances.values():
        handler_time = instance.metrics.get('handler_time', 0)
        duration += handler_time
        if instance.status in failed_states:
            if instance.reason in ['build_error', 'handler_crash']:
                errors += 1
            else:
                fails += 1
        elif instance.status == 'skipped':
            skips += 1
        else:
            passes += 1

    run = "Sanitycheck"
    eleTestsuite = None

    # When we re-run the tests, we re-use the results and update only with
    # the newly run tests.
    if os.path.exists(filename) and append:
        tree = ET.parse(filename)
        eleTestsuites = tree.getroot()
        eleTestsuite = tree.findall('testsuite')[0]
    else:
        eleTestsuites = ET.Element('testsuites')
        # "skipped" is the attribute name target_report() uses as well.
        eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite',
                                     name=run, time="%f" % duration,
                                     tests="%d" % (errors + passes + fails + skips),
                                     failures="%d" % fails,
                                     errors="%d" % (errors), skipped="%s" % (skips))

    for instance in self.instances.values():
        classname = "%s:%s" % (instance.platform.name, instance.testcase.name)

        # remove testcases that are a re-run
        if append:
            for tc in eleTestsuite.findall('testcase'):
                if tc.get('classname') == classname:
                    eleTestsuite.remove(tc)

        handler_time = 0
        if instance.status != "failed" and instance.handler:
            handler_time = instance.metrics.get("handler_time", 0)

        eleTestcase = ET.SubElement(
            eleTestsuite,
            'testcase',
            classname=classname,
            name="%s" % (instance.testcase.name),
            time="%f" % handler_time)

        if instance.status in failed_states:
            failure = ET.SubElement(
                eleTestcase,
                'failure',
                type="failure",
                message=instance.reason)
            p = ("%s/%s/%s" % (self.outdir, instance.platform.name, instance.testcase.name))
            bl = os.path.join(p, "build.log")
            hl = os.path.join(p, "handler.log")
            # Prefer the handler log unless this was a build error or the
            # handler never produced a log.
            log_file = bl
            if instance.reason != 'Build error':
                if os.path.exists(hl):
                    log_file = hl
                else:
                    log_file = bl

            if os.path.exists(log_file):
                with open(log_file, "rb") as f:
                    # Decode leniently: logs may contain invalid UTF-8,
                    # and non-printable characters are removed below.
                    log = f.read().decode("utf-8", errors="replace")
                filtered_string = ''.join(filter(lambda x: x in string.printable, log))
                failure.text = filtered_string
        elif instance.status == "skipped":
            ET.SubElement(eleTestcase, 'skipped', type="skipped", message="Skipped")

    result = ET.tostring(eleTestsuites)
    with open(filename, 'wb') as report:
        report.write(result)
| |
def csv_report(self, filename):
    """Write one CSV row per instance with its result and metrics.

    Failed and timed-out instances get ``passed=False`` and their reason in
    the ``status`` column; all others get ``passed=True`` plus their size
    metrics and, when they have a handler, the handler time.
    """
    # newline="" stops the text layer from translating the line terminator
    # written by the csv module (which would give "\r\r\n" on Windows).
    with open(filename, "wt", newline="") as csvfile:
        fieldnames = ["test", "arch", "platform", "passed", "status",
                      "extra_args", "handler", "handler_time", "ram_size",
                      "rom_size"]
        cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
        cw.writeheader()
        for instance in sorted(self.instances.values()):
            rowdict = {"test": instance.testcase.name,
                       "arch": instance.platform.arch,
                       "platform": instance.platform.name,
                       "extra_args": " ".join(instance.testcase.extra_args),
                       "handler": instance.platform.simulation}

            if instance.status in ["failed", "timeout"]:
                rowdict["passed"] = False
                rowdict["status"] = instance.reason
            else:
                rowdict["passed"] = True
                if instance.handler:
                    rowdict["handler_time"] = instance.metrics.get("handler_time", 0)
                ram_size = instance.metrics.get("ram_size", 0)
                rom_size = instance.metrics.get("rom_size", 0)
                rowdict["ram_size"] = ram_size
                rowdict["rom_size"] = rom_size
            cw.writerow(rowdict)
| |
def get_testcase(self, identifier):
    """Return the test cases that list ``identifier`` among their cases.

    A test case appears once per matching entry in its ``cases`` list.
    """
    return [tc
            for tc in self.testcases.values()
            for case in tc.cases
            if case == identifier]
| |
| |
def parse_arguments():
    """Define the sanitycheck command line and parse ``sys.argv``.

    The module docstring is used as the ``--help`` description and is
    printed unwrapped (RawDescriptionHelpFormatter). Arguments can also be
    read from a file by passing ``+FILENAME`` on the command line.

    Returns:
        argparse.Namespace: the parsed options.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.fromfile_prefix_chars = "+"

    case_select = parser.add_argument_group("Test case selection",
                                            """
Artificially long but functional example:
    $ ./scripts/sanitycheck -v \\
      --testcase-root tests/ztest/base \\
      --testcase-root tests/kernel \\
      --test tests/ztest/base/testing.ztest.verbose_0 \\
      --test tests/kernel/fifo/fifo_api/kernel.fifo.poll

   "kernel.fifo.poll" is one of the test section names in
   __/fifo_api/testcase.yaml
    """)

    parser.add_argument("--force-toolchain", action="store_true",
                        help="Do not filter based on toolchain, use the set "
                             " toolchain unconditionally")
    parser.add_argument(
        "-p", "--platform", action="append",
        help="Platform filter for testing. This option may be used multiple "
             "times. Testcases will only be built/run on the platforms "
             "specified. If this option is not used, then platforms marked "
             "as default in the platform metadata file will be chosen "
             "to build and test. ")
    parser.add_argument(
        "-a", "--arch", action="append",
        help="Arch filter for testing. Takes precedence over --platform. "
             "If unspecified, test all arches. Multiple invocations "
             "are treated as a logical 'or' relationship")
    parser.add_argument(
        "-t", "--tag", action="append",
        help="Specify tags to restrict which tests to run by tag value. "
             "Default is to not do any tag filtering. Multiple invocations "
             "are treated as a logical 'or' relationship")
    parser.add_argument("-e", "--exclude-tag", action="append",
                        help="Specify tags of tests that should not run. "
                             "Default is to run all tests with all tags.")
    case_select.add_argument(
        "-f",
        "--only-failed",
        action="store_true",
        help="Run only those tests that failed the previous sanity check "
             "invocation.")

    parser.add_argument(
        "--retry-failed", type=int, default=0,
        help="Retry failing tests again, up to the number of times specified.")

    # --test and --sub-test cannot be combined.
    test_xor_subtest = case_select.add_mutually_exclusive_group()

    test_xor_subtest.add_argument(
        "-s", "--test", action="append",
        help="Run only the specified test cases. These are named by "
             "<path/relative/to/Zephyr/base/section.name.in.testcase.yaml>")

    test_xor_subtest.add_argument(
        "--sub-test", action="append",
        help="""Recursively find sub-test functions and run the entire
        test section where they were found, including all sibling test
        functions. Sub-tests are named by:
        section.name.in.testcase.yaml.function_name_without_test_prefix
        Example: kernel.fifo.poll.fifo_loop
        """)

    parser.add_argument(
        "-l", "--all", action="store_true",
        help="Build/test on all platforms. Any --platform arguments "
             "ignored.")

    parser.add_argument(
        "-o", "--report-dir",
        help="""Output reports containing results of the test run into the
        specified directory.
        The output will be both in CSV and JUNIT format
        (sanitycheck.csv and sanitycheck.xml).
        """)

    parser.add_argument(
        "--report-name",
        help="""Create a report with a custom name.
        """)

    parser.add_argument("--report-excluded",
                        action="store_true",
                        help="""List all tests that are never run based on current scope and
            coverage. If you are looking for accurate results, run this with
            --all, but this will take a while...""")

    parser.add_argument("--compare-report",
                        help="Use this report file for size comparison")

    # argparse %-formats help strings, hence the doubled percent sign.
    parser.add_argument(
        "-B", "--subset",
        help="Only run a subset of the tests, 1/4 for running the first 25%%, "
             "3/5 means run the 3rd fifth of the total. "
             "This option is useful when running a large number of tests on "
             "different hosts to speed up execution time.")

    parser.add_argument(
        "-N", "--ninja", action="store_true",
        help="Use the Ninja generator with CMake")

    parser.add_argument(
        "-y", "--dry-run", action="store_true",
        help="""Create the filtered list of test cases, but don't actually
        run them. Useful if you're just interested in the discard report
        generated for every run and saved in the specified output
        directory (sanitycheck_discard.csv).
        """)

    parser.add_argument("--list-tags", action="store_true",
                        help="list all tags in selected tests")

    case_select.add_argument("--list-tests", action="store_true",
                             help="""List of all sub-test functions recursively found in
        all --testcase-root arguments. Note different sub-tests can share
        the same section name and come from different directories.
        The output is flattened and reports --sub-test names only,
        not their directories. For instance net.socket.getaddrinfo_ok
        and net.socket.fd_set belong to different directories.
        """)

    case_select.add_argument("--test-tree", action="store_true",
                             help="""Output the testsuite in a tree form""")

    case_select.add_argument("--list-test-duplicates", action="store_true",
                             help="""List tests with duplicate identifiers.
        """)

    parser.add_argument("--export-tests", action="store",
                        metavar="FILENAME",
                        help="Export tests case meta-data to a file in CSV format.")

    parser.add_argument("--timestamps",
                        action="store_true",
                        help="Print all messages with time stamps")

    parser.add_argument(
        "-r", "--release", action="store_true",
        help="Update the benchmark database with the results of this test "
             "run. Intended to be run by CI when tagging an official "
             "release. This database is used as a basis for comparison "
             "when looking for deltas in metrics such as footprint")
    parser.add_argument("-w", "--warnings-as-errors", action="store_true",
                        help="Treat warning conditions as errors")
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="Emit debugging information, call multiple times to increase "
             "verbosity")
    parser.add_argument(
        "-i", "--inline-logs", action="store_true",
        help="Upon test failure, print relevant log data to stdout "
             "instead of just a path to it")
    parser.add_argument("--log-file", metavar="FILENAME", action="store",
                        help="log also to file")
    parser.add_argument(
        "-m", "--last-metrics", action="store_true",
        help="Instead of comparing metrics from the last --release, "
             "compare with the results of the previous sanity check "
             "invocation")
    parser.add_argument(
        "-u",
        "--no-update",
        action="store_true",
        help="do not update the results of the last run of the sanity "
             "checks")
    case_select.add_argument(
        "-F",
        "--load-tests",
        metavar="FILENAME",
        action="store",
        help="Load list of tests and platforms to be run from file.")

    case_select.add_argument(
        "-E",
        "--save-tests",
        metavar="FILENAME",
        action="store",
        help="Append list of tests and platforms to be run to file.")

    # --build-only and --test-only cannot be combined.
    test_or_build = parser.add_mutually_exclusive_group()
    test_or_build.add_argument(
        "-b", "--build-only", action="store_true",
        help="Only build the code, do not execute any of it in QEMU")

    test_or_build.add_argument(
        "--test-only", action="store_true",
        help="""Only run device tests with current artifacts, do not build
             the code""")
    parser.add_argument(
        "--cmake-only", action="store_true",
        help="Only run cmake, do not build or run.")

    parser.add_argument(
        "-j", "--jobs", type=int,
        help="Number of jobs for building, defaults to number of CPU threads, "
             "overcommitted by factor 2 when --build-only")

    parser.add_argument(
        "--show-footprint", action="store_true",
        help="Show footprint statistics and deltas since last release."
    )
    parser.add_argument(
        "-H", "--footprint-threshold", type=float, default=5,
        help="When checking test case footprint sizes, warn the user if "
             "the new app size is greater than the specified percentage "
             "from the last release. Default is 5. 0 to warn on any "
             "increase on app size")
    parser.add_argument(
        "-D", "--all-deltas", action="store_true",
        help="Show all footprint deltas, positive or negative. Implies "
             "--footprint-threshold=0")
    parser.add_argument(
        "-O", "--outdir",
        default=os.path.join(os.getcwd(), "sanity-out"),
        help="Output directory for logs and binaries. "
             "Default is 'sanity-out' in the current directory. "
             "This directory will be deleted unless '--no-clean' is set.")
    parser.add_argument(
        "-n", "--no-clean", action="store_true",
        help="Do not delete the outdir before building. Will result in "
             "faster compilation since builds will be incremental")
    case_select.add_argument(
        "-T", "--testcase-root", action="append", default=[],
        help="Base directory to recursively search for test cases. All "
             "testcase.yaml files under here will be processed. May be "
             "called multiple times. Defaults to the 'samples/' and "
             "'tests/' directories at the base of the Zephyr tree.")

    board_root_list = ["%s/boards" % ZEPHYR_BASE,
                       "%s/scripts/sanity_chk/boards" % ZEPHYR_BASE]

    # With action="append" and a non-empty default, directories given on the
    # command line are added after these two defaults, not instead of them.
    parser.add_argument(
        "-A", "--board-root", action="append", default=board_root_list,
        help="""Directory to search for board configuration files. All .yaml
files in the directory will be processed. The directory should have the same
structure in the main Zephyr tree: boards/<arch>/<board_name>/""")

    parser.add_argument(
        "-z", "--size", action="append",
        help="Don't run sanity checks. Instead, produce a report to "
             "stdout detailing RAM/ROM sizes on the specified filenames. "
             "All other command line arguments ignored.")
    parser.add_argument(
        "-S", "--enable-slow", action="store_true",
        help="Execute time-consuming test cases that have been marked "
             "as 'slow' in testcase.yaml. Normally these are only built.")
    parser.add_argument(
        "--disable-unrecognized-section-test", action="store_true",
        default=False,
        help="Skip the 'unrecognized section' test.")
    # store_true with default=True: passing -R changes nothing; only
    # --disable-asserts below can clear enable_asserts.
    parser.add_argument("-R", "--enable-asserts", action="store_true",
                        default=True,
                        help="deprecated, left for compatibility")
    parser.add_argument("--disable-asserts", action="store_false",
                        dest="enable_asserts",
                        help="deprecated, left for compatibility")
    # NOTE(review): store_false makes error_on_deprecations default to True
    # and passing -Q sets it to False, which reads opposite to the help text.
    # Left as-is; confirm against the consumer of this option before changing.
    parser.add_argument("-Q", "--error-on-deprecations", action="store_false",
                        help="Error on deprecation warnings.")
    parser.add_argument("--enable-size-report", action="store_true",
                        help="Enable expensive computation of RAM/ROM segment sizes.")

    parser.add_argument(
        "-x", "--extra-args", action="append", default=[],
        help="""Extra CMake cache entries to define when building test cases.
        May be called multiple times. The key-value entries will be
        prefixed with -D before being passed to CMake.

        E.g
        "sanitycheck -x=USE_CCACHE=0"
        will translate to
        "cmake -DUSE_CCACHE=0"

        which will ultimately disable ccache.
        """
    )

    parser.add_argument(
        "--device-testing", action="store_true",
        help="Test on device directly. Specify the serial device to "
             "use with the --device-serial option.")

    parser.add_argument(
        "-X", "--fixture", action="append", default=[],
        help="Specify a fixture that a board might support")
    parser.add_argument(
        "--device-serial",
        help="Serial device for accessing the board (e.g., /dev/ttyACM0)")

    parser.add_argument("--generate-hardware-map",
                        help="""Probe serial devices connected to this platform
                        and create a hardware map file to be used with
                        --device-testing
                        """)

    parser.add_argument("--hardware-map",
                        help="""Load hardware map from a file. This will be used
                        for testing on hardware that is listed in the file.
                        """)

    # nargs='?' with const=[]: a bare --west-flash yields an empty list,
    # --west-flash=ARGS yields the string ARGS, and omitting it yields None.
    parser.add_argument(
        "--west-flash", nargs='?', const=[],
        help="""Uses west instead of ninja or make to flash when running with
             --device-testing. Supports comma-separated argument list.

        E.g "sanitycheck --device-testing --device-serial /dev/ttyACM0
                         --west-flash="--board-id=foobar,--erase"
        will translate to "west flash -- --board-id=foobar --erase"

        NOTE: device-testing must be enabled to use this option.
        """
    )
    parser.add_argument(
        "--west-runner",
        help="""Uses the specified west runner instead of default when running
             with --west-flash.

        E.g "sanitycheck --device-testing --device-serial /dev/ttyACM0
                         --west-flash --west-runner=pyocd"
        will translate to "west flash --runner pyocd"

        NOTE: west-flash must be enabled to use this option.
        """
    )

    # --enable-valgrind and --enable-asan cannot be combined.
    valgrind_asan_group = parser.add_mutually_exclusive_group()

    valgrind_asan_group.add_argument(
        "--enable-valgrind", action="store_true",
        help="""Run binary through valgrind and check for several memory access
        errors. Valgrind needs to be installed on the host. This option only
        works with host binaries such as those generated for the native_posix
        configuration and is mutual exclusive with --enable-asan.
        """)

    valgrind_asan_group.add_argument(
        "--enable-asan", action="store_true",
        help="""Enable address sanitizer to check for several memory access
        errors. Libasan needs to be installed on the host. This option only
        works with host binaries such as those generated for the native_posix
        configuration and is mutual exclusive with --enable-valgrind.
        """)

    parser.add_argument(
        "--enable-lsan", action="store_true",
        help="""Enable leak sanitizer to check for heap memory leaks.
        Libasan needs to be installed on the host. This option only
        works with host binaries such as those generated for the native_posix
        configuration and when --enable-asan is given.
        """)

    parser.add_argument("--enable-coverage", action="store_true",
                        help="Enable code coverage using gcov.")

    parser.add_argument("-C", "--coverage", action="store_true",
                        help="Generate coverage reports. Implies "
                             "--enable-coverage.")

    parser.add_argument("--coverage-platform", action="append", default=[],
                        help="Platforms to run coverage reports on. "
                             "This option may be used multiple times. "
                             "Default to what was selected with --platform.")

    parser.add_argument("--gcov-tool", default=None,
                        help="Path to the gcov tool to use for code coverage "
                             "reports")

    parser.add_argument("--coverage-tool", choices=['lcov', 'gcovr'], default='lcov',
                        help="Tool to use to generate coverage report.")

    return parser.parse_args()
| |
| |
def size_report(sc):
    """Log the section table and the ROM/RAM totals of size calculator *sc*.

    One line is logged per entry of ``sc.sections`` (name, virtual address,
    load address, size in decimal and hex, type), followed by the totals
    and an empty line.
    """
    row_format = "%-17s 0x%08x 0x%08x %8d 0x%05x %-7s"

    logger.info(sc.filename)
    logger.info("SECTION NAME VMA LMA SIZE HEX SZ TYPE")
    for section in sc.sections:
        # The size appears twice: once in decimal, once in hex.
        row = (section["name"], section["virt_addr"], section["load_addr"],
               section["size"], section["size"], section["type"])
        logger.info(row_format % row)

    totals = (sc.rom_size, sc.ram_size)
    logger.info("Totals: %d bytes (ROM), %d bytes (RAM)" % totals)
    logger.info("")
| |
| |
class CoverageTool:
    """Shared behaviour of the supported coverage back ends.

    generate() calls ``self._generate(outdir, coveragelog)``, which this
    class does not define; subclasses are expected to provide it.
    """

    def __init__(self):
        self.gcov_tool = options.gcov_tool

    @staticmethod
    def factory(tool):
        """Return a back end for *tool* ('lcov' or 'gcovr').

        Any other value is logged as an error and None is returned.
        """
        if tool == 'lcov':
            return Lcov()
        elif tool == 'gcovr':
            return Gcovr()
        logger.error("Unsupported coverage tool specified: {}".format(tool))
        return None

    @staticmethod
    def retrieve_gcov_data(intput_file):
        """Extract the gcov dump embedded in the log file *intput_file*.

        Lines between the GCOV_COVERAGE_DUMP_START and
        GCOV_COVERAGE_DUMP_END markers that look like ``*<name><<hex>`` are
        collected into a ``{name: hex}`` dict.

        Returns a dict with 'complete' (True when the end marker was seen,
        or when the log holds no start marker at all) and 'data'.
        """
        if VERBOSE:
            logger.debug("Working on %s" % intput_file)
        dumps = {}
        started = False
        finished = False
        with open(intput_file, 'r') as log:
            for line in log:
                if "GCOV_COVERAGE_DUMP_START" in line:
                    started = True
                elif "GCOV_COVERAGE_DUMP_END" in line:
                    finished = True
                    break
                elif started and line.startswith("*"):
                    fields = line.split("<")
                    if len(fields) > 1:
                        # Drop the leading "*" of the name and the last
                        # character (the newline) of the hex dump.
                        dumps[fields[0][1:]] = fields[1][:-1]
        if not started:
            # No dump in this log: nothing is missing.
            finished = True
        return {'complete': finished, 'data': dumps}

    @staticmethod
    def create_gcda_files(extracted_coverage_info):
        """Write each hex dump in *extracted_coverage_info* to its file.

        Entries whose name contains "kobject_hash" are not written; the
        file named by replacing the last four characters with "gcno" is
        removed instead, ignoring any failure.
        """
        if VERBOSE:
            logger.debug("Generating gcda files")
        for filename, hexdump_val in extracted_coverage_info.items():
            # if kobject_hash is given for coverage gcovr fails
            # hence skipping it problem only in gcovr v4.1
            if "kobject_hash" in filename:
                gcno_file = filename[:-4] + "gcno"
                try:
                    os.remove(gcno_file)
                except Exception:
                    pass
            else:
                with open(filename, 'wb') as gcda:
                    gcda.write(bytes.fromhex(hexdump_val))

    def generate(self, outdir):
        """Rebuild gcda files from every handler.log under *outdir*, then
        run the back end with its output appended to ``coverage.log``.
        """
        tool_cls = self.__class__
        pattern = "%s/**/handler.log" % outdir
        for handler_log in glob.glob(pattern, recursive=True):
            captured = tool_cls.retrieve_gcov_data(handler_log)
            if not captured['complete']:
                logger.error("Gcov data capture incomplete: {}".format(handler_log))
                continue
            tool_cls.create_gcda_files(captured['data'])
            logger.debug("Gcov data captured: {}".format(handler_log))

        with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
            status = self._generate(outdir, coveragelog)
            if status == 0:
                report = os.path.join(outdir, "coverage", "index.html")
                logger.info("HTML report generated: {}".format(report))
| |
| |
class Lcov(CoverageTool):
    """Coverage back end that drives ``lcov`` and ``genhtml``."""

    def __init__(self):
        super().__init__()
        # Patterns passed to "lcov --remove" on the main tracefile.
        self.ignores = []

    def add_ignore_file(self, pattern):
        """Ignore every path containing *pattern*."""
        wildcard = '*' + pattern + '*'
        self.ignores.append(wildcard)

    def add_ignore_directory(self, pattern):
        """Ignore everything below directory *pattern*."""
        wildcard = pattern + '/*'
        self.ignores.append(wildcard)

    def _generate(self, outdir, coveragelog):
        """Capture coverage under *outdir* and render the HTML report.

        stdout of every tool is redirected to *coveragelog*. Returns the
        exit status of the final ``genhtml`` call.
        """
        coveragefile = os.path.join(outdir, "coverage.info")
        ztestfile = os.path.join(outdir, "ztest.info")
        lcov = ["lcov", "--gcov-tool", self.gcov_tool]
        branch_rc = ["--rc", "lcov_branch_coverage=1"]

        capture = lcov + ["--capture", "--directory", outdir]
        capture += branch_rc
        capture += ["--output-file", coveragefile]
        subprocess.call(capture, stdout=coveragelog)

        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
        ztest_glob = os.path.join(ZEPHYR_BASE, "tests", "ztest", "*")
        extract = lcov + ["--extract", coveragefile, ztest_glob,
                          "--output-file", ztestfile] + branch_rc
        subprocess.call(extract, stdout=coveragelog)

        files = [coveragefile]
        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
            ztest_self_tests = os.path.join(ZEPHYR_BASE, "tests/ztest/test/*")
            prune = lcov + ["--remove", ztestfile, ztest_self_tests,
                            "--output-file", ztestfile] + branch_rc
            subprocess.call(prune, stdout=coveragelog)
            files.append(ztestfile)

        for ignored in self.ignores:
            drop = lcov + ["--remove", coveragefile, ignored,
                           "--output-file", coveragefile] + branch_rc
            subprocess.call(drop, stdout=coveragelog)

        # The --ignore-errors source option is added to avoid it exiting due to
        # samples/application_development/external_lib/
        genhtml = ["genhtml", "--legend", "--branch-coverage",
                   "--ignore-errors", "source",
                   "-output-directory",
                   os.path.join(outdir, "coverage")]
        return subprocess.call(genhtml + files, stdout=coveragelog)
| |
| |
class Gcovr(CoverageTool):
    """Coverage back end that drives ``gcovr``."""

    def __init__(self):
        super().__init__()
        # Regular expressions passed to gcovr as "-e" exclusions.
        self.ignores = []

    def add_ignore_file(self, pattern):
        """Ignore every path containing *pattern*."""
        regex = '.*' + pattern + '.*'
        self.ignores.append(regex)

    def add_ignore_directory(self, pattern):
        """Ignore everything below directory *pattern*."""
        regex = pattern + '/.*'
        self.ignores.append(regex)

    @staticmethod
    def _interleave_list(prefix, list):
        """Return a flat list with *prefix* placed before every item,
        e.g. ``("-e", ["a", "b"])`` gives ``["-e", "a", "-e", "b"]``.
        """
        interleaved = []
        for item in list:
            interleaved.append(prefix)
            interleaved.append(item)
        return interleaved

    def _generate(self, outdir, coveragelog):
        """Collect JSON coverage data under *outdir* and render HTML.

        stdout of every gcovr run is redirected to *coveragelog*. Returns
        the exit status of the final HTML-rendering run.
        """
        coveragefile = os.path.join(outdir, "coverage.json")
        ztestfile = os.path.join(outdir, "ztest.json")

        excludes = Gcovr._interleave_list("-e", self.ignores)
        gcovr = ["gcovr", "-r", ZEPHYR_BASE]
        gcovr_with_tool = gcovr + ["--gcov-executable", self.gcov_tool]

        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
        main_run = gcovr_with_tool + ["-e", "tests/*"]
        main_run += excludes
        main_run += ["--json", "-o", coveragefile, outdir]
        subprocess.call(main_run, stdout=coveragelog)

        ztest_run = gcovr_with_tool + ["-f", "tests/ztest", "-e",
                                       "tests/ztest/test/*", "--json", "-o",
                                       ztestfile, outdir]
        subprocess.call(ztest_run, stdout=coveragelog)

        files = [coveragefile]
        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
            files.append(ztestfile)

        subdir = os.path.join(outdir, "coverage")
        os.makedirs(subdir, exist_ok=True)

        tracefiles = self._interleave_list("--add-tracefile", files)

        html_run = gcovr + ["--html", "--html-details"]
        html_run += tracefiles
        html_run += ["-o", os.path.join(subdir, "index.html")]
        return subprocess.call(html_run, stdout=coveragelog)
| |
| |
def get_generator():
    """Return ``(build command, CMake generator name)``.

    ("ninja", "Ninja") when ``options.ninja`` is set, otherwise
    ("make", "Unix Makefiles").
    """
    if not options.ninja:
        return "make", "Unix Makefiles"
    return "ninja", "Ninja"
| |
| |
def export_tests(filename, tests):
    """Export dotted test identifiers to *filename* as CSV rows.

    Each identifier ``section.subsection[...]`` becomes one row with the
    capitalised section, the title-cased subsection (underscores turned
    into spaces), and the full identifier as both title and reference.
    Identifiers without a dot are logged and skipped. No header row is
    written.
    """
    # newline="" leaves line endings to the csv module. Without it, text
    # mode translates the "\n" inside os.linesep a second time on Windows
    # and every row ends in "\r\r\n".
    with open(filename, "wt", newline="") as csvfile:
        fieldnames = ['section', 'subsection', 'title', 'reference']
        cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep)
        for test in tests:
            data = test.split(".")
            if len(data) > 1:
                subsec = " ".join(data[1].split("_")).title()
                rowdict = {
                    "section": data[0].capitalize(),
                    "subsection": subsec,
                    "title": test,
                    "reference": test
                }
                cw.writerow(rowdict)
            else:
                logger.info("{} can't be exported".format(test))
| |
| |
def native_and_unit_first(a, b):
    """cmp-style comparator that orders host-run entries first.

    The first element of each argument is checked, in this order, for a
    'unit_testing' prefix, a 'native_posix' prefix, and a first path
    component ending in '_bsim'. For each check *a* is tested before *b*:
    a match on *a* returns -1, a match on *b* returns 1. When no check
    matches, *a* and *b* are compared as whole values.
    """
    priority_checks = (
        lambda name: name.startswith('unit_testing'),
        lambda name: name.startswith('native_posix'),
        lambda name: name.split("/", 1)[0].endswith("_bsim"),
    )
    for is_priority in priority_checks:
        if is_priority(a[0]):
            return -1
        if is_priority(b[0]):
            return 1

    if a > b:
        return 1
    if a < b:
        return -1
    return 0
| |
| |
class HardwareMap:
    """Boards reachable through serial ports on this host.

    ``detected`` holds the devices found by scan_hw(); ``connected_hardware``
    holds the devices loaded from a hardware map file or registered from
    the command line.
    """

    # Manufacturer strings that scan_hw() accepts; devices from any other
    # manufacturer are reported as unsupported.
    manufacturer = [
        'ARM',
        'SEGGER',
        'MBED',
        'STMicroelectronics',
        'Atmel Corp.',
        'Texas Instruments',
        'Silicon Labs',
        'NXP Semiconductors',
        'Microchip Technology Inc.',
        'FTDI'
    ]

    # Runner name -> product strings. scan_hw() first looks for an exact
    # match and then tries every entry as a regular expression (re.match).
    runner_mapping = {
        'pyocd': [
            'DAPLink CMSIS-DAP',
            'MBED CMSIS-DAP'
        ],
        'jlink': [
            'J-Link',
            'J-Link OB'
        ],
        'openocd': [
            'STM32 STLink', '^XDS110.*'
        ],
        'dediprog': [
            'TTL232R-3V3',
            'MCP2200 USB Serial Port Emulator'
        ]
    }

    def __init__(self):
        self.detected = []
        self.connected_hardware = []

    def load_device_from_cmdline(self, serial, platform):
        """Register one connected, available device on port *serial* for
        *platform*."""
        device = {
            "serial": serial,
            "platform": platform,
            "counter": 0,
            "available": True,
            "connected": True
        }
        self.connected_hardware.append(device)

    def load_hardware_map(self, map_file):
        """Replace ``connected_hardware`` with the entries of YAML file
        *map_file* and reset the 'counter' of every entry to 0.

        A YAML syntax error is printed and the previous list is kept.
        """
        with open(map_file, 'r') as stream:
            try:
                # An empty document parses to None; fall back to an empty
                # list so the loop below and later consumers can iterate.
                self.connected_hardware = yaml.safe_load(stream) or []
            except yaml.YAMLError as exc:
                print(exc)
        for i in self.connected_hardware:
            i['counter'] = 0

    def scan_hw(self):
        """Probe the host's serial ports and append one entry to
        ``detected`` for every device from a known manufacturer."""
        from serial.tools import list_ports

        serial_devices = list_ports.comports()
        logger.info("Scanning connected hardware...")
        for d in serial_devices:
            if d.manufacturer in self.manufacturer:

                # TI XDS110 can have multiple serial devices for a single board
                # assume endpoint 0 is the serial, skip all others
                if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'):
                    continue
                s_dev = {}
                s_dev['platform'] = "unknown"
                s_dev['id'] = d.serial_number
                s_dev['serial'] = d.device
                s_dev['product'] = d.product
                s_dev['runner'] = 'unknown'
                for runner, products in self.runner_mapping.items():
                    if d.product in products:
                        s_dev['runner'] = runner
                        continue
                    if d.product is None:
                        # No product string to match; re.match() would
                        # raise TypeError on None.
                        continue
                    # Try regex matching
                    for p in products:
                        if re.match(p, d.product):
                            s_dev['runner'] = runner

                s_dev['available'] = True
                s_dev['connected'] = True
                self.detected.append(s_dev)
            else:
                logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d))

    def write_map(self, hwm_file):
        """Write the detected devices to hardware map file *hwm_file*.

        When the file exists, its entries are kept: all are first marked
        disconnected, entries matching a detected device (same 'id' and
        'product') are marked connected with the current serial port, and
        detected devices without a match are appended. Otherwise a new
        file holding the detected devices is created.
        """
        # use existing map
        if os.path.exists(hwm_file):
            with open(hwm_file, 'r') as yaml_file:
                # NOTE(review): load_hardware_map() reads the same kind of
                # file with yaml.safe_load; FullLoader is kept here to leave
                # behaviour unchanged, consider switching to safe_load.
                # An empty file parses to None, hence the "or []".
                hwm = yaml.load(yaml_file, Loader=yaml.FullLoader) or []

            # disconnect everything
            for h in hwm:
                h['connected'] = False
                h['serial'] = None

            for d in self.detected:
                for h in hwm:
                    if d['id'] == h['id'] and d['product'] == h['product']:
                        h['connected'] = True
                        h['serial'] = d['serial']
                        # Marks the detected entry as already present in the map.
                        d['match'] = True

            new = [n for n in self.detected if not n.get('match', False)]
            hwm = hwm + new

            logger.info("Registered devices:")
            self.dump(hwm)

            with open(hwm_file, 'w') as yaml_file:
                yaml.dump(hwm, yaml_file, default_flow_style=False)

        else:
            # create new file
            with open(hwm_file, 'w') as yaml_file:
                yaml.dump(self.detected, yaml_file, default_flow_style=False)
            logger.info("Detected devices:")
            self.dump(self.detected)

    @staticmethod
    def dump(hwmap=None, filtered=None, header=None, connected_only=False):
        """Print *hwmap* as a table sorted by platform.

        Args:
            hwmap: list of device dicts; nothing is listed when omitted.
            filtered: when non-empty, only platforms in it are listed.
            header: column titles; defaults to Platform / ID / Serial device.
            connected_only: list only entries whose 'connected' is truthy.
        """
        print("")
        table = []
        if not header:
            header = ["Platform", "ID", "Serial device"]
        for p in sorted(hwmap or [], key=lambda i: i['platform']):
            platform = p.get('platform')
            connected = p.get('connected', False)
            if filtered and platform not in filtered:
                continue

            if not connected_only or connected:
                table.append([platform, p.get('id', None), p.get('serial')])

        print(tabulate(table, headers=header, tablefmt="github"))
| |
# Parsed command-line options shared by the module-level helpers. None until
# main() assigns the result of parse_arguments().
options = None
| |
| |
| def main(): |
| start_time = time.time() |
| global VERBOSE |
| global options |
| |
| options = parse_arguments() |
| |
| # Cleanup |
| if options.no_clean or options.only_failed or options.test_only: |
| if os.path.exists(options.outdir): |
| logger.info("Keeping artifacts untouched") |
| elif os.path.exists(options.outdir): |
| for i in range(1, 100): |
| new_out = options.outdir + ".{}".format(i) |
| if not os.path.exists(new_out): |
| logger.info("Renaming output directory to {}".format(new_out)) |
| shutil.move(options.outdir, new_out) |
| break |
| |
| os.makedirs(options.outdir, exist_ok=True) |
| |
| # create file handler which logs even debug messages |
| if options.log_file: |
| fh = logging.FileHandler(options.log_file) |
| else: |
| fh = logging.FileHandler(os.path.join(options.outdir, "sanitycheck.log")) |
| |
| fh.setLevel(logging.DEBUG) |
| |
| # create console handler with a higher log level |
| ch = logging.StreamHandler() |
| |
| VERBOSE += options.verbose |
| if VERBOSE > 1: |
| ch.setLevel(logging.DEBUG) |
| else: |
| ch.setLevel(logging.INFO) |
| |
| # create formatter and add it to the handlers |
| if options.timestamps: |
| formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') |
| else: |
| formatter = logging.Formatter('%(levelname)-7s - %(message)s') |
| |
| formatter_file = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
| ch.setFormatter(formatter) |
| fh.setFormatter(formatter_file) |
| |
| # add the handlers to logger |
| logger.addHandler(ch) |
| logger.addHandler(fh) |
| |
| hwm = HardwareMap() |
| if options.generate_hardware_map: |
| hwm.scan_hw() |
| hwm.write_map(options.generate_hardware_map) |
| return |
| |
| if not options.device_testing and options.hardware_map: |
| hwm.load_hardware_map(options.hardware_map) |
| |
| logger.info("Available devices:") |
| table = [] |
| hwm.dump(hwmap=hwm.connected_hardware, connected_only=True) |
| return |
| |
| if options.west_runner and not options.west_flash: |
| logger.error("west-runner requires west-flash to be enabled") |
| sys.exit(1) |
| |
| if options.west_flash and not options.device_testing: |
| logger.error("west-flash requires device-testing to be enabled") |
| sys.exit(1) |
| |
| if options.coverage: |
| options.enable_coverage = True |
| |
| if not options.coverage_platform: |
| options.coverage_platform = options.platform |
| |
| if options.size: |
| for fn in options.size: |
| size_report(SizeCalculator(fn, [])) |
| sys.exit(0) |
| |
| if options.subset: |
| subset, sets = options.subset.split("/") |
| if int(subset) > 0 and int(sets) >= int(subset): |
| logger.info("Running only a subset: %s/%s" % (subset, sets)) |
| else: |
| logger.error("You have provided a wrong subset value: %s." % options.subset) |
| return |
| |
| if not options.testcase_root: |
| options.testcase_root = [os.path.join(ZEPHYR_BASE, "tests"), |
| os.path.join(ZEPHYR_BASE, "samples")] |
| |
| if options.show_footprint or options.compare_report or options.release: |
| options.enable_size_report = True |
| |
| suite = TestSuite(options.board_root, options.testcase_root, options.outdir) |
| |
| # Set testsuite options from command line. |
| suite.build_only = options.build_only |
| suite.cmake_only = options.cmake_only |
| suite.test_only = options.test_only |
| suite.enable_slow = options.enable_slow |
| suite.device_testing = options.device_testing |
| suite.fixture = options.fixture |
| suite.enable_asan = options.enable_asan |
| suite.enable_lsan = options.enable_lsan |
| suite.enable_coverage = options.enable_coverage |
| suite.enable_valgrind = options.enable_valgrind |
| suite.coverage_platform = options.coverage_platform |
| suite.inline_logs = options.inline_logs |
| suite.enable_size_report = options.enable_size_report |
| |
| # Set number of jobs |
| if options.jobs: |
| suite.jobs = options.jobs |
| elif options.build_only: |
| suite.jobs = multiprocessing.cpu_count() * 2 |
| else: |
| suite.jobs = multiprocessing.cpu_count() |
| logger.info("JOBS: %d" % suite.jobs) |
| |
| suite.add_testcases() |
| suite.add_configurations() |
| |
| if options.device_testing: |
| if options.hardware_map: |
| hwm.load_hardware_map(options.hardware_map) |
| suite.connected_hardware = hwm.connected_hardware |
| if not options.platform: |
| options.platform = [] |
| for platform in hwm.connected_hardware: |
| if platform['connected']: |
| options.platform.append(platform['platform']) |
| |
| elif options.device_serial: # back-ward compatibility |
| if options.platform and len(options.platform) == 1: |
| hwm.load_device_from_cmdline(options.device_serial, options.platform[0]) |
| suite.connected_hardware = hwm.connected_hardware |
| else: |
| logger.error("""When --device-testing is used with --device-serial, only one |
| platform is allowed""") |
| |
| if suite.load_errors: |
| sys.exit(1) |
| |
| if options.list_tags: |
| tags = set() |
| for _, tc in suite.testcases.items(): |
| tags = tags.union(tc.tags) |
| |
| for t in tags: |
| print("- {}".format(t)) |
| |
| return |
| |
| if options.export_tests: |
| cnt = 0 |
| tests = suite.get_all_tests() |
| export_tests(options.export_tests, tests) |
| return |
| |
| run_individual_tests = [] |
| |
| if options.test: |
| run_individual_tests = options.test |
| |
| if options.list_tests or options.test_tree or options.list_test_duplicates or options.sub_test: |
| cnt = 0 |
| all_tests = suite.get_all_tests() |
| |
| if options.list_test_duplicates: |
| import collections |
| dupes = [item for item, count in collections.Counter(all_tests).items() if count > 1] |
| if dupes: |
| print("Tests with duplicate identifiers:") |
| for dupe in dupes: |
| print("- {}".format(dupe)) |
| for dc in suite.get_testcase(dupe): |
| print(" - {}".format(dc)) |
| else: |
| print("No duplicates found.") |
| return |
| |
| if options.sub_test: |
| for st in options.sub_test: |
| subtests = suite.get_testcase(st) |
| for sti in subtests: |
| run_individual_tests.append(sti.name) |
| |
| if run_individual_tests: |
| logger.info("Running the following tests:") |
| for test in run_individual_tests: |
| print(" - {}".format(test)) |
| else: |
| logger.info("Tests not found") |
| return |
| |
| elif options.list_tests or options.test_tree: |
| if options.test_tree: |
| testsuite = Node("Testsuite") |
| samples = Node("Samples", parent=testsuite) |
| tests = Node("Tests", parent=testsuite) |
| |
| for test in sorted(all_tests): |
| cnt = cnt + 1 |
| if options.list_tests: |
| print(" - {}".format(test)) |
| |
| if options.test_tree: |
| if test.startswith("sample."): |
| sec = test.split(".") |
| area = find(samples, lambda node: node.name == sec[1] and node.parent == samples) |
| if not area: |
| area = Node(sec[1], parent=samples) |
| |
| t = Node(test, parent=area) |
| else: |
| sec = test.split(".") |
| area = find(tests, lambda node: node.name == sec[0] and node.parent == tests) |
| if not area: |
| area = Node(sec[0], parent=tests) |
| |
| if area and len(sec) > 2: |
| subarea = find(area, lambda node: node.name == sec[1] and node.parent == area) |
| if not subarea: |
| subarea = Node(sec[1], parent=area) |
| |
| t = Node(test, parent=subarea) |
| |
| if options.list_tests: |
| print("{} total.".format(cnt)) |
| |
| if options.test_tree: |
| for pre, _, node in RenderTree(testsuite): |
| print("%s%s" % (pre, node.name)) |
| |
| return |
| |
| discards = [] |
| |
| if options.only_failed: |
| suite.get_last_failed() |
| suite.selected_platforms = set(p.platform.name for p in suite.instances.values()) |
| elif options.load_tests: |
| suite.load_from_file(options.load_tests) |
| elif options.test_only: |
| last_run = os.path.join(options.outdir, "sanitycheck.csv") |
| suite.load_from_file(last_run) |
| else: |
| discards = suite.apply_filters( |
| build_only=options.build_only, |
| enable_slow=options.enable_slow, |
| platform=options.platform, |
| arch=options.arch, |
| tag=options.tag, |
| exclude_tag=options.exclude_tag, |
| force_toolchain=options.force_toolchain, |
| all=options.all, |
| run_individual_tests=run_individual_tests, |
| device_testing=options.device_testing |
| |
| ) |
| |
| if VERBOSE > 1 and discards: |
| # if we are using command line platform filter, no need to list every |
| # other platform as excluded, we know that already. |
| # Show only the discards that apply to the selected platforms on the |
| # command line |
| |
| for i, reason in discards.items(): |
| if options.platform and i.platform.name not in options.platform: |
| continue |
| logger.debug( |
| "{:<25} {:<50} {}SKIPPED{}: {}".format( |
| i.platform.name, |
| i.testcase.name, |
| Fore.YELLOW, |
| Fore.RESET, |
| reason)) |
| |
| if options.report_excluded: |
| all_tests = suite.get_all_tests() |
| to_be_run = set() |
| for i, p in suite.instances.items(): |
| to_be_run.update(p.testcase.cases) |
| |
| if all_tests - to_be_run: |
| print("Tests that never build or run:") |
| for not_run in all_tests - to_be_run: |
| print("- {}".format(not_run)) |
| |
| return |
| |
| if options.subset: |
| # suite.instances = OrderedDict(sorted(suite.instances.items(), |
| # key=cmp_to_key(native_and_unit_first))) |
| subset, sets = options.subset.split("/") |
| total = len(suite.instances) |
| per_set = round(total / int(sets)) |
| start = (int(subset) - 1) * per_set |
| if subset == sets: |
| end = total |
| else: |
| end = start + per_set |
| |
| sliced_instances = islice(suite.instances.items(), start, end) |
| suite.instances = OrderedDict(sliced_instances) |
| |
| if options.save_tests: |
| suite.csv_report(options.save_tests) |
| return |
| |
| logger.info("%d test configurations selected, %d configurations discarded due to filters." % |
| (len(suite.instances), len(discards))) |
| |
| if options.device_testing: |
| print("\nDevice testing on:") |
| hwm.dump(suite.connected_hardware, suite.selected_platforms) |
| print("") |
| |
| if options.dry_run: |
| duration = time.time() - start_time |
| logger.info("Completed in %d seconds" % (duration)) |
| return |
| |
| retries = options.retry_failed + 1 |
| completed = 0 |
| |
| suite.update() |
| suite.start_time = start_time |
| |
| while True: |
| completed += 1 |
| |
| if completed > 1: |
| logger.info("%d Iteration:" % (completed)) |
| time.sleep(60) # waiting for the system to settle down |
| suite.total_done = suite.total_tests - suite.total_failed |
| suite.total_failed = 0 |
| |
| suite.execute() |
| print("") |
| |
| retries = retries - 1 |
| if retries == 0 or suite.total_failed == 0: |
| break |
| |
| suite.misc_reports(options.compare_report, options.show_footprint, |
| options.all_deltas, options.footprint_threshold, options.last_metrics) |
| |
| suite.duration = time.time() - start_time |
| suite.summary(options.disable_unrecognized_section_test) |
| |
| if options.coverage: |
| if options.gcov_tool is None: |
| use_system_gcov = False |
| |
| for plat in options.coverage_platform: |
| ts_plat = suite.get_platform(plat) |
| if ts_plat and (ts_plat.type in {"native", "unit"}): |
| use_system_gcov = True |
| |
| if use_system_gcov or "ZEPHYR_SDK_INSTALL_DIR" not in os.environ: |
| options.gcov_tool = "gcov" |
| else: |
| options.gcov_tool = os.path.join(os.environ["ZEPHYR_SDK_INSTALL_DIR"], |
| "i586-zephyr-elf/bin/i586-zephyr-elf-gcov") |
| |
| logger.info("Generating coverage files...") |
| coverage_tool = CoverageTool.factory(options.coverage_tool) |
| coverage_tool.add_ignore_file('generated') |
| coverage_tool.add_ignore_directory('tests') |
| coverage_tool.add_ignore_directory('samples') |
| coverage_tool.generate(options.outdir) |
| |
| if options.device_testing: |
| print("\nHardware distribution summary:\n") |
| table = [] |
| header = ['Board', 'ID', 'Counter'] |
| for p in hwm.connected_hardware: |
| if p['connected'] and p['platform'] in suite.selected_platforms: |
| row = [p['platform'], p.get('id', None), p['counter']] |
| table.append(row) |
| print(tabulate(table, headers=header, tablefmt="github")) |
| |
| suite.save_reports(options.report_name, |
| options.report_dir, |
| options.no_update, |
| options.release, |
| options.only_failed) |
| |
| if suite.total_failed or (suite.warnings and options.warnings_as_errors): |
| sys.exit(1) |
| |
| |
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()