#!/usr/bin/env python3
# vim: set syntax=python ts=4 :
#
# Copyright (c) 2018 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import os
import contextlib
import string
import mmap
import sys
import re
import subprocess
import select
import shutil
import shlex
import signal
import threading
import concurrent.futures
from collections import OrderedDict
import queue
import time
import csv
import glob
import xml.etree.ElementTree as ET
import logging
import pty
from pathlib import Path
from distutils.spawn import find_executable
from colorama import Fore
import pickle
import platform
import yaml
import json
from multiprocessing import Lock, Process, Value

try:
    # Use the C LibYAML parser if available, rather than the Python parser.
    # It's much faster.
    from yaml import CSafeLoader as SafeLoader
    from yaml import CDumper as Dumper
except ImportError:
    from yaml import SafeLoader, Dumper
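
# A minimal sketch of using the selected loader/dumper pair (the file name
# is hypothetical):
#
#     with open("sample.yaml") as f:
#         data = yaml.load(f, Loader=SafeLoader)
#     text = yaml.dump(data, Dumper=Dumper)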

try:
    import serial
except ImportError:
    print("Install pyserial python module with pip to use --device-testing option.")

try:
    from tabulate import tabulate
except ImportError:
    print("Install tabulate python module with pip to use --device-testing option.")

try:
    import psutil
except ImportError:
    print("Install psutil python module with pip to run in QEMU.")

ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
if not ZEPHYR_BASE:
    sys.exit("$ZEPHYR_BASE environment variable undefined")

# This is needed to load edt.pickle files.
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts", "dts",
                                "python-devicetree", "src"))
from devicetree import edtlib  # pylint: disable=unused-import

# Use this for internal comparisons; that's what canonicalization is for.
# Don't use it when invoking other components of the build system, to avoid
# confusing and hard-to-trace inconsistencies in error messages, logs,
# generated Makefiles, etc. compared to when users invoke these components
# directly.
# Note that "normalization" is different from canonicalization, see os.path.
canonical_zephyr_base = os.path.realpath(ZEPHYR_BASE)
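
# For example (hypothetical paths), canonicalization resolves symlinks while
# normalization only cleans up the path string:
#
#     os.path.realpath("/home/user/zephyrproject/zephyr")
#     # -> "/srv/work/zephyr" if the above path is a symlink to it
#     os.path.normpath("/home/user/zephyrproject//zephyr/.")
#     # -> "/home/user/zephyrproject/zephyr" (no symlink resolution)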

sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/"))

import scl
import expr_parser

logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)


class ExecutionCounter(object):
    def __init__(self, total=0):
        self._done = Value('i', 0)
        self._passed = Value('i', 0)
        self._skipped_configs = Value('i', 0)
        self._skipped_runtime = Value('i', 0)
        self._skipped_cases = Value('i', 0)
        self._error = Value('i', 0)
        self._failed = Value('i', 0)
        self._total = Value('i', total)
        self._cases = Value('i', 0)

        self.lock = Lock()

    @property
    def cases(self):
        with self._cases.get_lock():
            return self._cases.value

    @cases.setter
    def cases(self, value):
        with self._cases.get_lock():
            self._cases.value = value

    @property
    def skipped_cases(self):
        with self._skipped_cases.get_lock():
            return self._skipped_cases.value

    @skipped_cases.setter
    def skipped_cases(self, value):
        with self._skipped_cases.get_lock():
            self._skipped_cases.value = value

    @property
    def error(self):
        with self._error.get_lock():
            return self._error.value

    @error.setter
    def error(self, value):
        with self._error.get_lock():
            self._error.value = value

    @property
    def done(self):
        with self._done.get_lock():
            return self._done.value

    @done.setter
    def done(self, value):
        with self._done.get_lock():
            self._done.value = value

    @property
    def passed(self):
        with self._passed.get_lock():
            return self._passed.value

    @passed.setter
    def passed(self, value):
        with self._passed.get_lock():
            self._passed.value = value

    @property
    def skipped_configs(self):
        with self._skipped_configs.get_lock():
            return self._skipped_configs.value

    @skipped_configs.setter
    def skipped_configs(self, value):
        with self._skipped_configs.get_lock():
            self._skipped_configs.value = value

    @property
    def skipped_runtime(self):
        with self._skipped_runtime.get_lock():
            return self._skipped_runtime.value

    @skipped_runtime.setter
    def skipped_runtime(self, value):
        with self._skipped_runtime.get_lock():
            self._skipped_runtime.value = value

    @property
    def failed(self):
        with self._failed.get_lock():
            return self._failed.value

    @failed.setter
    def failed(self, value):
        with self._failed.get_lock():
            self._failed.value = value

    @property
    def total(self):
        with self._total.get_lock():
            return self._total.value
class CMakeCacheEntry:
    '''Represents a CMake cache entry.

    This class understands the type system in a CMakeCache.txt, and
    converts the following cache types to Python types:

    Cache Type    Python type
    ----------    -------------------------------------------
    FILEPATH      str
    PATH          str
    STRING        str OR list of str (if ';' is in the value)
    BOOL          bool
    INTERNAL      str OR list of str (if ';' is in the value)
    ----------    -------------------------------------------
    '''

    # Regular expression for a cache entry.
    #
    # CMake variable names can include escape characters, allowing a
    # wider set of names than is easy to match with a regular
    # expression. To be permissive here, use a non-greedy match up to
    # the first colon (':'). This breaks if the variable name has a
    # colon inside, but it's good enough.
    CACHE_ENTRY = re.compile(
        r'''(?P<name>.*?)                               # name
        :(?P<type>FILEPATH|PATH|STRING|BOOL|INTERNAL)   # type
        =(?P<value>.*)                                  # value
        ''', re.X)

    @classmethod
    def _to_bool(cls, val):
        # Convert a CMake BOOL string into a Python bool.
        #
        #   "True if the constant is 1, ON, YES, TRUE, Y, or a
        #   non-zero number. False if the constant is 0, OFF, NO,
        #   FALSE, N, IGNORE, NOTFOUND, the empty string, or ends in
        #   the suffix -NOTFOUND. Named boolean constants are
        #   case-insensitive. If the argument is not one of these
        #   constants, it is treated as a variable."
        #
        # https://cmake.org/cmake/help/v3.0/command/if.html
        val = val.upper()
        if val in ('ON', 'YES', 'TRUE', 'Y'):
            return True
        elif val in ('OFF', 'NO', 'FALSE', 'N', 'IGNORE', 'NOTFOUND', ''):
            return False
        elif val.endswith('-NOTFOUND'):
            return False
        else:
            try:
                v = int(val)
                return v != 0
            except ValueError as exc:
                raise ValueError('invalid bool {}'.format(val)) from exc
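
    # For example (values are illustrative):
    #
    #     CMakeCacheEntry._to_bool('ON')        -> True
    #     CMakeCacheEntry._to_bool('0')         -> False
    #     CMakeCacheEntry._to_bool('-NOTFOUND') -> False
    #     CMakeCacheEntry._to_bool('foo')       -> ValueError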

    @classmethod
    def from_line(cls, line, line_no):
        # Comments can only occur at the beginning of a line.
        # (The value of an entry could contain a comment character.)
        if line.startswith('//') or line.startswith('#'):
            return None

        # Whitespace-only lines do not contain cache entries.
        if not line.strip():
            return None

        m = cls.CACHE_ENTRY.match(line)
        if not m:
            return None

        name, type_, value = (m.group(g) for g in ('name', 'type', 'value'))
        if type_ == 'BOOL':
            try:
                value = cls._to_bool(value)
            except ValueError as exc:
                args = exc.args + ('on line {}: {}'.format(line_no, line),)
                raise ValueError(args) from exc
        elif type_ in ['STRING', 'INTERNAL']:
            # If the value is a CMake list (i.e. is a string which
            # contains a ';'), convert it to a Python list.
            if ';' in value:
                value = value.split(';')

        return CMakeCacheEntry(name, value)

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __str__(self):
        fmt = 'CMakeCacheEntry(name={}, value={})'
        return fmt.format(self.name, self.value)
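
# A small sketch of parsing a single cache line (values are illustrative):
#
#     entry = CMakeCacheEntry.from_line('CONFIG_XIP:BOOL=ON', line_no=1)
#     # entry.name  == 'CONFIG_XIP'
#     # entry.value is True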


class CMakeCache:
    '''Parses and represents a CMake cache file.'''

    @staticmethod
    def from_file(cache_file):
        return CMakeCache(cache_file)

    def __init__(self, cache_file):
        self.cache_file = cache_file
        self.load(cache_file)

    def load(self, cache_file):
        entries = []
        with open(cache_file, 'r') as cache:
            for line_no, line in enumerate(cache):
                entry = CMakeCacheEntry.from_line(line, line_no)
                if entry:
                    entries.append(entry)
        self._entries = OrderedDict((e.name, e) for e in entries)

    def get(self, name, default=None):
        entry = self._entries.get(name)
        if entry is not None:
            return entry.value
        else:
            return default

    def get_list(self, name, default=None):
        if default is None:
            default = []
        entry = self._entries.get(name)
        if entry is not None:
            value = entry.value
            if isinstance(value, list):
                return value
            elif isinstance(value, str):
                return [value] if value else []
            else:
                msg = 'invalid value {} type {}'
                raise RuntimeError(msg.format(value, type(value)))
        else:
            return default

    def __contains__(self, name):
        return name in self._entries

    def __getitem__(self, name):
        return self._entries[name].value

    def __setitem__(self, name, entry):
        if not isinstance(entry, CMakeCacheEntry):
            msg = 'improper type {} for value {}, expecting CMakeCacheEntry'
            raise TypeError(msg.format(type(entry), entry))
        self._entries[name] = entry

    def __delitem__(self, name):
        del self._entries[name]

    def __iter__(self):
        return iter(self._entries.values())


class TwisterException(Exception):
    pass


class TwisterRuntimeError(TwisterException):
    pass


class ConfigurationError(TwisterException):
    def __init__(self, cfile, message):
        TwisterException.__init__(self, cfile + ": " + message)


class BuildError(TwisterException):
    pass


class ExecutionError(TwisterException):
    pass


class HarnessImporter:

    def __init__(self, name):
        sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister"))
        module = __import__("harness")
        if name:
            my_class = getattr(module, name)
        else:
            my_class = getattr(module, "Test")

        self.instance = my_class()


class Handler:
    def __init__(self, instance, type_str="build"):
        """Constructor

        @param instance Test Instance
        """
        self.state = "waiting"
        self.run = False
        self.duration = 0
        self.type_str = type_str

        self.binary = None
        self.pid_fn = None
        self.call_make_run = False

        self.name = instance.name
        self.instance = instance
        self.timeout = instance.testcase.timeout
        self.sourcedir = instance.testcase.source_dir
        self.build_dir = instance.build_dir
        self.log = os.path.join(self.build_dir, "handler.log")
        self.returncode = 0
        self.set_state("running", self.duration)
        self.generator = None
        self.generator_cmd = None

        self.args = []

    def set_state(self, state, duration):
        self.state = state
        self.duration = duration

    def get_state(self):
        ret = (self.state, self.duration)
        return ret

    def record(self, harness):
        if harness.recording:
            filename = os.path.join(self.build_dir, "recording.csv")
            with open(filename, "at") as csvfile:
                # The header row (harness.fieldnames) is written explicitly
                # below; the writer itself only needs the line terminator.
                cw = csv.writer(csvfile, lineterminator=os.linesep)
                cw.writerow(harness.fieldnames)
                for instance in harness.recording:
                    cw.writerow(instance)


class BinaryHandler(Handler):
    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        """
        super().__init__(instance, type_str)

        self.terminated = False
        self.call_west_flash = False

        # Tool options
        self.valgrind = False
        self.lsan = False
        self.asan = False
        self.ubsan = False
        self.coverage = False

    def try_kill_process_by_pid(self):
        if self.pid_fn:
            pid = int(open(self.pid_fn).read())
            os.unlink(self.pid_fn)
            self.pid_fn = None  # clear so we don't try to kill the binary twice
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                pass

    def terminate(self, proc):
        # Encapsulate the terminate functionality so we do it consistently
        # wherever we might want to terminate the proc. We need
        # try_kill_process_by_pid because of how both newer ninja (1.6.0 or
        # greater) and .NET / renode work. Newer ninjas don't seem to pass
        # SIGTERM down to their children, so we need to use
        # try_kill_process_by_pid.
        for child in psutil.Process(proc.pid).children(recursive=True):
            try:
                os.kill(child.pid, signal.SIGTERM)
            except ProcessLookupError:
                pass
        proc.terminate()
        # Sleep for a while before attempting to kill.
        time.sleep(0.5)
        proc.kill()
        self.terminated = True

    def _output_reader(self, proc):
        self.line = proc.stdout.readline()

    def _output_handler(self, proc, harness):
        if harness.is_pytest:
            harness.handle(None)
            return

        log_out_fp = open(self.log, "wt")
        timeout_extended = False
        timeout_time = time.time() + self.timeout
        while True:
            this_timeout = timeout_time - time.time()
            if this_timeout < 0:
                break
            reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
            reader_t.start()
            reader_t.join(this_timeout)
            if not reader_t.is_alive():
                line = self.line
                logger.debug("OUTPUT: {0}".format(line.decode('utf-8').rstrip()))
                log_out_fp.write(line.decode('utf-8'))
                log_out_fp.flush()
                harness.handle(line.decode('utf-8').rstrip())
                if harness.state:
                    if not timeout_extended or harness.capture_coverage:
                        timeout_extended = True
                        if harness.capture_coverage:
                            timeout_time = time.time() + 30
                        else:
                            timeout_time = time.time() + 2
            else:
                reader_t.join(0)
                break
        try:
            # POSIX arch based ztests end on their own,
            # so let's give it up to 100ms to do so.
            proc.wait(0.1)
        except subprocess.TimeoutExpired:
            self.terminate(proc)

        log_out_fp.close()

    def handle(self):

        harness_name = self.instance.testcase.harness.capitalize()
        harness_import = HarnessImporter(harness_name)
        harness = harness_import.instance
        harness.configure(self.instance)

        if self.call_make_run:
            command = [self.generator_cmd, "run"]
        elif self.call_west_flash:
            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
        else:
            command = [self.binary]

        run_valgrind = False
        if self.valgrind and shutil.which("valgrind"):
            command = ["valgrind", "--error-exitcode=2",
                       "--leak-check=full",
                       "--suppressions=" + ZEPHYR_BASE + "/scripts/valgrind.supp",
                       "--log-file=" + self.build_dir + "/valgrind.log"
                       ] + command
            run_valgrind = True

        logger.debug("Spawning process: " +
                     " ".join(shlex.quote(word) for word in command) + os.linesep +
                     "in directory: " + self.build_dir)

        start_time = time.time()

        env = os.environ.copy()
        if self.asan:
            env["ASAN_OPTIONS"] = "log_path=stdout:" + \
                                  env.get("ASAN_OPTIONS", "")
            if not self.lsan:
                env["ASAN_OPTIONS"] += "detect_leaks=0"

        if self.ubsan:
            env["UBSAN_OPTIONS"] = "log_path=stdout:halt_on_error=1:" + \
                                   env.get("UBSAN_OPTIONS", "")

        with subprocess.Popen(command, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, cwd=self.build_dir, env=env) as proc:
            logger.debug("Spawning BinaryHandler Thread for %s" % self.name)
            t = threading.Thread(target=self._output_handler, args=(proc, harness,), daemon=True)
            t.start()
            t.join()
            if t.is_alive():
                self.terminate(proc)
                t.join()
            proc.wait()
            self.returncode = proc.returncode
            self.try_kill_process_by_pid()

        handler_time = time.time() - start_time

        if self.coverage:
            # GCOV_PREFIX has to reach gcov through the environment, not as
            # an element of the argument vector.
            gcov_env = env.copy()
            gcov_env["GCOV_PREFIX"] = self.build_dir
            subprocess.call(["gcov", self.sourcedir, "-b", "-s", self.build_dir],
                            env=gcov_env)

        # FIXME: This is needed when killing the simulator, the console is
        # garbled and needs to be reset. Did not find a better way to do that.
        if sys.stdout.isatty():
            subprocess.call(["stty", "sane"])

        if harness.is_pytest:
            harness.pytest_run(self.log)
        self.instance.results = harness.tests

        if not self.terminated and self.returncode != 0:
            # When a process is killed, the default handler returns 128 + SIGTERM
            # so in that case the return code itself is not meaningful.
            self.set_state("failed", handler_time)
            self.instance.reason = "Failed"
        elif run_valgrind and self.returncode == 2:
            self.set_state("failed", handler_time)
            self.instance.reason = "Valgrind error"
        elif harness.state:
            self.set_state(harness.state, handler_time)
            if harness.state == "failed":
                self.instance.reason = "Failed"
        else:
            self.set_state("timeout", handler_time)
            self.instance.reason = "Timeout"

        self.record(harness)


class DeviceHandler(Handler):

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test Instance
        """
        super().__init__(instance, type_str)

        self.suite = None

    def monitor_serial(self, ser, halt_fileno, harness):
        if harness.is_pytest:
            harness.handle(None)
            return

        log_out_fp = open(self.log, "wt")

        ser_fileno = ser.fileno()
        readlist = [halt_fileno, ser_fileno]

        if self.coverage:
            # Set capture_coverage to True to indicate that right after
            # the test results we should get coverage data; otherwise we
            # exit from the test.
            harness.capture_coverage = True

        ser.flush()

        while ser.isOpen():
            readable, _, _ = select.select(readlist, [], [], self.timeout)

            if halt_fileno in readable:
                logger.debug('halted')
                ser.close()
                break
            if ser_fileno not in readable:
                continue  # Timeout.

            serial_line = None
            try:
                serial_line = ser.readline()
            except TypeError:
                pass
            except serial.SerialException:
                ser.close()
                break

            # Just because ser_fileno has data doesn't mean an entire line
            # is available yet.
            if serial_line:
                sl = serial_line.decode('utf-8', 'ignore').lstrip()
                logger.debug("DEVICE: {0}".format(sl.rstrip()))

                log_out_fp.write(sl)
                log_out_fp.flush()
                harness.handle(sl.rstrip())

            if harness.state:
                if not harness.capture_coverage:
                    ser.close()
                    break

        log_out_fp.close()

    def device_is_available(self, instance):
        device = instance.platform.name
        fixture = instance.testcase.harness_config.get("fixture")
        for d in self.suite.duts:
            if fixture and fixture not in d.fixtures:
                continue
            if d.platform != device or not (d.serial or d.serial_pty):
                continue
            d.lock.acquire()
            avail = False
            if d.available:
                d.available = 0
                d.counter += 1
                avail = True
            d.lock.release()
            if avail:
                return d

        return None

    def make_device_available(self, serial):
        for d in self.suite.duts:
            if d.serial == serial or d.serial_pty:
                d.available = 1

    @staticmethod
    def run_custom_script(script, timeout):
        with subprocess.Popen(script, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                stdout, _ = proc.communicate(timeout=timeout)
                logger.debug(stdout.decode())

            except subprocess.TimeoutExpired:
                proc.kill()
                proc.communicate()
                logger.error("{} timed out".format(script))
    def handle(self):
        out_state = "failed"
        runner = None

        hardware = self.device_is_available(self.instance)
        while not hardware:
            logger.debug("Waiting for device {} to become available".format(self.instance.platform.name))
            time.sleep(1)
            hardware = self.device_is_available(self.instance)

        runner = hardware.runner or self.suite.west_runner
        serial_pty = hardware.serial_pty

        ser_pty_process = None
        if serial_pty:
            master, slave = pty.openpty()
            try:
                ser_pty_process = subprocess.Popen(re.split(',| ', serial_pty), stdout=master, stdin=master, stderr=master)
            except subprocess.CalledProcessError as error:
                logger.error("Failed to run subprocess {}, error {}".format(serial_pty, error.output))
                return

            serial_device = os.ttyname(slave)
        else:
            serial_device = hardware.serial

        logger.debug("Using serial device {}".format(serial_device))

        if (self.suite.west_flash is not None) or runner:
            command = ["west", "flash", "--skip-rebuild", "-d", self.build_dir]
            command_extra_args = []

            # There are three ways this option is used.
            # 1) bare: --west-flash
            #    This results in options.west_flash == []
            # 2) with a value: --west-flash="--board-id=42"
            #    This results in options.west_flash == "--board-id=42"
            # 3) with multiple values: --west-flash="--board-id=42,--erase"
            #    This results in options.west_flash == "--board-id=42 --erase"
            if self.suite.west_flash and self.suite.west_flash != []:
                command_extra_args.extend(self.suite.west_flash.split(','))

            if runner:
                command.append("--runner")
                command.append(runner)

                board_id = hardware.probe_id or hardware.id
                product = hardware.product
                if board_id is not None:
                    if runner == "pyocd":
                        command_extra_args.append("--board-id")
                        command_extra_args.append(board_id)
                    elif runner == "nrfjprog":
                        command_extra_args.append("--snr")
                        command_extra_args.append(board_id)
                    elif runner == "openocd" and product == "STM32 STLink":
                        command_extra_args.append("--cmd-pre-init")
                        command_extra_args.append("hla_serial %s" % board_id)
                    elif runner == "openocd" and product == "STLINK-V3":
                        command_extra_args.append("--cmd-pre-init")
                        command_extra_args.append("hla_serial %s" % board_id)
                    elif runner == "openocd" and product == "EDBG CMSIS-DAP":
                        command_extra_args.append("--cmd-pre-init")
                        command_extra_args.append("cmsis_dap_serial %s" % board_id)
                    elif runner == "jlink":
                        command.append("--tool-opt=-SelectEmuBySN %s" % board_id)

            if command_extra_args != []:
                command.append('--')
                command.extend(command_extra_args)
        else:
            command = [self.generator_cmd, "-C", self.build_dir, "flash"]

        pre_script = hardware.pre_script
        post_flash_script = hardware.post_flash_script
        post_script = hardware.post_script

        if pre_script:
            self.run_custom_script(pre_script, 30)

        try:
            ser = serial.Serial(
                serial_device,
                baudrate=115200,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                timeout=self.timeout
            )
        except serial.SerialException as e:
            self.set_state("failed", 0)
            self.instance.reason = "Failed"
            logger.error("Serial device error: %s" % str(e))

            if serial_pty and ser_pty_process:
                ser_pty_process.terminate()
                outs, errs = ser_pty_process.communicate()
                logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))

            self.make_device_available(serial_device)
            return

        ser.flush()

        harness_name = self.instance.testcase.harness.capitalize()
        harness_import = HarnessImporter(harness_name)
        harness = harness_import.instance
        harness.configure(self.instance)
        read_pipe, write_pipe = os.pipe()
        start_time = time.time()

        t = threading.Thread(target=self.monitor_serial, daemon=True,
                             args=(ser, read_pipe, harness))
        t.start()

        d_log = "{}/device.log".format(self.instance.build_dir)
        logger.debug('Flash command: %s', command)
        try:
            stdout = stderr = None
            with subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
                try:
                    (stdout, stderr) = proc.communicate(timeout=30)
                    logger.debug(stdout.decode())

                    if proc.returncode != 0:
                        self.instance.reason = "Device issue (Flash?)"
                        with open(d_log, "w") as dlog_fp:
                            dlog_fp.write(stderr.decode())
                        os.write(write_pipe, b'x')  # halt the thread
                        out_state = "flash_error"
                except subprocess.TimeoutExpired:
                    proc.kill()
                    (stdout, stderr) = proc.communicate()
                    self.instance.reason = "Device issue (Timeout)"

                    with open(d_log, "w") as dlog_fp:
                        dlog_fp.write(stderr.decode())

        except subprocess.CalledProcessError:
            os.write(write_pipe, b'x')  # halt the thread

        if post_flash_script:
            self.run_custom_script(post_flash_script, 30)

        t.join(self.timeout)
        if t.is_alive():
            logger.debug("Timed out while monitoring serial output on {}".format(self.instance.platform.name))
            out_state = "timeout"

        if ser.isOpen():
            ser.close()

        if serial_pty:
            ser_pty_process.terminate()
            outs, errs = ser_pty_process.communicate()
            logger.debug("Process {} terminated outs: {} errs {}".format(serial_pty, outs, errs))

        os.close(write_pipe)
        os.close(read_pipe)

        handler_time = time.time() - start_time

        if out_state in ["timeout", "flash_error"]:
            for c in self.instance.testcase.cases:
                if c not in harness.tests:
                    harness.tests[c] = "BLOCK"

            if out_state == "timeout":
                self.instance.reason = "Timeout"
            elif out_state == "flash_error":
                self.instance.reason = "Flash error"

        if harness.is_pytest:
            harness.pytest_run(self.log)
        self.instance.results = harness.tests

        # Sometimes a test instance was not executed successfully and produced
        # an empty results dictionary; fill its results in as BLOCK so the
        # instance is still included in the final report.
        if self.instance.results == {}:
            for k in self.instance.testcase.cases:
                self.instance.results[k] = 'BLOCK'

        if harness.state:
            self.set_state(harness.state, handler_time)
            if harness.state == "failed":
                self.instance.reason = "Failed"
        else:
            self.set_state(out_state, handler_time)

        if post_script:
            self.run_custom_script(post_script, 30)

        self.make_device_available(serial_device)
        self.record(harness)


class QEMUHandler(Handler):
    """Spawns a thread to monitor QEMU output from pipes.

    We pass QEMU_PIPE to 'make run' and monitor the pipes for output.
    We need to do this because once QEMU starts, it runs forever until killed.
    Test cases emit special messages to the console as they run; we check
    for these to collect whether the test passed or failed.
    """

    def __init__(self, instance, type_str):
        """Constructor

        @param instance Test instance
        """

        super().__init__(instance, type_str)
        self.fifo_fn = os.path.join(instance.build_dir, "qemu-fifo")

        self.pid_fn = os.path.join(instance.build_dir, "qemu.pid")

        if "ignore_qemu_crash" in instance.testcase.tags:
            self.ignore_qemu_crash = True
            self.ignore_unexpected_eof = True
        else:
            self.ignore_qemu_crash = False
            self.ignore_unexpected_eof = False

    @staticmethod
    def _get_cpu_time(pid):
        """Get the process CPU time.

        The guest virtual time in QEMU icount mode isn't host time; it's
        maintained by counting guest instructions, so we use the QEMU
        process execution time to approximate the time of the guest OS.
        """
        proc = psutil.Process(pid)
        cpu_time = proc.cpu_times()
        return cpu_time.user + cpu_time.system
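
    # A sketch of what this reports (the PID and numbers are hypothetical):
    #
    #     QEMUHandler._get_cpu_time(12345)
    #     # -> user + system CPU time of the QEMU process,
    #     #    e.g. 1.25 + 0.40 = 1.65 seconds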

    @staticmethod
    def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results, harness,
                ignore_unexpected_eof=False):
        fifo_in = fifo_fn + ".in"
        fifo_out = fifo_fn + ".out"

        # These in/out nodes are named from QEMU's perspective, not ours.
        if os.path.exists(fifo_in):
            os.unlink(fifo_in)
        os.mkfifo(fifo_in)
        if os.path.exists(fifo_out):
            os.unlink(fifo_out)
        os.mkfifo(fifo_out)

        # We don't do anything with out_fp but we need to open it for
        # writing so that QEMU doesn't block, due to the way pipes work.
        out_fp = open(fifo_in, "wb")
        # Disable internal buffering; we don't want read() or poll() to
        # ever block if there is data in there.
        in_fp = open(fifo_out, "rb", buffering=0)
        log_out_fp = open(logfile, "wt")

        start_time = time.time()
        timeout_time = start_time + timeout
        p = select.poll()
        p.register(in_fp, select.POLLIN)
        out_state = None

        line = ""
        timeout_extended = False

        pid = 0
        if os.path.exists(pid_fn):
            pid = int(open(pid_fn).read())

        while True:
            this_timeout = int((timeout_time - time.time()) * 1000)
            if this_timeout < 0 or not p.poll(this_timeout):
                try:
                    if pid and this_timeout > 0:
                        # It's possible we polled nothing because the host
                        # scheduled the QEMU process too little CPU time
                        # during p.poll(this_timeout).
                        cpu_time = QEMUHandler._get_cpu_time(pid)
                        if cpu_time < timeout and not out_state:
                            timeout_time = time.time() + (timeout - cpu_time)
                            continue
                except ProcessLookupError:
                    out_state = "failed"
                    break

                if not out_state:
                    out_state = "timeout"
                break

            if pid == 0 and os.path.exists(pid_fn):
                pid = int(open(pid_fn).read())

            if harness.is_pytest:
                harness.handle(None)
                out_state = harness.state
                break

            try:
                c = in_fp.read(1).decode("utf-8")
            except UnicodeDecodeError:
                # Test is writing something weird, fail
                out_state = "unexpected byte"
                break

            if c == "":
                # EOF, this shouldn't happen unless QEMU crashes
                if not ignore_unexpected_eof:
                    out_state = "unexpected eof"
                break
            line = line + c
            if c != "\n":
                continue

            # line contains a full line of data output from QEMU
            log_out_fp.write(line)
            log_out_fp.flush()
            line = line.strip()
            logger.debug(f"QEMU ({pid}): {line}")

            harness.handle(line)
            if harness.state:
                # If we have registered a fail, make sure the state is not
                # overridden by a false success message coming from the
                # testsuite.
                if out_state not in ['failed', 'unexpected eof', 'unexpected byte']:
                    out_state = harness.state

                # If we get some state, that means the test is doing well; we
                # reset the timeout and wait for 2 more seconds to catch
                # anything printed late. We wait much longer if code coverage
                # is enabled, since dumping this information can take some
                # time.
                if not timeout_extended or harness.capture_coverage:
                    timeout_extended = True
                    if harness.capture_coverage:
                        timeout_time = time.time() + 30
                    else:
                        timeout_time = time.time() + 2
            line = ""

        if harness.is_pytest:
            harness.pytest_run(logfile)
            out_state = harness.state

        handler.record(harness)

        handler_time = time.time() - start_time
        logger.debug(f"QEMU ({pid}) complete ({out_state}) after {handler_time} seconds")

        if out_state == "timeout":
            handler.instance.reason = "Timeout"
            handler.set_state("failed", handler_time)
        elif out_state == "failed":
            handler.instance.reason = "Failed"
            handler.set_state("failed", handler_time)
        elif out_state in ['unexpected eof', 'unexpected byte']:
            handler.instance.reason = out_state
            handler.set_state("failed", handler_time)
        else:
            handler.set_state(out_state, handler_time)

        log_out_fp.close()
        out_fp.close()
        in_fp.close()
        if pid:
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # Oh well, as long as it's dead! User probably sent Ctrl-C
                pass

        os.unlink(fifo_in)
        os.unlink(fifo_out)

    def handle(self):
        self.results = {}
        self.run = True

        # We pass this to QEMU which looks for fifos with .in and .out
        # suffixes.

        self.fifo_fn = os.path.join(self.instance.build_dir, "qemu-fifo")
        self.pid_fn = os.path.join(self.instance.build_dir, "qemu.pid")

        if os.path.exists(self.pid_fn):
            os.unlink(self.pid_fn)

        self.log_fn = self.log

        harness_import = HarnessImporter(self.instance.testcase.harness.capitalize())
        harness = harness_import.instance
        harness.configure(self.instance)

        self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread,
                                       args=(self, self.timeout, self.build_dir,
                                             self.log_fn, self.fifo_fn,
                                             self.pid_fn, self.results, harness,
                                             self.ignore_unexpected_eof))

        self.instance.results = harness.tests
        self.thread.daemon = True
        logger.debug("Spawning QEMUHandler Thread for %s" % self.name)
        self.thread.start()
        if sys.stdout.isatty():
            subprocess.call(["stty", "sane"])

        logger.debug("Running %s (%s)" % (self.name, self.type_str))
        command = [self.generator_cmd]
        command += ["-C", self.build_dir, "run"]

        is_timeout = False
        qemu_pid = None

        with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.build_dir) as proc:
            logger.debug("Spawned QEMU process for %s" % self.name)

            try:
                proc.wait(self.timeout)
            except subprocess.TimeoutExpired:
                # Sometimes QEMU can't handle the SIGTERM signal correctly;
                # in that case kill the QEMU process directly with SIGKILL
                # and let twister judge the test result from the console
                # output.

                is_timeout = True
                if os.path.exists(self.pid_fn):
                    qemu_pid = int(open(self.pid_fn).read())
                    try:
                        os.kill(qemu_pid, signal.SIGKILL)
                    except ProcessLookupError:
                        pass
                    proc.wait()
                    if harness.state == "passed":
                        self.returncode = 0
                    else:
                        self.returncode = proc.returncode
                else:
                    proc.terminate()
                    proc.kill()
                    self.returncode = proc.returncode
            else:
                if os.path.exists(self.pid_fn):
                    qemu_pid = int(open(self.pid_fn).read())
                logger.debug(f"No timeout, return code from QEMU ({qemu_pid}): {proc.returncode}")
                self.returncode = proc.returncode

            # Need to wait for the harness to finish processing output from
            # QEMU; otherwise it might miss some error messages.
            self.thread.join()

        if os.path.exists(self.pid_fn):
            qemu_pid = int(open(self.pid_fn).read())
            os.unlink(self.pid_fn)

        logger.debug(f"return code from QEMU ({qemu_pid}): {self.returncode}")

        if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness.state:
            self.set_state("failed", 0)
            if is_timeout:
                self.instance.reason = "Timeout"
            else:
                self.instance.reason = "Exited with {}".format(self.returncode)

    def get_fifo(self):
        return self.fifo_fn


class SizeCalculator:
    alloc_sections = [
        "bss",
        "noinit",
        "app_bss",
        "app_noinit",
        "ccm_bss",
        "ccm_noinit"
    ]

    rw_sections = [
        "datas",
        "initlevel",
        "exceptions",
        "initshell",
        "_static_thread_data_area",
        "k_timer_area",
        "k_mem_slab_area",
        "k_mem_pool_area",
        "sw_isr_table",
        "k_sem_area",
        "k_mutex_area",
        "app_shmem_regions",
        "_k_fifo_area",
        "_k_lifo_area",
        "k_stack_area",
        "k_msgq_area",
        "k_mbox_area",
        "k_pipe_area",
        "net_if_area",
        "net_if_dev_area",
        "net_l2_area",
        "net_l2_data",
        "k_queue_area",
        "_net_buf_pool_area",
        "app_datas",
        "kobject_data",
        "mmu_tables",
        "app_pad",
        "priv_stacks",
        "ccm_data",
        "usb_descriptor",
        "usb_data", "usb_bos_desc",
        "uart_mux",
        'log_backends_sections',
        'log_dynamic_sections',
        'log_const_sections',
        "app_smem",
        'shell_root_cmds_sections',
        "font_entry_sections",
        "priv_stacks_noinit",
        "_GCOV_BSS_SECTION_NAME",
        "gcov",
        "nocache",
        "devices",
        "k_heap_area",
    ]

    # These get copied into RAM only on non-XIP
    ro_sections = [
        "rom_start",
        "text",
        "ctors",
        "init_array",
        "reset",
        "z_object_assignment_area",
        "rodata",
        "net_l2",
        "vector",
        "sw_isr_table",
        "settings_handler_static_area",
        "bt_l2cap_fixed_chan_area",
        "bt_l2cap_br_fixed_chan_area",
        "bt_gatt_service_static_area",
        "vectors",
        "net_socket_register_area",
        "net_ppp_proto",
        "shell_area",
        "tracing_backend_area",
        "ppp_protocol_handler_area",
    ]

    def __init__(self, filename, extra_sections):
        """Constructor

        @param filename Path to the output binary
            The <filename> is parsed by objdump to determine section sizes
        """
        # Make sure this is an ELF binary
        with open(filename, "rb") as f:
            magic = f.read(4)

        try:
            if magic != b'\x7fELF':
                raise TwisterRuntimeError("%s is not an ELF binary" % filename)
        except Exception as e:
            print(str(e))
            sys.exit(2)

        # Search for CONFIG_XIP in the ELF's list of symbols using nm and awk.
        # grep cannot be used, as it returns an error if the symbol is not
        # found.
        is_xip_command = "nm " + filename + \
                         " | awk '/CONFIG_XIP/ { print $3 }'"
        is_xip_output = subprocess.check_output(
            is_xip_command, shell=True, stderr=subprocess.STDOUT).decode(
            "utf-8").strip()
        try:
            if is_xip_output.endswith("no symbols"):
                raise TwisterRuntimeError("%s has no symbol information" % filename)
        except Exception as e:
            print(str(e))
            sys.exit(2)

        self.is_xip = (len(is_xip_output) != 0)

        self.filename = filename
        self.sections = []
        self.rom_size = 0
        self.ram_size = 0
        self.extra_sections = extra_sections

        self._calculate_sizes()

    def get_ram_size(self):
        """Get the amount of RAM the application will use up on the device

        @return amount of RAM, in bytes
        """
        return self.ram_size

    def get_rom_size(self):
        """Get the size of the data that this application uses on device's flash

        @return amount of ROM, in bytes
        """
        return self.rom_size

    def unrecognized_sections(self):
        """Get a list of sections inside the binary that weren't recognized

        @return list of unrecognized section names
        """
        slist = []
        for v in self.sections:
            if not v["recognized"]:
                slist.append(v["name"])
        return slist

    def _calculate_sizes(self):
        """ Calculate RAM and ROM usage by section """
        objdump_command = "objdump -h " + self.filename
        objdump_output = subprocess.check_output(
            objdump_command, shell=True).decode("utf-8").splitlines()

        for line in objdump_output:
            words = line.split()

            if not words:  # Skip empty lines
                continue

            index = words[0]
            if not index[0].isdigit():  # Skip lines that do not start with a digit
                continue

            name = words[1]  # Skip lines with section names starting with '.'
            if name[0] == '.':
                continue

            # TODO: this doesn't actually reflect the size in flash or RAM as
            # it doesn't include linker-imposed padding between sections.
            # It is close though.
            size = int(words[2], 16)
            if size == 0:
                continue

            load_addr = int(words[4], 16)
            virt_addr = int(words[3], 16)

            # Add the section to the memory use totals (for both non-XIP and
            # XIP scenarios). Unrecognized section names are not included in
            # the calculations.
            recognized = True
            if name in SizeCalculator.alloc_sections:
                self.ram_size += size
                stype = "alloc"
            elif name in SizeCalculator.rw_sections:
                self.ram_size += size
                self.rom_size += size
                stype = "rw"
            elif name in SizeCalculator.ro_sections:
                self.rom_size += size
                if not self.is_xip:
                    self.ram_size += size
                stype = "ro"
            else:
                stype = "unknown"
                if name not in self.extra_sections:
                    recognized = False

            self.sections.append({"name": name, "load_addr": load_addr,
                                  "size": size, "virt_addr": virt_addr,
                                  "type": stype, "recognized": recognized})


class TwisterConfigParser:
    """Class to read test case files with semantic checking
    """

    def __init__(self, filename, schema):
        """Instantiate a new TwisterConfigParser object

        @param filename Source .yaml file to read
        """
        self.data = {}
        self.schema = schema
        self.filename = filename
        self.tests = {}
        self.common = {}

    def load(self):
        self.data = scl.yaml_load_verify(self.filename, self.schema)

        if 'tests' in self.data:
            self.tests = self.data['tests']
        if 'common' in self.data:
            self.common = self.data['common']

    def _cast_value(self, value, typestr):
        # Strings are stripped first; other YAML scalar types pass through.
        v = value.strip() if isinstance(value, str) else value
        if typestr == "str":
            return v

        elif typestr == "float":
            return float(value)

        elif typestr == "int":
            return int(value)

        elif typestr == "bool":
            return value

        elif typestr.startswith("list") and isinstance(value, list):
            return value
        elif typestr.startswith("list") and isinstance(value, str):
            vs = v.split()
            if len(typestr) > 4 and typestr[4] == ":":
                return [self._cast_value(vsi, typestr[5:]) for vsi in vs]
            else:
                return vs

        elif typestr.startswith("set"):
            vs = v.split()
            if len(typestr) > 3 and typestr[3] == ":":
                return {self._cast_value(vsi, typestr[4:]) for vsi in vs}
            else:
                return set(vs)

        elif typestr.startswith("map"):
            return value
        else:
            raise ConfigurationError(
                self.filename, "unknown type '%s'" % typestr)
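
    # A few examples of how type strings drive the conversion
    # (illustrative values):
    #
    #     _cast_value("  hello ", "str")   -> "hello"
    #     _cast_value("1 2 3", "list:int") -> [1, 2, 3]
    #     _cast_value("a b a", "set")      -> {"a", "b"}
    #     _cast_value("2.5", "float")      -> 2.5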

    def get_test(self, name, valid_keys):
        """Get a dictionary representing the keys/values within a test

        @param name The test in the .yaml file to retrieve data from
        @param valid_keys A dictionary representing the intended semantics
            for this test. Each key in this dictionary is a key that could
            be specified. If a key is given in the .yaml file which isn't in
            here, it will generate an error. Each value in this dictionary
            is another dictionary containing metadata:

                "default" - Default value if not given
                "type" - Data type to convert the text value to. Simple types
                    supported are "str", "float", "int", "bool" which will get
                    converted to respective Python data types. "set" and "list"
                    may also be specified, which will split the value by
                    whitespace (but keep the elements as strings). Finally,
                    "list:<type>" and "set:<type>" may be given, which will
                    perform a type conversion after splitting the value up.
                "required" - If true, raise an error if not defined. If false
                    and "default" isn't specified, a type conversion will be
                    done on an empty string
        @return A dictionary containing the test key-value pairs with
            type conversion and default values filled in per valid_keys
        """

        d = {}
        for k, v in self.common.items():
            d[k] = v

        for k, v in self.tests[name].items():
            if k in d:
                if isinstance(d[k], str):
                    # By default, we just concatenate string values of keys
                    # which appear both in "common" and per-test sections,
                    # but some keys are handled in an ad hoc way based on
                    # their semantics.
                    if k == "filter":
                        d[k] = "(%s) and (%s)" % (d[k], v)
                    else:
                        d[k] += " " + v
            else:
                d[k] = v

        for k, kinfo in valid_keys.items():
            if k not in d:
                if "required" in kinfo:
                    required = kinfo["required"]
                else:
                    required = False

                if required:
                    raise ConfigurationError(
                        self.filename,
                        "missing required value for '%s' in test '%s'" %
                        (k, name))
                else:
                    if "default" in kinfo:
                        default = kinfo["default"]
                    else:
                        default = self._cast_value("", kinfo["type"])
                    d[k] = default
            else:
                try:
                    d[k] = self._cast_value(d[k], kinfo["type"])
                except ValueError:
                    raise ConfigurationError(
                        self.filename, "bad %s value '%s' for key '%s' in name '%s'" %
                        (kinfo["type"], d[k], k, name))

        return d
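
    # A sketch of the valid_keys metadata this method expects (the keys and
    # defaults here are illustrative, not the full twister schema):
    #
    #     valid_keys = {
    #         "tags": {"type": "set"},
    #         "timeout": {"type": "int", "default": 60},
    #         "harness": {"type": "str", "required": False},
    #     }
    #     scp = TwisterConfigParser("testcase.yaml", schema)
    #     scp.load()
    #     test = scp.get_test("test", valid_keys)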


class Platform:
    """Class representing metadata for a particular platform

    Maps directly to BOARD when building"""

    platform_schema = scl.yaml_load(os.path.join(ZEPHYR_BASE,
                                                 "scripts", "schemas", "twister", "platform-schema.yaml"))

    def __init__(self):
        """Constructor.

        """

        self.name = ""
        self.twister = True
        # if no RAM size is specified by the board, take a default of 128K
        self.ram = 128

        self.ignore_tags = []
        self.only_tags = []
        self.default = False
        # if no flash size is specified by the board, take a default of 512K
        self.flash = 512
        self.supported = set()

        self.arch = ""
        self.type = "na"
        self.simulation = "na"
        self.supported_toolchains = []
        self.env = []
        self.env_satisfied = True
        self.filter_data = dict()

    def load(self, platform_file):
        scp = TwisterConfigParser(platform_file, self.platform_schema)
        scp.load()
        data = scp.data

        self.name = data['identifier']
        self.twister = data.get("twister", True)
        # if no RAM size is specified by the board, take a default of 128K
        self.ram = data.get("ram", 128)
        testing = data.get("testing", {})
        self.ignore_tags = testing.get("ignore_tags", [])
        self.only_tags = testing.get("only_tags", [])
        self.default = testing.get("default", False)
        # if no flash size is specified by the board, take a default of 512K
        self.flash = data.get("flash", 512)
        self.supported = set()
        for supp_feature in data.get("supported", []):
            for item in supp_feature.split(":"):
                self.supported.add(item)

        self.arch = data['arch']
        self.type = data.get('type', "na")
        self.simulation = data.get('simulation', "na")
        self.supported_toolchains = data.get("toolchain", [])
        self.env = data.get("env", [])
        self.env_satisfied = True
        for env in self.env:
            if not os.environ.get(env, None):
                self.env_satisfied = False

    def __repr__(self):
        return "<%s on %s>" % (self.name, self.arch)


class DisablePyTestCollectionMixin(object):
    __test__ = False


class TestCase(DisablePyTestCollectionMixin):
    """Class representing a test application
    """

    def __init__(self, testcase_root, workdir, name):
        """TestCase constructor.

        This gets called by TestSuite as it finds and reads test yaml files.
        Multiple TestCase instances may be generated from a single
        testcase.yaml, each one corresponding to an entry within that file.

        We need to have a unique name for every single test case. Since
        a testcase.yaml can define multiple tests, the canonical name for
        a test case is <workdir>/<name>.

        @param testcase_root os.path.abspath() of one of the --testcase-root
        @param workdir Sub-directory of testcase_root where the
            .yaml test configuration file was found
        @param name Name of this test case, corresponding to the entry name
            in the test case configuration file. For many test cases that just
            define one test, can be anything and is usually "test". This is
            really only used to distinguish between different cases when
            the testcase.yaml defines multiple tests
        """

        self.source_dir = ""
        self.yamlfile = ""
        self.cases = []
        self.name = self.get_unique(testcase_root, workdir, name)
        self.id = name

        self.type = None
        self.tags = set()
        self.extra_args = None
        self.extra_configs = None
        self.arch_allow = None
        self.arch_exclude = None
        self.skip = False
        self.platform_exclude = None
        self.platform_allow = None
        self.toolchain_exclude = None
        self.toolchain_allow = None
        self.tc_filter = None
        self.timeout = 60
        self.harness = ""
        self.harness_config = {}
        self.build_only = True
        self.build_on_all = False
        self.slow = False
        self.min_ram = -1
        self.depends_on = None
        self.min_flash = -1
        self.extra_sections = None
        self.integration_platforms = []

    @staticmethod
    def get_unique(testcase_root, workdir, name):

        canonical_testcase_root = os.path.realpath(testcase_root)
        if Path(canonical_zephyr_base) in Path(canonical_testcase_root).parents:
            # This is in ZEPHYR_BASE, so include the path in the name for
            # uniqueness.
            # FIXME: We should not depend on the path of a test for unique names.
            relative_tc_root = os.path.relpath(canonical_testcase_root,
                                               start=canonical_zephyr_base)
        else:
            relative_tc_root = ""

        # workdir can be "."
        unique = os.path.normpath(os.path.join(relative_tc_root, workdir, name))
        check = name.split(".")
        if len(check) < 2:
            raise TwisterException(f"""bad test name '{name}' in {testcase_root}/{workdir}. \
Tests should reference the category and subsystem with a dot as a separator.
"""
                                   )
        return unique
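
    # For example (hypothetical paths), a test entry named "kernel.timer"
    # found under $ZEPHYR_BASE/tests/kernel/timer yields:
    #
    #     get_unique("/path/to/zephyr/tests/kernel/timer", ".", "kernel.timer")
    #     # -> "tests/kernel/timer/kernel.timer"
    #
    # while a name without a dot, e.g. "timer", raises TwisterException.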
|  |  | 
|  | @staticmethod | 
|  | def scan_file(inf_name): | 
|  | suite_regex = re.compile( | 
|  | # do not match until end-of-line, otherwise we won't allow | 
|  | # stc_regex below to catch the ones that are declared in the same | 
|  | # line--as we only search starting the end of this match | 
|  | br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,", | 
|  | re.MULTILINE) | 
|  | stc_regex = re.compile( | 
|  | br"^\s*"  # empy space at the beginning is ok | 
|  | # catch the case where it is declared in the same sentence, e.g: | 
|  | # | 
|  | # ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME)); | 
|  | br"(?:ztest_test_suite\([a-zA-Z0-9_]+,\s*)?" | 
|  | # Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME) | 
|  | br"ztest_(?:1cpu_)?(?:user_)?unit_test(?:_setup_teardown)?" | 
|  | # Consume the argument that becomes the extra testcse | 
|  | br"\(\s*" | 
|  | br"(?P<stc_name>[a-zA-Z0-9_]+)" | 
|  | # _setup_teardown() variant has two extra arguments that we ignore | 
|  | br"(?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?" | 
|  | br"\s*\)", | 
|  | # We don't check how it finishes; we don't care | 
|  | re.MULTILINE) | 
|  | suite_run_regex = re.compile( | 
|  | br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)", | 
|  | re.MULTILINE) | 
|  | achtung_regex = re.compile( | 
|  | br"(#ifdef|#endif)", | 
|  | re.MULTILINE) | 
|  | warnings = None | 
|  |  | 
|  | with open(inf_name) as inf: | 
|  | if os.name == 'nt': | 
|  | mmap_args = {'fileno': inf.fileno(), 'length': 0, 'access': mmap.ACCESS_READ} | 
|  | else: | 
|  | mmap_args = {'fileno': inf.fileno(), 'length': 0, 'flags': mmap.MAP_PRIVATE, 'prot': mmap.PROT_READ, | 
|  | 'offset': 0} | 
|  |  | 
|  | with contextlib.closing(mmap.mmap(**mmap_args)) as main_c: | 
|  | suite_regex_match = suite_regex.search(main_c) | 
|  | if not suite_regex_match: | 
# can't find ztest_test_suite; this may just be a file that
# includes ztest.h without defining a suite
|  | return None, None | 
|  |  | 
|  | suite_run_match = suite_run_regex.search(main_c) | 
|  | if not suite_run_match: | 
|  | raise ValueError("can't find ztest_run_test_suite") | 
|  |  | 
|  | achtung_matches = re.findall( | 
|  | achtung_regex, | 
|  | main_c[suite_regex_match.end():suite_run_match.start()]) | 
|  | if achtung_matches: | 
|  | warnings = "found invalid %s in ztest_test_suite()" \ | 
|  | % ", ".join(sorted({match.decode() for match in achtung_matches},reverse = True)) | 
|  | _matches = re.findall( | 
|  | stc_regex, | 
|  | main_c[suite_regex_match.end():suite_run_match.start()]) | 
|  | for match in _matches: | 
|  | if not match.decode().startswith("test_"): | 
|  | warnings = "Found a test that does not start with test_" | 
|  | matches = [match.decode().replace("test_", "", 1) for match in _matches] | 
|  | return matches, warnings | 
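# Example (illustrative): for a source file containing
#
#     ztest_test_suite(mutex, ztest_unit_test(test_lock));
#     ztest_run_test_suite(mutex);
#
# scan_file() would return (["lock"], None): the "test_" prefix is
# stripped from each matched name and no warning is produced.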
|  |  | 
|  | def scan_path(self, path): | 
|  | subcases = [] | 
|  | for filename in glob.glob(os.path.join(path, "src", "*.c*")): | 
|  | try: | 
|  | _subcases, warnings = self.scan_file(filename) | 
|  | if warnings: | 
|  | logger.error("%s: %s" % (filename, warnings)) | 
|  | raise TwisterRuntimeError("%s: %s" % (filename, warnings)) | 
|  | if _subcases: | 
|  | subcases += _subcases | 
|  | except ValueError as e: | 
|  | logger.error("%s: can't find: %s" % (filename, e)) | 
|  |  | 
|  | for filename in glob.glob(os.path.join(path, "*.c")): | 
|  | try: | 
|  | _subcases, warnings = self.scan_file(filename) | 
|  | if warnings: | 
|  | logger.error("%s: %s" % (filename, warnings)) | 
|  | if _subcases: | 
|  | subcases += _subcases | 
|  | except ValueError as e: | 
|  | logger.error("%s: can't find: %s" % (filename, e)) | 
|  | return subcases | 
|  |  | 
|  | def parse_subcases(self, test_path): | 
|  | results = self.scan_path(test_path) | 
|  | for sub in results: | 
|  | name = "{}.{}".format(self.id, sub) | 
|  | self.cases.append(name) | 
|  |  | 
|  | if not results: | 
|  | self.cases.append(self.id) | 
|  |  | 
|  | def __str__(self): | 
|  | return self.name | 
|  |  | 
|  |  | 
|  | class TestInstance(DisablePyTestCollectionMixin): | 
|  | """Class representing the execution of a particular TestCase on a platform | 
|  |  | 
|  | @param test The TestCase object we want to build/execute | 
|  | @param platform Platform object that we want to build and run against | 
|  | @param base_outdir Base directory for all test results. The actual | 
|  | out directory used is <outdir>/<platform>/<test case name> | 
|  | """ | 
|  |  | 
|  | def __init__(self, testcase, platform, outdir): | 
|  |  | 
|  | self.testcase = testcase | 
|  | self.platform = platform | 
|  |  | 
|  | self.status = None | 
|  | self.reason = "Unknown" | 
|  | self.metrics = dict() | 
|  | self.handler = None | 
|  | self.outdir = outdir | 
|  |  | 
|  | self.name = os.path.join(platform.name, testcase.name) | 
|  | self.build_dir = os.path.join(outdir, platform.name, testcase.name) | 
|  |  | 
|  | self.run = False | 
|  |  | 
|  | self.results = {} | 
|  |  | 
|  | def __getstate__(self): | 
|  | d = self.__dict__.copy() | 
|  | return d | 
|  |  | 
|  | def __setstate__(self, d): | 
|  | self.__dict__.update(d) | 
|  |  | 
|  | def __lt__(self, other): | 
|  | return self.name < other.name | 
|  |  | 
|  |  | 
|  | @staticmethod | 
|  | def testcase_runnable(testcase, fixtures): | 
|  | can_run = False | 
|  | # console harness allows us to run the test and capture data. | 
if testcase.harness in ['console', 'ztest', 'pytest']:
|  | can_run = True | 
|  | # if we have a fixture that is also being supplied on the | 
|  | # command-line, then we need to run the test, not just build it. | 
|  | fixture = testcase.harness_config.get('fixture') | 
|  | if fixture: | 
|  | can_run = (fixture in fixtures) | 
|  |  | 
|  | elif testcase.harness: | 
|  | can_run = False | 
|  | else: | 
|  | can_run = True | 
|  |  | 
|  | return can_run | 
|  |  | 
|  |  | 
|  | # Global testsuite parameters | 
|  | def check_runnable(self, enable_slow=False, filter='buildable', fixtures=[]): | 
|  |  | 
|  | # right now we only support building on windows. running is still work | 
|  | # in progress. | 
|  | if os.name == 'nt': | 
|  | return False | 
|  |  | 
|  | # we asked for build-only on the command line | 
|  | if self.testcase.build_only: | 
|  | return False | 
|  |  | 
|  | # Do not run slow tests: | 
|  | skip_slow = self.testcase.slow and not enable_slow | 
|  | if skip_slow: | 
|  | return False | 
|  |  | 
|  | target_ready = bool(self.testcase.type == "unit" or \ | 
|  | self.platform.type == "native" or \ | 
|  | self.platform.simulation in ["mdb-nsim", "nsim", "renode", "qemu", "tsim", "armfvp"] or \ | 
|  | filter == 'runnable') | 
|  |  | 
|  | if self.platform.simulation == "nsim": | 
|  | if not find_executable("nsimdrv"): | 
|  | target_ready = False | 
|  |  | 
|  | if self.platform.simulation == "mdb-nsim": | 
|  | if not find_executable("mdb"): | 
|  | target_ready = False | 
|  |  | 
|  | if self.platform.simulation == "renode": | 
|  | if not find_executable("renode"): | 
|  | target_ready = False | 
|  |  | 
|  | if self.platform.simulation == "tsim": | 
|  | if not find_executable("tsim-leon3"): | 
|  | target_ready = False | 
|  |  | 
|  | testcase_runnable = self.testcase_runnable(self.testcase, fixtures) | 
|  |  | 
|  | return testcase_runnable and target_ready | 
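# Example (illustrative): a ztest case on a platform whose simulation
# is "qemu" is both runnable and target-ready, so check_runnable()
# returns True; the same case with a custom harness and no matching
# fixture supplied on the command line stays build-only.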
|  |  | 
|  | def create_overlay(self, platform, enable_asan=False, enable_ubsan=False, enable_coverage=False, coverage_platform=[]): | 
|  | # Create this in a "twister/" subdirectory otherwise this | 
|  | # will pass this overlay to kconfig.py *twice* and kconfig.cmake | 
|  | # will silently give that second time precedence over any | 
|  | # --extra-args=CONFIG_* | 
|  | subdir = os.path.join(self.build_dir, "twister") | 
|  |  | 
|  | content = "" | 
|  |  | 
|  | if self.testcase.extra_configs: | 
|  | content = "\n".join(self.testcase.extra_configs) | 
|  |  | 
|  | if enable_coverage: | 
|  | if platform.name in coverage_platform: | 
|  | content = content + "\nCONFIG_COVERAGE=y" | 
|  | content = content + "\nCONFIG_COVERAGE_DUMP=y" | 
|  |  | 
|  | if enable_asan: | 
|  | if platform.type == "native": | 
|  | content = content + "\nCONFIG_ASAN=y" | 
|  |  | 
|  | if enable_ubsan: | 
|  | if platform.type == "native": | 
|  | content = content + "\nCONFIG_UBSAN=y" | 
|  |  | 
|  | if content: | 
|  | os.makedirs(subdir, exist_ok=True) | 
|  | file = os.path.join(subdir, "testcase_extra.conf") | 
|  | with open(file, "w") as f: | 
|  | f.write(content) | 
|  |  | 
|  | return content | 
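# Example (illustrative; CONFIG_FOO is a hypothetical symbol): with
# extra_configs == ["CONFIG_FOO=y"] and coverage enabled for this
# platform, twister/testcase_extra.conf would contain:
#
#     CONFIG_FOO=y
#     CONFIG_COVERAGE=y
#     CONFIG_COVERAGE_DUMP=y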
|  |  | 
|  | def calculate_sizes(self): | 
|  | """Get the RAM/ROM sizes of a test case. | 
|  |  | 
|  | This can only be run after the instance has been executed by | 
|  | MakeGenerator, otherwise there won't be any binaries to measure. | 
|  |  | 
|  | @return A SizeCalculator object | 
|  | """ | 
|  | fns = glob.glob(os.path.join(self.build_dir, "zephyr", "*.elf")) | 
|  | fns.extend(glob.glob(os.path.join(self.build_dir, "zephyr", "*.exe"))) | 
|  | fns = [x for x in fns if not x.endswith('_prebuilt.elf')] | 
|  | if len(fns) != 1: | 
|  | raise BuildError("Missing/multiple output ELF binary") | 
|  |  | 
|  | return SizeCalculator(fns[0], self.testcase.extra_sections) | 
|  |  | 
|  | def fill_results_by_status(self): | 
|  | """Fills results according to self.status | 
|  |  | 
|  | The method is used to propagate the instance level status | 
|  | to the test cases inside. Useful when the whole instance is skipped | 
|  | and the info is required also at the test cases level for reporting. | 
|  | Should be used with caution, e.g. should not be used | 
|  | to fill all results with passes | 
|  | """ | 
|  | status_to_verdict = { | 
|  | 'skipped': 'SKIP', | 
|  | 'error': 'BLOCK', | 
|  | 'failure': 'FAILED' | 
|  | } | 
|  |  | 
|  | for k in self.results: | 
|  | self.results[k] = status_to_verdict[self.status] | 
|  |  | 
|  | def __repr__(self): | 
|  | return "<TestCase %s on %s>" % (self.testcase.name, self.platform.name) | 
|  |  | 
|  |  | 
|  | class CMake(): | 
config_re = re.compile(r'(CONFIG_[A-Za-z0-9_]+)="?([^"]*)"?$')
dt_re = re.compile(r'([A-Za-z0-9_]+)="?([^"]*)"?$')
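# Example (illustrative): config_re turns a .config line such as
# 'CONFIG_UART_CONSOLE=y' into the pair ("CONFIG_UART_CONSOLE", "y"),
# with optional surrounding double quotes stripped from the value.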
|  |  | 
|  | def __init__(self, testcase, platform, source_dir, build_dir): | 
|  |  | 
|  | self.cwd = None | 
|  | self.capture_output = True | 
|  |  | 
|  | self.defconfig = {} | 
|  | self.cmake_cache = {} | 
|  |  | 
|  | self.instance = None | 
|  | self.testcase = testcase | 
|  | self.platform = platform | 
|  | self.source_dir = source_dir | 
|  | self.build_dir = build_dir | 
|  | self.log = "build.log" | 
|  | self.generator = None | 
|  | self.generator_cmd = None | 
|  |  | 
|  | def parse_generated(self): | 
|  | self.defconfig = {} | 
|  | return {} | 
|  |  | 
|  | def run_build(self, args=[]): | 
|  |  | 
|  | logger.debug("Building %s for %s" % (self.source_dir, self.platform.name)) | 
|  |  | 
|  | cmake_args = [] | 
|  | cmake_args.extend(args) | 
|  | cmake = shutil.which('cmake') | 
|  | cmd = [cmake] + cmake_args | 
|  | kwargs = dict() | 
|  |  | 
|  | if self.capture_output: | 
|  | kwargs['stdout'] = subprocess.PIPE | 
|  | # CMake sends the output of message() to stderr unless it's STATUS | 
|  | kwargs['stderr'] = subprocess.STDOUT | 
|  |  | 
|  | if self.cwd: | 
|  | kwargs['cwd'] = self.cwd | 
|  |  | 
|  | p = subprocess.Popen(cmd, **kwargs) | 
|  | out, _ = p.communicate() | 
|  |  | 
|  | results = {} | 
|  | if p.returncode == 0: | 
|  | msg = "Finished building %s for %s" % (self.source_dir, self.platform.name) | 
|  |  | 
|  | self.instance.status = "passed" | 
|  | results = {'msg': msg, "returncode": p.returncode, "instance": self.instance} | 
|  |  | 
|  | if out: | 
|  | log_msg = out.decode(sys.getdefaultencoding()) | 
|  | with open(os.path.join(self.build_dir, self.log), "a") as log: | 
|  | log.write(log_msg) | 
|  |  | 
|  | else: | 
|  | return None | 
|  | else: | 
|  | # A real error occurred, raise an exception | 
|  | log_msg = "" | 
|  | if out: | 
|  | log_msg = out.decode(sys.getdefaultencoding()) | 
|  | with open(os.path.join(self.build_dir, self.log), "a") as log: | 
|  | log.write(log_msg) | 
|  |  | 
|  | if log_msg: | 
|  | res = re.findall("region `(FLASH|RAM|ICCM|DCCM|SRAM)' overflowed by", log_msg) | 
|  | if res and not self.overflow_as_errors: | 
|  | logger.debug("Test skipped due to {} Overflow".format(res[0])) | 
|  | self.instance.status = "skipped" | 
|  | self.instance.reason = "{} overflow".format(res[0]) | 
|  | else: | 
|  | self.instance.status = "error" | 
|  | self.instance.reason = "Build failure" | 
|  |  | 
|  | results = { | 
|  | "returncode": p.returncode, | 
|  | "instance": self.instance, | 
|  | } | 
|  |  | 
|  | return results | 
|  |  | 
|  | def run_cmake(self, args=[]): | 
|  |  | 
|  | if self.warnings_as_errors: | 
|  | ldflags = "-Wl,--fatal-warnings" | 
|  | cflags = "-Werror" | 
|  | aflags = "-Wa,--fatal-warnings" | 
|  | gen_defines_args = "--err-on-deprecated-properties" | 
|  | else: | 
|  | ldflags = cflags = aflags = "" | 
|  | gen_defines_args = "" | 
|  |  | 
|  | logger.debug("Running cmake on %s for %s" % (self.source_dir, self.platform.name)) | 
|  | cmake_args = [ | 
|  | f'-B{self.build_dir}', | 
|  | f'-S{self.source_dir}', | 
|  | f'-DEXTRA_CFLAGS="{cflags}"', | 
f'-DEXTRA_AFLAGS="{aflags}"',
|  | f'-DEXTRA_LDFLAGS="{ldflags}"', | 
|  | f'-DEXTRA_GEN_DEFINES_ARGS={gen_defines_args}', | 
|  | f'-G{self.generator}' | 
|  | ] | 
|  |  | 
|  | if self.cmake_only: | 
|  | cmake_args.append("-DCMAKE_EXPORT_COMPILE_COMMANDS=1") | 
|  |  | 
|  | args = ["-D{}".format(a.replace('"', '')) for a in args] | 
|  | cmake_args.extend(args) | 
|  |  | 
|  | cmake_opts = ['-DBOARD={}'.format(self.platform.name)] | 
|  | cmake_args.extend(cmake_opts) | 
|  |  | 
|  |  | 
|  | logger.debug("Calling cmake with arguments: {}".format(cmake_args)) | 
|  | cmake = shutil.which('cmake') | 
|  | cmd = [cmake] + cmake_args | 
|  | kwargs = dict() | 
|  |  | 
|  | if self.capture_output: | 
|  | kwargs['stdout'] = subprocess.PIPE | 
|  | # CMake sends the output of message() to stderr unless it's STATUS | 
|  | kwargs['stderr'] = subprocess.STDOUT | 
|  |  | 
|  | if self.cwd: | 
|  | kwargs['cwd'] = self.cwd | 
|  |  | 
|  | p = subprocess.Popen(cmd, **kwargs) | 
|  | out, _ = p.communicate() | 
|  |  | 
|  | if p.returncode == 0: | 
|  | filter_results = self.parse_generated() | 
|  | msg = "Finished building %s for %s" % (self.source_dir, self.platform.name) | 
|  | logger.debug(msg) | 
|  | results = {'msg': msg, 'filter': filter_results} | 
|  |  | 
|  | else: | 
|  | self.instance.status = "error" | 
|  | self.instance.reason = "Cmake build failure" | 
|  | logger.error("Cmake build failure: %s for %s" % (self.source_dir, self.platform.name)) | 
|  | results = {"returncode": p.returncode} | 
|  |  | 
|  | if out: | 
|  | with open(os.path.join(self.build_dir, self.log), "a") as log: | 
|  | log_msg = out.decode(sys.getdefaultencoding()) | 
|  | log.write(log_msg) | 
|  |  | 
|  | return results | 
|  |  | 
|  | @staticmethod | 
|  | def run_cmake_script(args=[]): | 
|  |  | 
|  | logger.debug("Running cmake script %s" % (args[0])) | 
|  |  | 
|  | cmake_args = ["-D{}".format(a.replace('"', '')) for a in args[1:]] | 
|  | cmake_args.extend(['-P', args[0]]) | 
|  |  | 
|  | logger.debug("Calling cmake with arguments: {}".format(cmake_args)) | 
|  | cmake = shutil.which('cmake') | 
|  | if not cmake: | 
|  | msg = "Unable to find `cmake` in path" | 
|  | logger.error(msg) | 
|  | raise Exception(msg) | 
|  | cmd = [cmake] + cmake_args | 
|  |  | 
|  | kwargs = dict() | 
|  | kwargs['stdout'] = subprocess.PIPE | 
|  | # CMake sends the output of message() to stderr unless it's STATUS | 
|  | kwargs['stderr'] = subprocess.STDOUT | 
|  |  | 
|  | p = subprocess.Popen(cmd, **kwargs) | 
|  | out, _ = p.communicate() | 
|  |  | 
|  | if p.returncode == 0: | 
|  | msg = "Finished running  %s" % (args[0]) | 
|  | logger.debug(msg) | 
|  | results = {"returncode": p.returncode, "msg": msg, "stdout": out} | 
|  |  | 
|  | else: | 
|  | logger.error("Cmake script failure: %s" % (args[0])) | 
|  | results = {"returncode": p.returncode} | 
|  |  | 
|  | return results | 
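# Example (illustrative) usage, mirroring get_toolchain() further
# below; script_path is a hypothetical path to a CMake -P script:
#
#     results = CMake.run_cmake_script([script_path, "FORMAT=json"])
#     if results["returncode"] == 0:
#         print(results["stdout"].decode())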
|  |  | 
|  |  | 
|  | class FilterBuilder(CMake): | 
|  |  | 
|  | def __init__(self, testcase, platform, source_dir, build_dir): | 
|  | super().__init__(testcase, platform, source_dir, build_dir) | 
|  |  | 
|  | self.log = "config-twister.log" | 
|  |  | 
|  | def parse_generated(self): | 
|  |  | 
|  | if self.platform.name == "unit_testing": | 
|  | return {} | 
|  |  | 
|  | cmake_cache_path = os.path.join(self.build_dir, "CMakeCache.txt") | 
|  | defconfig_path = os.path.join(self.build_dir, "zephyr", ".config") | 
|  |  | 
|  | with open(defconfig_path, "r") as fp: | 
|  | defconfig = {} | 
|  | for line in fp.readlines(): | 
|  | m = self.config_re.match(line) | 
|  | if not m: | 
|  | if line.strip() and not line.startswith("#"): | 
|  | sys.stderr.write("Unrecognized line %s\n" % line) | 
|  | continue | 
|  | defconfig[m.group(1)] = m.group(2).strip() | 
|  |  | 
|  | self.defconfig = defconfig | 
|  |  | 
|  | cmake_conf = {} | 
|  | try: | 
|  | cache = CMakeCache.from_file(cmake_cache_path) | 
|  | except FileNotFoundError: | 
|  | cache = {} | 
|  |  | 
|  | for k in iter(cache): | 
|  | cmake_conf[k.name] = k.value | 
|  |  | 
|  | self.cmake_cache = cmake_conf | 
|  |  | 
|  | filter_data = { | 
|  | "ARCH": self.platform.arch, | 
|  | "PLATFORM": self.platform.name | 
|  | } | 
|  | filter_data.update(os.environ) | 
|  | filter_data.update(self.defconfig) | 
|  | filter_data.update(self.cmake_cache) | 
|  |  | 
|  | edt_pickle = os.path.join(self.build_dir, "zephyr", "edt.pickle") | 
|  | if self.testcase and self.testcase.tc_filter: | 
|  | try: | 
|  | if os.path.exists(edt_pickle): | 
|  | with open(edt_pickle, 'rb') as f: | 
|  | edt = pickle.load(f) | 
|  | else: | 
|  | edt = None | 
|  | res = expr_parser.parse(self.testcase.tc_filter, filter_data, edt) | 
|  |  | 
|  | except (ValueError, SyntaxError) as se: | 
|  | sys.stderr.write( | 
|  | "Failed processing %s\n" % self.testcase.yamlfile) | 
|  | raise se | 
|  |  | 
|  | if not res: | 
|  | return {os.path.join(self.platform.name, self.testcase.name): True} | 
|  | else: | 
|  | return {os.path.join(self.platform.name, self.testcase.name): False} | 
|  | else: | 
|  | self.platform.filter_data = filter_data | 
|  | return filter_data | 
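# Example (illustrative): a testcase filter such as the hypothetical
# expression "CONFIG_SERIAL and ARCH == 'arm'" is evaluated by
# expr_parser against filter_data above; a False result marks the
# platform/testcase pair as filtered (True in the returned dict).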
|  |  | 
|  |  | 
|  | class ProjectBuilder(FilterBuilder): | 
|  |  | 
|  | def __init__(self, suite, instance, **kwargs): | 
|  | super().__init__(instance.testcase, instance.platform, instance.testcase.source_dir, instance.build_dir) | 
|  |  | 
|  | self.log = "build.log" | 
|  | self.instance = instance | 
|  | self.suite = suite | 
|  | self.filtered_tests = 0 | 
|  |  | 
|  | self.lsan = kwargs.get('lsan', False) | 
|  | self.asan = kwargs.get('asan', False) | 
|  | self.ubsan = kwargs.get('ubsan', False) | 
|  | self.valgrind = kwargs.get('valgrind', False) | 
|  | self.extra_args = kwargs.get('extra_args', []) | 
|  | self.device_testing = kwargs.get('device_testing', False) | 
|  | self.cmake_only = kwargs.get('cmake_only', False) | 
|  | self.cleanup = kwargs.get('cleanup', False) | 
|  | self.coverage = kwargs.get('coverage', False) | 
|  | self.inline_logs = kwargs.get('inline_logs', False) | 
|  | self.generator = kwargs.get('generator', None) | 
|  | self.generator_cmd = kwargs.get('generator_cmd', None) | 
|  | self.verbose = kwargs.get('verbose', None) | 
|  | self.warnings_as_errors = kwargs.get('warnings_as_errors', True) | 
|  | self.overflow_as_errors = kwargs.get('overflow_as_errors', False) | 
|  |  | 
|  | @staticmethod | 
|  | def log_info(filename, inline_logs): | 
|  | filename = os.path.abspath(os.path.realpath(filename)) | 
|  | if inline_logs: | 
|  | logger.info("{:-^100}".format(filename)) | 
|  |  | 
|  | try: | 
|  | with open(filename) as fp: | 
|  | data = fp.read() | 
|  | except Exception as e: | 
|  | data = "Unable to read log data (%s)\n" % (str(e)) | 
|  |  | 
|  | logger.error(data) | 
|  |  | 
|  | logger.info("{:-^100}".format(filename)) | 
|  | else: | 
|  | logger.error("see: " + Fore.YELLOW + filename + Fore.RESET) | 
|  |  | 
|  | def log_info_file(self, inline_logs): | 
|  | build_dir = self.instance.build_dir | 
|  | h_log = "{}/handler.log".format(build_dir) | 
|  | b_log = "{}/build.log".format(build_dir) | 
|  | v_log = "{}/valgrind.log".format(build_dir) | 
|  | d_log = "{}/device.log".format(build_dir) | 
|  |  | 
|  | if os.path.exists(v_log) and "Valgrind" in self.instance.reason: | 
|  | self.log_info("{}".format(v_log), inline_logs) | 
|  | elif os.path.exists(h_log) and os.path.getsize(h_log) > 0: | 
|  | self.log_info("{}".format(h_log), inline_logs) | 
|  | elif os.path.exists(d_log) and os.path.getsize(d_log) > 0: | 
|  | self.log_info("{}".format(d_log), inline_logs) | 
|  | else: | 
|  | self.log_info("{}".format(b_log), inline_logs) | 
|  |  | 
|  | def setup_handler(self): | 
|  |  | 
|  | instance = self.instance | 
|  | args = [] | 
|  |  | 
|  | # FIXME: Needs simplification | 
|  | if instance.platform.simulation == "qemu": | 
|  | instance.handler = QEMUHandler(instance, "qemu") | 
|  | args.append("QEMU_PIPE=%s" % instance.handler.get_fifo()) | 
|  | instance.handler.call_make_run = True | 
|  | elif instance.testcase.type == "unit": | 
|  | instance.handler = BinaryHandler(instance, "unit") | 
|  | instance.handler.binary = os.path.join(instance.build_dir, "testbinary") | 
|  | if self.coverage: | 
|  | args.append("COVERAGE=1") | 
|  | elif instance.platform.type == "native": | 
|  | handler = BinaryHandler(instance, "native") | 
|  |  | 
|  | handler.asan = self.asan | 
|  | handler.valgrind = self.valgrind | 
|  | handler.lsan = self.lsan | 
|  | handler.ubsan = self.ubsan | 
|  | handler.coverage = self.coverage | 
|  |  | 
|  | handler.binary = os.path.join(instance.build_dir, "zephyr", "zephyr.exe") | 
|  | instance.handler = handler | 
|  | elif instance.platform.simulation == "renode": | 
|  | if find_executable("renode"): | 
|  | instance.handler = BinaryHandler(instance, "renode") | 
|  | instance.handler.pid_fn = os.path.join(instance.build_dir, "renode.pid") | 
|  | instance.handler.call_make_run = True | 
|  | elif instance.platform.simulation == "tsim": | 
|  | instance.handler = BinaryHandler(instance, "tsim") | 
|  | instance.handler.call_make_run = True | 
|  | elif self.device_testing: | 
|  | instance.handler = DeviceHandler(instance, "device") | 
|  | instance.handler.coverage = self.coverage | 
|  | elif instance.platform.simulation == "nsim": | 
|  | if find_executable("nsimdrv"): | 
|  | instance.handler = BinaryHandler(instance, "nsim") | 
|  | instance.handler.call_make_run = True | 
|  | elif instance.platform.simulation == "mdb-nsim": | 
|  | if find_executable("mdb"): | 
|  | instance.handler = BinaryHandler(instance, "nsim") | 
|  | instance.handler.pid_fn = os.path.join(instance.build_dir, "mdb.pid") | 
|  | instance.handler.call_west_flash = True | 
|  | elif instance.platform.simulation == "armfvp": | 
|  | instance.handler = BinaryHandler(instance, "armfvp") | 
|  | instance.handler.call_make_run = True | 
|  |  | 
|  | if instance.handler: | 
|  | instance.handler.args = args | 
|  | instance.handler.generator_cmd = self.generator_cmd | 
|  | instance.handler.generator = self.generator | 
|  |  | 
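# The pipeline messages handled below flow, in the common case, as:
# "cmake" -> "build" -> "run" (when the instance is runnable)
# -> "report" -> "cleanup" (only for passed tests with cleanup on).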
|  | def process(self, pipeline, done, message, lock, results): | 
|  | op = message.get('op') | 
|  |  | 
|  | if not self.instance.handler: | 
|  | self.setup_handler() | 
|  |  | 
|  | # The build process, call cmake and build with configured generator | 
|  | if op == "cmake": | 
|  | res = self.cmake() | 
|  | if self.instance.status in ["failed", "error"]: | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | elif self.cmake_only: | 
|  | if self.instance.status is None: | 
|  | self.instance.status = "passed" | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | else: | 
|  | if self.instance.name in res['filter'] and res['filter'][self.instance.name]: | 
|  | logger.debug("filtering %s" % self.instance.name) | 
|  | self.instance.status = "skipped" | 
|  | self.instance.reason = "filter" | 
|  | results.skipped_runtime += 1 | 
|  | for case in self.instance.testcase.cases: | 
|  | self.instance.results.update({case: 'SKIP'}) | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | else: | 
|  | pipeline.put({"op": "build", "test": self.instance}) | 
|  |  | 
|  | elif op == "build": | 
|  | logger.debug("build test: %s" % self.instance.name) | 
|  | res = self.build() | 
|  |  | 
|  | if not res: | 
|  | self.instance.status = "error" | 
|  | self.instance.reason = "Build Failure" | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | else: | 
|  | # Count skipped cases during build, for example | 
|  | # due to ram/rom overflow. | 
|  | inst = res.get("instance", None) | 
|  | if inst and inst.status == "skipped": | 
|  | results.skipped_runtime += 1 | 
|  |  | 
|  | if res.get('returncode', 1) > 0: | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | else: | 
|  | if self.instance.run and self.instance.handler: | 
|  | pipeline.put({"op": "run", "test": self.instance}) | 
|  | else: | 
|  | pipeline.put({"op": "report", "test": self.instance}) | 
|  | # Run the generated binary using one of the supported handlers | 
|  | elif op == "run": | 
|  | logger.debug("run test: %s" % self.instance.name) | 
|  | self.run() | 
|  | self.instance.status, _ = self.instance.handler.get_state() | 
|  | logger.debug(f"run status: {self.instance.name} {self.instance.status}") | 
|  |  | 
# drop unpicklable references so the instance survives pickling
|  | self.instance.handler.thread = None | 
|  | self.instance.handler.suite = None | 
|  | pipeline.put({ | 
|  | "op": "report", | 
|  | "test": self.instance, | 
|  | "status": self.instance.status, | 
|  | "reason": self.instance.reason | 
|  | } | 
|  | ) | 
|  |  | 
|  | # Report results and output progress to screen | 
|  | elif op == "report": | 
|  | with lock: | 
|  | done.put(self.instance) | 
|  | self.report_out(results) | 
|  |  | 
|  | if self.cleanup and not self.coverage and self.instance.status == "passed": | 
|  | pipeline.put({ | 
|  | "op": "cleanup", | 
|  | "test": self.instance | 
|  | }) | 
|  |  | 
|  | elif op == "cleanup": | 
|  | if self.device_testing: | 
|  | self.cleanup_device_testing_artifacts() | 
|  | else: | 
|  | self.cleanup_artifacts() | 
|  |  | 
|  | def cleanup_artifacts(self, additional_keep=[]): | 
|  | logger.debug("Cleaning up {}".format(self.instance.build_dir)) | 
|  | allow = [ | 
|  | 'zephyr/.config', | 
|  | 'handler.log', | 
|  | 'build.log', | 
|  | 'device.log', | 
|  | 'recording.csv', | 
|  | ] | 
|  |  | 
|  | allow += additional_keep | 
|  |  | 
|  | allow = [os.path.join(self.instance.build_dir, file) for file in allow] | 
|  |  | 
|  | for dirpath, dirnames, filenames in os.walk(self.instance.build_dir, topdown=False): | 
|  | for name in filenames: | 
|  | path = os.path.join(dirpath, name) | 
|  | if path not in allow: | 
|  | os.remove(path) | 
|  | # Remove empty directories and symbolic links to directories | 
|  | for dir in dirnames: | 
|  | path = os.path.join(dirpath, dir) | 
|  | if os.path.islink(path): | 
|  | os.remove(path) | 
|  | elif not os.listdir(path): | 
|  | os.rmdir(path) | 
|  |  | 
|  | def cleanup_device_testing_artifacts(self): | 
|  | logger.debug("Cleaning up for Device Testing {}".format(self.instance.build_dir)) | 
|  |  | 
|  | sanitizelist = [ | 
|  | 'CMakeCache.txt', | 
|  | 'zephyr/runners.yaml', | 
|  | ] | 
|  | keep = [ | 
|  | 'zephyr/zephyr.hex', | 
|  | 'zephyr/zephyr.bin', | 
|  | 'zephyr/zephyr.elf', | 
|  | ] | 
|  |  | 
|  | keep += sanitizelist | 
|  |  | 
|  | self.cleanup_artifacts(keep) | 
|  |  | 
|  | # sanitize paths so files are relocatable | 
|  | for file in sanitizelist: | 
|  | file = os.path.join(self.instance.build_dir, file) | 
|  |  | 
|  | with open(file, "rt") as fin: | 
|  | data = fin.read() | 
|  | data = data.replace(canonical_zephyr_base+"/", "") | 
|  |  | 
|  | with open(file, "wt") as fin: | 
|  | fin.write(data) | 
|  |  | 
|  | def report_out(self, results): | 
|  | total_to_do = results.total - results.skipped_configs | 
|  | total_tests_width = len(str(total_to_do)) | 
|  | results.done += 1 | 
|  | instance = self.instance | 
|  |  | 
|  | if instance.status in ["error", "failed", "timeout", "flash_error"]: | 
|  | if instance.status == "error": | 
|  | results.error += 1 | 
|  | results.failed += 1 | 
|  | if self.verbose: | 
|  | status = Fore.RED + "FAILED " + Fore.RESET + instance.reason | 
|  | else: | 
|  | print("") | 
|  | logger.error( | 
|  | "{:<25} {:<50} {}FAILED{}: {}".format( | 
|  | instance.platform.name, | 
|  | instance.testcase.name, | 
|  | Fore.RED, | 
|  | Fore.RESET, | 
|  | instance.reason)) | 
|  | if not self.verbose: | 
|  | self.log_info_file(self.inline_logs) | 
|  | elif instance.status == "skipped": | 
|  | status = Fore.YELLOW + "SKIPPED" + Fore.RESET | 
|  | elif instance.status == "passed": | 
|  | status = Fore.GREEN + "PASSED" + Fore.RESET | 
|  | else: | 
|  | logger.debug(f"Unknown status = {instance.status}") | 
|  | status = Fore.YELLOW + "UNKNOWN" + Fore.RESET | 
|  |  | 
|  | if self.verbose: | 
|  | if self.cmake_only: | 
|  | more_info = "cmake" | 
|  | elif instance.status == "skipped": | 
|  | more_info = instance.reason | 
|  | else: | 
|  | if instance.handler and instance.run: | 
|  | more_info = instance.handler.type_str | 
|  | htime = instance.handler.duration | 
|  | if htime: | 
|  | more_info += " {:.3f}s".format(htime) | 
|  | else: | 
|  | more_info = "build" | 
|  |  | 
|  | logger.info("{:>{}}/{} {:<25} {:<50} {} ({})".format( | 
|  | results.done, total_tests_width, total_to_do, instance.platform.name, | 
|  | instance.testcase.name, status, more_info)) | 
|  |  | 
|  | if instance.status in ["error", "failed", "timeout"]: | 
|  | self.log_info_file(self.inline_logs) | 
|  | else: | 
|  | completed_perc = 0 | 
|  | if total_to_do > 0: | 
|  | completed_perc = int((float(results.done) / total_to_do) * 100) | 
|  |  | 
|  | skipped = results.skipped_configs + results.skipped_runtime | 
|  | sys.stdout.write("\rINFO    - Total complete: %s%4d/%4d%s  %2d%%  skipped: %s%4d%s, failed: %s%4d%s" % ( | 
|  | Fore.GREEN, | 
|  | results.done, | 
|  | total_to_do, | 
|  | Fore.RESET, | 
|  | completed_perc, | 
|  | Fore.YELLOW if skipped > 0 else Fore.RESET, | 
|  | skipped, | 
|  | Fore.RESET, | 
|  | Fore.RED if results.failed > 0 else Fore.RESET, | 
|  | results.failed, | 
|  | Fore.RESET | 
|  | ) | 
|  | ) | 
|  | sys.stdout.flush() | 
|  |  | 
|  | def cmake(self): | 
|  |  | 
|  | instance = self.instance | 
|  | args = self.testcase.extra_args[:] | 
|  | args += self.extra_args | 
|  |  | 
|  | if instance.handler: | 
|  | args += instance.handler.args | 
|  |  | 
|  | # merge overlay files into one variable | 
|  | def extract_overlays(args): | 
|  | re_overlay = re.compile('OVERLAY_CONFIG=(.*)') | 
|  | other_args = [] | 
|  | overlays = [] | 
|  | for arg in args: | 
|  | match = re_overlay.search(arg) | 
|  | if match: | 
|  | overlays.append(match.group(1).strip('\'"')) | 
|  | else: | 
|  | other_args.append(arg) | 
|  |  | 
|  | args[:] = other_args | 
|  | return overlays | 
|  |  | 
|  | overlays = extract_overlays(args) | 
|  |  | 
|  | if os.path.exists(os.path.join(instance.build_dir, | 
|  | "twister", "testcase_extra.conf")): | 
|  | overlays.append(os.path.join(instance.build_dir, | 
|  | "twister", "testcase_extra.conf")) | 
|  |  | 
|  | if overlays: | 
|  | args.append("OVERLAY_CONFIG=\"%s\"" % (" ".join(overlays))) | 
|  |  | 
|  | res = self.run_cmake(args) | 
|  | return res | 
|  |  | 
|  | def build(self): | 
|  | res = self.run_build(['--build', self.build_dir]) | 
|  | return res | 
|  |  | 
|  | def run(self): | 
|  |  | 
|  | instance = self.instance | 
|  |  | 
|  | if instance.handler: | 
|  | if instance.handler.type_str == "device": | 
|  | instance.handler.suite = self.suite | 
|  |  | 
|  | instance.handler.handle() | 
|  |  | 
|  | sys.stdout.flush() | 
|  |  | 
|  | class TestSuite(DisablePyTestCollectionMixin): | 
config_re = re.compile(r'(CONFIG_[A-Za-z0-9_]+)="?([^"]*)"?$')
dt_re = re.compile(r'([A-Za-z0-9_]+)="?([^"]*)"?$')
|  |  | 
|  | tc_schema = scl.yaml_load( | 
|  | os.path.join(ZEPHYR_BASE, | 
|  | "scripts", "schemas", "twister", "testcase-schema.yaml")) | 
|  | quarantine_schema = scl.yaml_load( | 
|  | os.path.join(ZEPHYR_BASE, | 
|  | "scripts", "schemas", "twister", "quarantine-schema.yaml")) | 
|  |  | 
|  | testcase_valid_keys = {"tags": {"type": "set", "required": False}, | 
|  | "type": {"type": "str", "default": "integration"}, | 
|  | "extra_args": {"type": "list"}, | 
|  | "extra_configs": {"type": "list"}, | 
|  | "build_only": {"type": "bool", "default": False}, | 
|  | "build_on_all": {"type": "bool", "default": False}, | 
|  | "skip": {"type": "bool", "default": False}, | 
|  | "slow": {"type": "bool", "default": False}, | 
|  | "timeout": {"type": "int", "default": 60}, | 
|  | "min_ram": {"type": "int", "default": 8}, | 
|  | "depends_on": {"type": "set"}, | 
|  | "min_flash": {"type": "int", "default": 32}, | 
|  | "arch_allow": {"type": "set"}, | 
|  | "arch_exclude": {"type": "set"}, | 
|  | "extra_sections": {"type": "list", "default": []}, | 
|  | "integration_platforms": {"type": "list", "default": []}, | 
|  | "platform_exclude": {"type": "set"}, | 
|  | "platform_allow": {"type": "set"}, | 
|  | "toolchain_exclude": {"type": "set"}, | 
|  | "toolchain_allow": {"type": "set"}, | 
|  | "filter": {"type": "str"}, | 
|  | "harness": {"type": "str"}, | 
|  | "harness_config": {"type": "map", "default": {}} | 
|  | } | 
|  |  | 
|  | RELEASE_DATA = os.path.join(ZEPHYR_BASE, "scripts", "release", | 
|  | "twister_last_release.csv") | 
|  |  | 
|  | SAMPLE_FILENAME = 'sample.yaml' | 
|  | TESTCASE_FILENAME = 'testcase.yaml' | 
|  |  | 
|  | def __init__(self, board_root_list=[], testcase_roots=[], outdir=None): | 
|  |  | 
|  | self.roots = testcase_roots | 
|  | if not isinstance(board_root_list, list): | 
|  | self.board_roots = [board_root_list] | 
|  | else: | 
|  | self.board_roots = board_root_list | 
|  |  | 
|  | # Testsuite Options | 
|  | self.coverage_platform = [] | 
|  | self.build_only = False | 
|  | self.cmake_only = False | 
|  | self.cleanup = False | 
|  | self.enable_slow = False | 
|  | self.device_testing = False | 
|  | self.fixtures = [] | 
|  | self.enable_coverage = False | 
|  | self.enable_ubsan = False | 
|  | self.enable_lsan = False | 
|  | self.enable_asan = False | 
|  | self.enable_valgrind = False | 
|  | self.extra_args = [] | 
|  | self.inline_logs = False | 
|  | self.enable_sizes_report = False | 
|  | self.west_flash = None | 
|  | self.west_runner = None | 
|  | self.generator = None | 
|  | self.generator_cmd = None | 
|  | self.warnings_as_errors = True | 
|  | self.overflow_as_errors = False | 
|  | self.quarantine_verify = False | 
|  |  | 
|  | # Keep track of which test cases we've filtered out and why | 
|  | self.testcases = {} | 
|  | self.quarantine = {} | 
|  | self.platforms = [] | 
|  | self.selected_platforms = [] | 
|  | self.filtered_platforms = [] | 
|  | self.default_platforms = [] | 
|  | self.outdir = os.path.abspath(outdir) | 
|  | self.discards = {} | 
|  | self.load_errors = 0 | 
|  | self.instances = dict() | 
|  |  | 
|  | self.total_platforms = 0 | 
|  | self.start_time = 0 | 
|  | self.duration = 0 | 
|  | self.warnings = 0 | 
|  |  | 
|  | # hardcoded for now | 
|  | self.duts = [] | 
|  |  | 
|  | # run integration tests only | 
|  | self.integration = False | 
|  |  | 
|  | self.pipeline = None | 
|  | self.version = "NA" | 
|  |  | 
|  | def check_zephyr_version(self): | 
|  | try: | 
|  | subproc = subprocess.run(["git", "describe", "--abbrev=12"], | 
|  | stdout=subprocess.PIPE, | 
|  | universal_newlines=True, | 
|  | cwd=ZEPHYR_BASE) | 
|  | if subproc.returncode == 0: | 
|  | self.version = subproc.stdout.strip() | 
|  | logger.info(f"Zephyr version: {self.version}") | 
|  | except OSError: | 
|  | logger.info("Cannot read zephyr version.") | 
|  |  | 
|  | def get_platform_instances(self, platform): | 
filtered_dict = {k: v for k, v in self.instances.items() if k.startswith(platform + "/")}
|  | return filtered_dict | 
|  |  | 
|  | def config(self): | 
|  | logger.info("coverage platform: {}".format(self.coverage_platform)) | 
|  |  | 
|  | # Debug Functions | 
|  | @staticmethod | 
|  | def info(what): | 
|  | sys.stdout.write(what + "\n") | 
|  | sys.stdout.flush() | 
|  |  | 
|  | def update_counting(self, results=None, initial=False): | 
|  | results.skipped_configs = 0 | 
|  | results.skipped_cases = 0 | 
|  | for instance in self.instances.values(): | 
|  | if initial: | 
|  | results.cases += len(instance.testcase.cases) | 
|  | if instance.status == 'skipped': | 
|  | results.skipped_configs += 1 | 
|  | results.skipped_cases += len(instance.testcase.cases) | 
|  | elif instance.status == "passed": | 
|  | results.passed += 1 | 
|  | for res in instance.results.values(): | 
|  | if res == 'SKIP': | 
|  | results.skipped_cases += 1 | 
|  |  | 
|  | def compare_metrics(self, filename): | 
|  | # name, datatype, lower results better | 
|  | interesting_metrics = [("ram_size", int, True), | 
|  | ("rom_size", int, True)] | 
|  |  | 
|  | if not os.path.exists(filename): | 
|  | logger.error("Cannot compare metrics, %s not found" % filename) | 
|  | return [] | 
|  |  | 
|  | results = [] | 
|  | saved_metrics = {} | 
|  | with open(filename) as fp: | 
|  | cr = csv.DictReader(fp) | 
|  | for row in cr: | 
|  | d = {} | 
|  | for m, _, _ in interesting_metrics: | 
|  | d[m] = row[m] | 
|  | saved_metrics[(row["test"], row["platform"])] = d | 
|  |  | 
|  | for instance in self.instances.values(): | 
|  | mkey = (instance.testcase.name, instance.platform.name) | 
|  | if mkey not in saved_metrics: | 
|  | continue | 
|  | sm = saved_metrics[mkey] | 
|  | for metric, mtype, lower_better in interesting_metrics: | 
|  | if metric not in instance.metrics: | 
|  | continue | 
|  | if sm[metric] == "": | 
|  | continue | 
|  | delta = instance.metrics.get(metric, 0) - mtype(sm[metric]) | 
|  | if delta == 0: | 
|  | continue | 
|  | results.append((instance, metric, instance.metrics.get(metric, 0), delta, | 
|  | lower_better)) | 
|  | return results | 
|  |  | 
|  | def footprint_reports(self, report, show_footprint, all_deltas, | 
|  | footprint_threshold, last_metrics): | 
|  | if not report: | 
|  | return | 
|  |  | 
|  | logger.debug("running footprint_reports") | 
|  | deltas = self.compare_metrics(report) | 
|  | warnings = 0 | 
|  | if deltas and show_footprint: | 
|  | for i, metric, value, delta, lower_better in deltas: | 
|  | if not all_deltas and ((delta < 0 and lower_better) or | 
|  | (delta > 0 and not lower_better)): | 
|  | continue | 
|  |  | 
|  | percentage = 0 | 
|  | if value > delta: | 
|  | percentage = (float(delta) / float(value - delta)) | 
|  |  | 
|  | if not all_deltas and (percentage < (footprint_threshold / 100.0)): | 
|  | continue | 
|  |  | 
|  | logger.info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format( | 
|  | i.platform.name, i.testcase.name, Fore.YELLOW, | 
|  | "INFO" if all_deltas else "WARNING", Fore.RESET, | 
|  | metric, delta, value, percentage)) | 
|  | warnings += 1 | 
|  |  | 
|  | if warnings: | 
|  | logger.warning("Deltas based on metrics from last %s" % | 
|  | ("release" if not last_metrics else "run")) | 
|  |  | 
|  | def summary(self, results, unrecognized_sections): | 
|  | failed = 0 | 
|  | run = 0 | 
|  | for instance in self.instances.values(): | 
|  | if instance.status == "failed": | 
|  | failed += 1 | 
|  | elif instance.metrics.get("unrecognized") and not unrecognized_sections: | 
|  | logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" % | 
|  | (Fore.RED, Fore.RESET, instance.name, | 
|  | str(instance.metrics.get("unrecognized", [])))) | 
|  | failed += 1 | 
|  |  | 
|  | if instance.metrics.get('handler_time', None): | 
|  | run += 1 | 
|  |  | 
|  | if results.total and results.total != results.skipped_configs: | 
|  | pass_rate = (float(results.passed) / float(results.total - results.skipped_configs)) | 
|  | else: | 
|  | pass_rate = 0 | 
|  |  | 
|  | logger.info( | 
|  | "{}{} of {}{} test configurations passed ({:.2%}), {}{}{} failed, {} skipped with {}{}{} warnings in {:.2f} seconds".format( | 
|  | Fore.RED if failed else Fore.GREEN, | 
|  | results.passed, | 
|  | results.total - results.skipped_configs, | 
|  | Fore.RESET, | 
|  | pass_rate, | 
|  | Fore.RED if results.failed else Fore.RESET, | 
|  | results.failed, | 
|  | Fore.RESET, | 
|  | results.skipped_configs, | 
|  | Fore.YELLOW if self.warnings else Fore.RESET, | 
|  | self.warnings, | 
|  | Fore.RESET, | 
|  | self.duration)) | 
|  |  | 
|  | self.total_platforms = len(self.platforms) | 
|  | # if we are only building, do not report about tests being executed. | 
|  | if self.platforms and not self.build_only: | 
|  | logger.info("In total {} test cases were executed, {} skipped on {} out of total {} platforms ({:02.2f}%)".format( | 
|  | results.cases - results.skipped_cases, | 
|  | results.skipped_cases, | 
|  | len(self.filtered_platforms), | 
|  | self.total_platforms, | 
|  | (100 * len(self.filtered_platforms) / len(self.platforms)) | 
|  | )) | 
|  |  | 
|  | logger.info(f"{Fore.GREEN}{run}{Fore.RESET} test configurations executed on platforms, \ | 
|  | {Fore.RED}{results.total - run - results.skipped_configs}{Fore.RESET} test configurations were only built.") | 
|  |  | 
|  | def save_reports(self, name, suffix, report_dir, no_update, release, only_failed, platform_reports, json_report): | 
|  | if not self.instances: | 
|  | return | 
|  |  | 
|  | logger.info("Saving reports...") | 
|  | if name: | 
|  | report_name = name | 
|  | else: | 
|  | report_name = "twister" | 
|  |  | 
|  | if report_dir: | 
|  | os.makedirs(report_dir, exist_ok=True) | 
|  | filename = os.path.join(report_dir, report_name) | 
|  | outdir = report_dir | 
|  | else: | 
|  | filename = os.path.join(self.outdir, report_name) | 
|  | outdir = self.outdir | 
|  |  | 
|  | if suffix: | 
|  | filename = "{}_{}".format(filename, suffix) | 
|  |  | 
|  | if not no_update: | 
|  | self.xunit_report(filename + ".xml", full_report=False, | 
|  | append=only_failed, version=self.version) | 
|  | self.xunit_report(filename + "_report.xml", full_report=True, | 
|  | append=only_failed, version=self.version) | 
|  | self.csv_report(filename + ".csv") | 
|  |  | 
|  | if json_report: | 
|  | self.json_report(filename + ".json", append=only_failed, version=self.version) | 
|  |  | 
|  | if platform_reports: | 
|  | self.target_report(outdir, suffix, append=only_failed) | 
|  | if self.discards: | 
|  | self.discard_report(filename + "_discard.csv") | 
|  |  | 
|  | if release: | 
|  | self.csv_report(self.RELEASE_DATA) | 
|  |  | 
|  | def add_configurations(self): | 
|  |  | 
|  | for board_root in self.board_roots: | 
|  | board_root = os.path.abspath(board_root) | 
|  |  | 
|  | logger.debug("Reading platform configuration files under %s..." % | 
|  | board_root) | 
|  |  | 
|  | for file in glob.glob(os.path.join(board_root, "*", "*", "*.yaml")): | 
|  | try: | 
|  | platform = Platform() | 
|  | platform.load(file) | 
|  | if platform.name in [p.name for p in self.platforms]: | 
|  | logger.error(f"Duplicate platform {platform.name} in {file}") | 
|  | raise Exception(f"Duplicate platform identifier {platform.name} found") | 
|  | if platform.twister: | 
|  | self.platforms.append(platform) | 
|  | if platform.default: | 
|  | self.default_platforms.append(platform.name) | 
|  |  | 
|  | except RuntimeError as e: | 
|  | logger.error("E: %s: can't load: %s" % (file, e)) | 
|  | self.load_errors += 1 | 
|  |  | 
|  | def get_all_tests(self): | 
|  | tests = [] | 
|  | for _, tc in self.testcases.items(): | 
|  | for case in tc.cases: | 
|  | tests.append(case) | 
|  |  | 
|  | return tests | 
|  |  | 
|  | @staticmethod | 
|  | def get_toolchain(): | 
|  | toolchain_script = Path(ZEPHYR_BASE) / Path('cmake/verify-toolchain.cmake') | 
|  | result = CMake.run_cmake_script([toolchain_script, "FORMAT=json"]) | 
|  |  | 
|  | try: | 
|  | if result['returncode']: | 
|  | raise TwisterRuntimeError("E: Variable ZEPHYR_TOOLCHAIN_VARIANT is not defined") | 
|  | except Exception as e: | 
|  | print(str(e)) | 
|  | sys.exit(2) | 
|  | toolchain = json.loads(result['stdout'])['ZEPHYR_TOOLCHAIN_VARIANT'] | 
|  | logger.info(f"Using '{toolchain}' toolchain.") | 
|  |  | 
|  | return toolchain | 
|  |  | 
|  | def add_testcases(self, testcase_filter=[]): | 
|  | for root in self.roots: | 
|  | root = os.path.abspath(root) | 
|  |  | 
|  | logger.debug("Reading test case configuration files under %s..." % root) | 
|  |  | 
|  | for dirpath, _, filenames in os.walk(root, topdown=True): | 
|  | if self.SAMPLE_FILENAME in filenames: | 
|  | filename = self.SAMPLE_FILENAME | 
|  | elif self.TESTCASE_FILENAME in filenames: | 
|  | filename = self.TESTCASE_FILENAME | 
|  | else: | 
|  | continue | 
|  |  | 
|  | logger.debug("Found possible test case in " + dirpath) | 
|  |  | 
|  | tc_path = os.path.join(dirpath, filename) | 
|  |  | 
|  | try: | 
|  | parsed_data = TwisterConfigParser(tc_path, self.tc_schema) | 
|  | parsed_data.load() | 
|  |  | 
|  | tc_path = os.path.dirname(tc_path) | 
|  | workdir = os.path.relpath(tc_path, root) | 
|  |  | 
|  | for name in parsed_data.tests.keys(): | 
|  | tc = TestCase(root, workdir, name) | 
|  |  | 
|  | tc_dict = parsed_data.get_test(name, self.testcase_valid_keys) | 
|  |  | 
|  | tc.source_dir = tc_path | 
|  | tc.yamlfile = tc_path | 
|  |  | 
|  | tc.type = tc_dict["type"] | 
|  | tc.tags = tc_dict["tags"] | 
|  | tc.extra_args = tc_dict["extra_args"] | 
|  | tc.extra_configs = tc_dict["extra_configs"] | 
|  | tc.arch_allow = tc_dict["arch_allow"] | 
|  | tc.arch_exclude = tc_dict["arch_exclude"] | 
|  | tc.skip = tc_dict["skip"] | 
|  | tc.platform_exclude = tc_dict["platform_exclude"] | 
|  | tc.platform_allow = tc_dict["platform_allow"] | 
|  | tc.toolchain_exclude = tc_dict["toolchain_exclude"] | 
|  | tc.toolchain_allow = tc_dict["toolchain_allow"] | 
|  | tc.tc_filter = tc_dict["filter"] | 
|  | tc.timeout = tc_dict["timeout"] | 
|  | tc.harness = tc_dict["harness"] | 
|  | tc.harness_config = tc_dict["harness_config"] | 
|  | if tc.harness == 'console' and not tc.harness_config: | 
|  | raise Exception('Harness config error: console harness defined without a configuration.') | 
|  | tc.build_only = tc_dict["build_only"] | 
|  | tc.build_on_all = tc_dict["build_on_all"] | 
|  | tc.slow = tc_dict["slow"] | 
|  | tc.min_ram = tc_dict["min_ram"] | 
|  | tc.depends_on = tc_dict["depends_on"] | 
|  | tc.min_flash = tc_dict["min_flash"] | 
|  | tc.extra_sections = tc_dict["extra_sections"] | 
|  | tc.integration_platforms = tc_dict["integration_platforms"] | 
|  |  | 
|  | tc.parse_subcases(tc_path) | 
|  |  | 
|  | if testcase_filter: | 
|  | if tc.name and tc.name in testcase_filter: | 
|  | self.testcases[tc.name] = tc | 
|  | else: | 
|  | self.testcases[tc.name] = tc | 
|  |  | 
|  | except Exception as e: | 
|  | logger.error("%s: can't load (skipping): %s" % (tc_path, e)) | 
|  | self.load_errors += 1 | 
|  | return len(self.testcases) | 
|  |  | 
|  | def get_platform(self, name): | 
|  | selected_platform = None | 
|  | for platform in self.platforms: | 
|  | if platform.name == name: | 
|  | selected_platform = platform | 
|  | break | 
|  | return selected_platform | 
|  |  | 
|  | def load_quarantine(self, file): | 
|  | """ | 
Loads the quarantine list from the given yaml file. Creates a
dictionary of all test configurations (platform + scenario: comment)
that shall be skipped due to quarantine.
|  | """ | 
|  |  | 
|  | # Load yaml into quarantine_yaml | 
|  | quarantine_yaml = scl.yaml_load_verify(file, self.quarantine_schema) | 
|  |  | 
|  | # Create quarantine_list with a product of the listed | 
|  | # platforms and scenarios for each entry in quarantine yaml | 
|  | quarantine_list = [] | 
|  | for quar_dict in quarantine_yaml: | 
|  | if quar_dict['platforms'][0] == "all": | 
|  | plat = [p.name for p in self.platforms] | 
|  | else: | 
|  | plat = quar_dict['platforms'] | 
|  | comment = quar_dict.get('comment', "NA") | 
|  | quarantine_list.append([{".".join([p, s]): comment} | 
|  | for p in plat for s in quar_dict['scenarios']]) | 
|  |  | 
|  | # Flatten the quarantine_list | 
|  | quarantine_list = [it for sublist in quarantine_list for it in sublist] | 
|  | # Change quarantine_list into a dictionary | 
|  | for d in quarantine_list: | 
|  | self.quarantine.update(d) | 
|  |  | 
|  | def load_from_file(self, file, filter_status=[], filter_platform=[]): | 
|  | try: | 
|  | with open(file, "r") as fp: | 
|  | cr = csv.DictReader(fp) | 
|  | instance_list = [] | 
|  | for row in cr: | 
|  | if row["status"] in filter_status: | 
|  | continue | 
|  | test = row["test"] | 
|  |  | 
|  | platform = self.get_platform(row["platform"]) | 
|  | if filter_platform and platform.name not in filter_platform: | 
|  | continue | 
|  | instance = TestInstance(self.testcases[test], platform, self.outdir) | 
|  | if self.device_testing: | 
|  | tfilter = 'runnable' | 
|  | else: | 
|  | tfilter = 'buildable' | 
|  | instance.run = instance.check_runnable( | 
|  | self.enable_slow, | 
|  | tfilter, | 
|  | self.fixtures | 
|  | ) | 
|  | instance.create_overlay(platform, self.enable_asan, self.enable_ubsan, self.enable_coverage, self.coverage_platform) | 
|  | instance_list.append(instance) | 
|  | self.add_instances(instance_list) | 
|  |  | 
|  | except KeyError as e: | 
|  | logger.error("Key error while parsing tests file.({})".format(str(e))) | 
|  | sys.exit(2) | 
|  |  | 
|  | except FileNotFoundError as e: | 
|  | logger.error("Couldn't find input file with list of tests. ({})".format(e)) | 
|  | sys.exit(2) | 
|  |  | 
|  | def apply_filters(self, **kwargs): | 
|  |  | 
|  | toolchain = self.get_toolchain() | 
|  |  | 
|  | discards = {} | 
|  | platform_filter = kwargs.get('platform') | 
|  | exclude_platform = kwargs.get('exclude_platform', []) | 
|  | testcase_filter = kwargs.get('run_individual_tests', []) | 
|  | arch_filter = kwargs.get('arch') | 
|  | tag_filter = kwargs.get('tag') | 
|  | exclude_tag = kwargs.get('exclude_tag') | 
|  | all_filter = kwargs.get('all') | 
|  | runnable = kwargs.get('runnable') | 
|  | force_toolchain = kwargs.get('force_toolchain') | 
|  | force_platform = kwargs.get('force_platform') | 
|  | emu_filter = kwargs.get('emulation_only') | 
|  |  | 
|  | logger.debug("platform filter: " + str(platform_filter)) | 
|  | logger.debug("    arch_filter: " + str(arch_filter)) | 
|  | logger.debug("     tag_filter: " + str(tag_filter)) | 
|  | logger.debug("    exclude_tag: " + str(exclude_tag)) | 
|  |  | 
|  | default_platforms = False | 
|  | emulation_platforms = False | 
|  |  | 
|  |  | 
|  | if all_filter: | 
|  | logger.info("Selecting all possible platforms per test case") | 
# When --all is used, any --platform arguments are ignored
|  | platform_filter = [] | 
|  | elif not platform_filter and not emu_filter: | 
|  | logger.info("Selecting default platforms per test case") | 
|  | default_platforms = True | 
|  | elif emu_filter: | 
|  | logger.info("Selecting emulation platforms per test case") | 
|  | emulation_platforms = True | 
|  |  | 
|  | if platform_filter: | 
|  | platforms = list(filter(lambda p: p.name in platform_filter, self.platforms)) | 
|  | elif emu_filter: | 
|  | platforms = list(filter(lambda p: p.simulation != 'na', self.platforms)) | 
|  | elif arch_filter: | 
|  | platforms = list(filter(lambda p: p.arch in arch_filter, self.platforms)) | 
|  | elif default_platforms: | 
|  | platforms = list(filter(lambda p: p.default, self.platforms)) | 
|  | else: | 
|  | platforms = self.platforms | 
|  |  | 
|  | logger.info("Building initial testcase list...") | 
|  |  | 
|  | for tc_name, tc in self.testcases.items(): | 
|  |  | 
|  | if tc.build_on_all and not platform_filter: | 
|  | platform_scope = self.platforms | 
|  | elif tc.integration_platforms and self.integration: | 
|  | platform_scope = list(filter(lambda item: item.name in tc.integration_platforms, \ | 
|  | self.platforms)) | 
|  | else: | 
|  | platform_scope = platforms | 
|  |  | 
|  | integration = self.integration and tc.integration_platforms | 
|  |  | 
|  | # If there isn't any overlap between the platform_allow list and the platform_scope | 
|  | # we set the scope to the platform_allow list | 
|  | if tc.platform_allow and not platform_filter and not integration: | 
|  | a = set(platform_scope) | 
|  | b = set(filter(lambda item: item.name in tc.platform_allow, self.platforms)) | 
|  | c = a.intersection(b) | 
|  | if not c: | 
|  | platform_scope = list(filter(lambda item: item.name in tc.platform_allow, \ | 
|  | self.platforms)) | 
|  |  | 
|  | # list of instances per testcase, aka configurations. | 
|  | instance_list = [] | 
|  | for plat in platform_scope: | 
|  | instance = TestInstance(tc, plat, self.outdir) | 
|  | if runnable: | 
|  | tfilter = 'runnable' | 
|  | else: | 
|  | tfilter = 'buildable' | 
|  |  | 
|  | instance.run = instance.check_runnable( | 
|  | self.enable_slow, | 
|  | tfilter, | 
|  | self.fixtures | 
|  | ) | 
|  |  | 
|  | for t in tc.cases: | 
|  | instance.results[t] = None | 
|  |  | 
|  | if runnable and self.duts: | 
|  | for h in self.duts: | 
|  | if h.platform == plat.name: | 
|  | if tc.harness_config.get('fixture') in h.fixtures: | 
|  | instance.run = True | 
|  |  | 
|  | if not force_platform and plat.name in exclude_platform: | 
|  | discards[instance] = discards.get(instance, "Platform is excluded on command line.") | 
|  |  | 
|  | if (plat.arch == "unit") != (tc.type == "unit"): | 
|  | # Discard silently | 
|  | continue | 
|  |  | 
|  | if runnable and not instance.run: | 
|  | discards[instance] = discards.get(instance, "Not runnable on device") | 
|  |  | 
|  | if self.integration and tc.integration_platforms and plat.name not in tc.integration_platforms: | 
|  | discards[instance] = discards.get(instance, "Not part of integration platforms") | 
|  |  | 
|  | if tc.skip: | 
|  | discards[instance] = discards.get(instance, "Skip filter") | 
|  |  | 
|  | if tag_filter and not tc.tags.intersection(tag_filter): | 
|  | discards[instance] = discards.get(instance, "Command line testcase tag filter") | 
|  |  | 
|  | if exclude_tag and tc.tags.intersection(exclude_tag): | 
|  | discards[instance] = discards.get(instance, "Command line testcase exclude filter") | 
|  |  | 
|  | if testcase_filter and tc_name not in testcase_filter: | 
|  | discards[instance] = discards.get(instance, "Testcase name filter") | 
|  |  | 
|  | if arch_filter and plat.arch not in arch_filter: | 
|  | discards[instance] = discards.get(instance, "Command line testcase arch filter") | 
|  |  | 
|  | if not force_platform: | 
|  |  | 
|  | if tc.arch_allow and plat.arch not in tc.arch_allow: | 
|  | discards[instance] = discards.get(instance, "Not in test case arch allow list") | 
|  |  | 
|  | if tc.arch_exclude and plat.arch in tc.arch_exclude: | 
|  | discards[instance] = discards.get(instance, "In test case arch exclude") | 
|  |  | 
|  | if tc.platform_exclude and plat.name in tc.platform_exclude: | 
|  | discards[instance] = discards.get(instance, "In test case platform exclude") | 
|  |  | 
|  | if tc.toolchain_exclude and toolchain in tc.toolchain_exclude: | 
|  | discards[instance] = discards.get(instance, "In test case toolchain exclude") | 
|  |  | 
|  | if platform_filter and plat.name not in platform_filter: | 
|  | discards[instance] = discards.get(instance, "Command line platform filter") | 
|  |  | 
|  | if tc.platform_allow and plat.name not in tc.platform_allow: | 
|  | discards[instance] = discards.get(instance, "Not in testcase platform allow list") | 
|  |  | 
|  | if tc.toolchain_allow and toolchain not in tc.toolchain_allow: | 
|  | discards[instance] = discards.get(instance, "Not in testcase toolchain allow list") | 
|  |  | 
|  | if not plat.env_satisfied: | 
|  | discards[instance] = discards.get(instance, "Environment ({}) not satisfied".format(", ".join(plat.env))) | 
|  |  | 
|  | if not force_toolchain \ | 
|  | and toolchain and (toolchain not in plat.supported_toolchains) \ | 
|  | and tc.type != 'unit': | 
|  | discards[instance] = discards.get(instance, "Not supported by the toolchain") | 
|  |  | 
|  | if plat.ram < tc.min_ram: | 
|  | discards[instance] = discards.get(instance, "Not enough RAM") | 
|  |  | 
|  | if tc.depends_on: | 
|  | dep_intersection = tc.depends_on.intersection(set(plat.supported)) | 
|  | if dep_intersection != set(tc.depends_on): | 
|  | discards[instance] = discards.get(instance, "No hardware support") | 
|  |  | 
|  | if plat.flash < tc.min_flash: | 
|  | discards[instance] = discards.get(instance, "Not enough FLASH") | 
|  |  | 
|  | if set(plat.ignore_tags) & tc.tags: | 
|  | discards[instance] = discards.get(instance, "Excluded tags per platform (exclude_tags)") | 
|  |  | 
|  | if plat.only_tags and not set(plat.only_tags) & tc.tags: | 
|  | discards[instance] = discards.get(instance, "Excluded tags per platform (only_tags)") | 
|  |  | 
|  | test_configuration = ".".join([instance.platform.name, | 
|  | instance.testcase.id]) | 
|  | # skip quarantined tests | 
|  | if test_configuration in self.quarantine and not self.quarantine_verify: | 
|  | discards[instance] = discards.get(instance, | 
|  | f"Quarantine: {self.quarantine[test_configuration]}") | 
|  | # run only quarantined test to verify their statuses (skip everything else) | 
|  | if self.quarantine_verify and test_configuration not in self.quarantine: | 
|  | discards[instance] = discards.get(instance, "Not under quarantine") | 
|  |  | 
|  | # if nothing stopped us until now, it means this configuration | 
|  | # needs to be added. | 
|  | instance_list.append(instance) | 
|  |  | 
|  | # no configurations, so jump to next testcase | 
|  | if not instance_list: | 
|  | continue | 
|  |  | 
|  | # if twister was launched with no platform options at all, we | 
|  | # take all default platforms | 
|  | if default_platforms and not tc.build_on_all and not integration: | 
|  | if tc.platform_allow: | 
common = set(self.default_platforms).intersection(tc.platform_allow)
if common:
allowed = list(filter(lambda inst: inst.platform.name in common, instance_list))
self.add_instances(allowed)
else:
self.add_instances(instance_list)
|  | else: | 
instances = list(filter(lambda inst: inst.platform.default, instance_list))
|  | self.add_instances(instances) | 
|  | elif integration: | 
instances = list(filter(lambda item: item.platform.name in tc.integration_platforms, instance_list))
self.add_instances(instances)

elif emulation_platforms:
|  | self.add_instances(instance_list) | 
for instance in filter(lambda inst: inst.platform.simulation == 'na', instance_list):
|  | discards[instance] = discards.get(instance, "Not an emulated platform") | 
|  | else: | 
|  | self.add_instances(instance_list) | 
|  |  | 
for case in self.instances.values():
|  | case.create_overlay(case.platform, self.enable_asan, self.enable_ubsan, self.enable_coverage, self.coverage_platform) | 
|  |  | 
|  | self.discards = discards | 
|  | self.selected_platforms = set(p.platform.name for p in self.instances.values()) | 
|  |  | 
|  | for instance in self.discards: | 
|  | instance.reason = self.discards[instance] | 
# If integration mode is on, all skips on integration platforms are treated as errors.
|  | # TODO: add quarantine relief here when PR with quarantine feature gets merged | 
|  | if self.integration and instance.platform.name in instance.testcase.integration_platforms: | 
|  | instance.status = "error" | 
|  | instance.reason += " but is one of the integration platforms" | 
|  | instance.fill_results_by_status() | 
|  | self.instances[instance.name] = instance | 
|  | else: | 
|  | instance.status = "skipped" | 
|  | instance.fill_results_by_status() | 
|  |  | 
self.filtered_platforms = set(p.platform.name for p in self.instances.values()
if p.status != "skipped")
|  |  | 
|  | return discards | 
|  |  | 
|  | def add_instances(self, instance_list): | 
|  | for instance in instance_list: | 
|  | self.instances[instance.name] = instance | 
|  |  | 
|  | @staticmethod | 
|  | def calc_one_elf_size(instance): | 
|  | if instance.status not in ["error", "failed", "skipped"]: | 
|  | if instance.platform.type != "native": | 
|  | size_calc = instance.calculate_sizes() | 
|  | instance.metrics["ram_size"] = size_calc.get_ram_size() | 
|  | instance.metrics["rom_size"] = size_calc.get_rom_size() | 
|  | instance.metrics["unrecognized"] = size_calc.unrecognized_sections() | 
|  | else: | 
|  | instance.metrics["ram_size"] = 0 | 
|  | instance.metrics["rom_size"] = 0 | 
|  | instance.metrics["unrecognized"] = [] | 
|  |  | 
|  | instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0 | 
|  |  | 
|  | def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False): | 
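"""Seed the multiprocessing pipeline with one work item per instance.

Each item is a dict consumed by ProjectBuilder.process(); a sketch of
the shapes queued below (only the "op" values used here are shown):

    {"op": "run", "test": instance}     # test_only and instance.run
    {"op": "cmake", "test": instance}   # instances that still need a build
    {"op": "report", "test": instance}  # instances already in "error" state
"""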
|  | for instance in self.instances.values(): | 
|  | if build_only: | 
|  | instance.run = False | 
|  |  | 
|  | if test_only and instance.run: | 
|  | pipeline.put({"op": "run", "test": instance}) | 
|  | else: | 
|  | if instance.status not in ['passed', 'skipped', 'error']: | 
|  | logger.debug(f"adding {instance.name}") | 
|  | instance.status = None | 
|  | pipeline.put({"op": "cmake", "test": instance}) | 
|  | # If the instance got 'error' status before, proceed to the report stage | 
|  | if instance.status == "error": | 
|  | pipeline.put({"op": "report", "test": instance}) | 
|  |  | 
|  | def pipeline_mgr(self, pipeline, done_queue, lock, results): | 
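"""Worker loop executed by each pipeline process.

Drains the shared queue until it is empty: every task is handed to a
fresh ProjectBuilder, which drives the instance through its remaining
stages and may put follow-up tasks back on the same pipeline.
"""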
|  | while True: | 
|  | try: | 
|  | task = pipeline.get_nowait() | 
|  | except queue.Empty: | 
|  | break | 
|  | else: | 
|  | test = task['test'] | 
|  | pb = ProjectBuilder(self, | 
|  | test, | 
|  | lsan=self.enable_lsan, | 
|  | asan=self.enable_asan, | 
|  | ubsan=self.enable_ubsan, | 
|  | coverage=self.enable_coverage, | 
|  | extra_args=self.extra_args, | 
|  | device_testing=self.device_testing, | 
|  | cmake_only=self.cmake_only, | 
|  | cleanup=self.cleanup, | 
|  | valgrind=self.enable_valgrind, | 
|  | inline_logs=self.inline_logs, | 
|  | generator=self.generator, | 
|  | generator_cmd=self.generator_cmd, | 
|  | verbose=self.verbose, | 
|  | warnings_as_errors=self.warnings_as_errors, | 
|  | overflow_as_errors=self.overflow_as_errors | 
|  | ) | 
|  | pb.process(pipeline, done_queue, task, lock, results) | 
|  |  | 
|  | return True | 
|  |  | 
|  | def execute(self, pipeline, done, results): | 
|  | lock = Lock() | 
|  | logger.info("Adding tasks to the queue...") | 
|  | self.add_tasks_to_queue(pipeline, self.build_only, self.test_only) | 
|  | logger.info("Added initial list of jobs to queue") | 
|  |  | 
|  | processes = [] | 
|  | for job in range(self.jobs): | 
|  | logger.debug(f"Launch process {job}") | 
p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, results))
|  | processes.append(p) | 
|  | p.start() | 
|  |  | 
|  | try: | 
|  | for p in processes: | 
|  | p.join() | 
|  | except KeyboardInterrupt: | 
|  | logger.info("Execution interrupted") | 
|  | for p in processes: | 
|  | p.terminate() | 
|  |  | 
|  | # FIXME: This needs to move out. | 
|  | if self.enable_size_report and not self.cmake_only: | 
|  | # Parallelize size calculation | 
with concurrent.futures.ThreadPoolExecutor(self.jobs) as executor:
futures = [executor.submit(self.calc_one_elf_size, instance)
for instance in self.instances.values()]
concurrent.futures.wait(futures)
|  | else: | 
|  | for instance in self.instances.values(): | 
|  | instance.metrics["ram_size"] = 0 | 
|  | instance.metrics["rom_size"] = 0 | 
|  | instance.metrics["handler_time"] = instance.handler.duration if instance.handler else 0 | 
|  | instance.metrics["unrecognized"] = [] | 
|  |  | 
|  | return results | 
|  |  | 
def discard_report(self, filename):
if not self.discards:
logger.error("apply_filters() hasn't been run!")
sys.exit(2)
with open(filename, "wt") as csvfile:
|  | fieldnames = ["test", "arch", "platform", "reason"] | 
|  | cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep) | 
|  | cw.writeheader() | 
|  | for instance, reason in sorted(self.discards.items()): | 
|  | rowdict = {"test": instance.testcase.name, | 
|  | "arch": instance.platform.arch, | 
|  | "platform": instance.platform.name, | 
|  | "reason": reason} | 
|  | cw.writerow(rowdict) | 
|  |  | 
|  | def target_report(self, outdir, suffix, append=False): | 
platforms = {inst.platform.name for inst in self.instances.values()}
|  | for platform in platforms: | 
if suffix:
filename = os.path.join(outdir, "{}_{}.xml".format(platform, suffix))
else:
filename = os.path.join(outdir, "{}.xml".format(platform))
|  | self.xunit_report(filename, platform, full_report=True, | 
|  | append=append, version=self.version) | 
|  |  | 
|  |  | 
|  | @staticmethod | 
|  | def process_log(log_file): | 
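"""Read a log file and strip all non-printable characters.

Keeps the text safe to embed in XML/JSON reports; returns an empty
string if the file does not exist.
"""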
|  | filtered_string = "" | 
|  | if os.path.exists(log_file): | 
|  | with open(log_file, "rb") as f: | 
|  | log = f.read().decode("utf-8") | 
|  | filtered_string = ''.join(filter(lambda x: x in string.printable, log)) | 
|  |  | 
|  | return filtered_string | 
|  |  | 
|  |  | 
|  | def xunit_report(self, filename, platform=None, full_report=False, append=False, version="NA"): | 
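"""Write (or, with append=True, update) a JUnit-style XML report.

Rough shape of the output per platform (a sketch; values are
illustrative):

    <testsuites>
      <testsuite name="some_platform" time="1.2" tests="3"
                 failures="0" errors="0" skipped="1">
        <properties>
          <property name="version" value="..."/>
        </properties>
        <testcase classname="..." name="..." time="0.1"/>
      </testsuite>
    </testsuites>
"""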
|  | total = 0 | 
|  | fails = passes = errors = skips = 0 | 
|  | if platform: | 
|  | selected = [platform] | 
|  | logger.info(f"Writing target report for {platform}...") | 
|  | else: | 
|  | logger.info(f"Writing xunit report {filename}...") | 
|  | selected = self.selected_platforms | 
|  |  | 
|  | if os.path.exists(filename) and append: | 
|  | tree = ET.parse(filename) | 
|  | eleTestsuites = tree.getroot() | 
|  | else: | 
|  | eleTestsuites = ET.Element('testsuites') | 
|  |  | 
|  | for p in selected: | 
|  | inst = self.get_platform_instances(p) | 
|  | fails = 0 | 
|  | passes = 0 | 
|  | errors = 0 | 
|  | skips = 0 | 
|  | duration = 0 | 
|  |  | 
for instance in inst.values():
|  | handler_time = instance.metrics.get('handler_time', 0) | 
|  | duration += handler_time | 
|  | if full_report and instance.run: | 
|  | for k in instance.results.keys(): | 
|  | if instance.results[k] == 'PASS': | 
|  | passes += 1 | 
|  | elif instance.results[k] == 'BLOCK': | 
|  | errors += 1 | 
|  | elif instance.results[k] == 'SKIP' or instance.status in ['skipped']: | 
|  | skips += 1 | 
|  | else: | 
|  | fails += 1 | 
|  | else: | 
|  | if instance.status in ["error", "failed", "timeout", "flash_error"]: | 
|  | if instance.reason in ['build_error', 'handler_crash']: | 
|  | errors += 1 | 
|  | else: | 
|  | fails += 1 | 
|  | elif instance.status == 'skipped': | 
|  | skips += 1 | 
|  | elif instance.status == 'passed': | 
|  | passes += 1 | 
|  | else: | 
|  | if instance.status: | 
|  | logger.error(f"{instance.name}: Unknown status {instance.status}") | 
|  | else: | 
|  | logger.error(f"{instance.name}: No status") | 
|  |  | 
|  | total = (errors + passes + fails + skips) | 
|  | # do not produce a report if no tests were actually run (only built) | 
|  | if total == 0: | 
|  | continue | 
|  |  | 
|  | run = p | 
|  | eleTestsuite = None | 
|  |  | 
|  | # When we re-run the tests, we re-use the results and update only with | 
|  | # the newly run tests. | 
|  | if os.path.exists(filename) and append: | 
|  | ts = eleTestsuites.findall(f'testsuite/[@name="{p}"]') | 
|  | if ts: | 
|  | eleTestsuite = ts[0] | 
|  | eleTestsuite.attrib['failures'] = "%d" % fails | 
|  | eleTestsuite.attrib['errors'] = "%d" % errors | 
|  | eleTestsuite.attrib['skipped'] = "%d" % skips | 
|  | else: | 
|  | logger.info(f"Did not find any existing results for {p}") | 
|  | eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', | 
|  | name=run, time="%f" % duration, | 
|  | tests="%d" % (total), | 
|  | failures="%d" % fails, | 
|  | errors="%d" % (errors), skipped="%s" % (skips)) | 
eleTSProperties = ET.SubElement(eleTestsuite, 'properties')
# Multiple 'property' elements can be added to 'properties',
# each with a distinct name and value
ET.SubElement(eleTSProperties, 'property', name="version", value=version)
|  |  | 
|  | else: | 
|  | eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', | 
|  | name=run, time="%f" % duration, | 
|  | tests="%d" % (total), | 
|  | failures="%d" % fails, | 
|  | errors="%d" % (errors), skipped="%s" % (skips)) | 
eleTSProperties = ET.SubElement(eleTestsuite, 'properties')
# Multiple 'property' elements can be added to 'properties',
# each with a distinct name and value
ET.SubElement(eleTSProperties, 'property', name="version", value=version)
|  |  | 
for instance in inst.values():
|  | if full_report: | 
|  | tname = os.path.basename(instance.testcase.name) | 
|  | else: | 
|  | tname = instance.testcase.id | 
|  | handler_time = instance.metrics.get('handler_time', 0) | 
|  |  | 
|  | if full_report: | 
|  | for k in instance.results.keys(): | 
# remove testcases that are being re-run from existing reports
|  | for tc in eleTestsuite.findall(f'testcase/[@name="{k}"]'): | 
|  | eleTestsuite.remove(tc) | 
|  |  | 
|  | classname = ".".join(tname.split(".")[:2]) | 
|  | eleTestcase = ET.SubElement( | 
|  | eleTestsuite, 'testcase', | 
|  | classname=classname, | 
|  | name="%s" % (k), time="%f" % handler_time) | 
|  | if instance.results[k] in ['FAIL', 'BLOCK'] or \ | 
|  | (not instance.run and instance.status in ["error", "failed", "timeout"]): | 
|  | if instance.results[k] == 'FAIL': | 
|  | el = ET.SubElement( | 
|  | eleTestcase, | 
|  | 'failure', | 
|  | type="failure", | 
|  | message="failed") | 
|  | else: | 
|  | el = ET.SubElement( | 
|  | eleTestcase, | 
|  | 'error', | 
|  | type="failure", | 
|  | message=instance.reason) | 
|  | log_root = os.path.join(self.outdir, instance.platform.name, instance.testcase.name) | 
|  | log_file = os.path.join(log_root, "handler.log") | 
|  | el.text = self.process_log(log_file) | 
|  |  | 
elif instance.results[k] == 'PASS' \
or (not instance.run and instance.status == "passed"):
pass
elif instance.results[k] == 'SKIP' or instance.status == "skipped":
|  | el = ET.SubElement(eleTestcase, 'skipped', type="skipped", message=instance.reason) | 
|  | else: | 
|  | el = ET.SubElement( | 
|  | eleTestcase, | 
|  | 'error', | 
|  | type="error", | 
|  | message=f"{instance.reason}") | 
|  | else: | 
|  | if platform: | 
|  | classname = ".".join(instance.testcase.name.split(".")[:2]) | 
|  | else: | 
|  | classname = p + ":" + ".".join(instance.testcase.name.split(".")[:2]) | 
|  |  | 
# remove testcases that are being re-run from existing reports
|  | for tc in eleTestsuite.findall(f'testcase/[@classname="{classname}"][@name="{instance.testcase.name}"]'): | 
|  | eleTestsuite.remove(tc) | 
|  |  | 
|  | eleTestcase = ET.SubElement(eleTestsuite, 'testcase', | 
|  | classname=classname, | 
|  | name="%s" % (instance.testcase.name), | 
|  | time="%f" % handler_time) | 
|  |  | 
|  | if instance.status in ["error", "failed", "timeout", "flash_error"]: | 
|  | failure = ET.SubElement( | 
|  | eleTestcase, | 
|  | 'failure', | 
|  | type="failure", | 
|  | message=instance.reason) | 
|  |  | 
|  | log_root = ("%s/%s/%s" % (self.outdir, instance.platform.name, instance.testcase.name)) | 
|  | bl = os.path.join(log_root, "build.log") | 
|  | hl = os.path.join(log_root, "handler.log") | 
log_file = bl
if instance.reason != 'Build error' and os.path.exists(hl):
log_file = hl
|  |  | 
|  | failure.text = self.process_log(log_file) | 
|  |  | 
|  | elif instance.status == "skipped": | 
|  | ET.SubElement(eleTestcase, 'skipped', type="skipped", message="Skipped") | 
|  |  | 
|  | result = ET.tostring(eleTestsuites) | 
|  | with open(filename, 'wb') as report: | 
|  | report.write(result) | 
|  |  | 
|  | return fails, passes, errors, skips | 
|  |  | 
|  | def csv_report(self, filename): | 
|  | with open(filename, "wt") as csvfile: | 
|  | fieldnames = ["test", "arch", "platform", "status", | 
|  | "extra_args", "handler", "handler_time", "ram_size", | 
|  | "rom_size"] | 
|  | cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep) | 
|  | cw.writeheader() | 
|  | for instance in self.instances.values(): | 
|  | rowdict = {"test": instance.testcase.name, | 
|  | "arch": instance.platform.arch, | 
|  | "platform": instance.platform.name, | 
|  | "extra_args": " ".join(instance.testcase.extra_args), | 
|  | "handler": instance.platform.simulation} | 
|  |  | 
|  | rowdict["status"] = instance.status | 
|  | if instance.status not in ["error", "failed", "timeout"]: | 
|  | if instance.handler: | 
|  | rowdict["handler_time"] = instance.metrics.get("handler_time", 0) | 
|  | ram_size = instance.metrics.get("ram_size", 0) | 
|  | rom_size = instance.metrics.get("rom_size", 0) | 
|  | rowdict["ram_size"] = ram_size | 
|  | rowdict["rom_size"] = rom_size | 
|  | cw.writerow(rowdict) | 
|  |  | 
|  | def json_report(self, filename, append=False, version="NA"): | 
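"""Write a JSON report; with append=True, merge into an existing file.

Sketch of the structure produced below (field presence varies per
testcase):

    {
        "environment": {"os": ..., "zephyr_version": ..., "toolchain": ...},
        "testsuites": [
            {"testcases": [
                {"testcase": ..., "arch": ..., "platform": ...,
                 "status": "passed" | "failed" | "skipped", ...}
            ]}
        ]
    }
"""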
|  | logger.info(f"Writing JSON report {filename}") | 
|  | report = {} | 
|  | selected = self.selected_platforms | 
|  | report["environment"] = {"os": os.name, | 
|  | "zephyr_version": version, | 
|  | "toolchain": self.get_toolchain() | 
|  | } | 
|  | json_data = {} | 
|  | if os.path.exists(filename) and append: | 
|  | with open(filename, 'r') as json_file: | 
|  | json_data = json.load(json_file) | 
|  |  | 
|  | suites = json_data.get("testsuites", []) | 
|  | if suites: | 
|  | suite = suites[0] | 
|  | testcases = suite.get("testcases", []) | 
|  | else: | 
|  | suite = {} | 
|  | testcases = [] | 
|  |  | 
|  | for p in selected: | 
|  | inst = self.get_platform_instances(p) | 
for instance in inst.values():
|  | testcase = {} | 
|  | handler_log = os.path.join(instance.build_dir, "handler.log") | 
|  | build_log = os.path.join(instance.build_dir, "build.log") | 
|  | device_log = os.path.join(instance.build_dir, "device.log") | 
|  |  | 
|  | handler_time = instance.metrics.get('handler_time', 0) | 
|  | ram_size = instance.metrics.get ("ram_size", 0) | 
|  | rom_size  = instance.metrics.get("rom_size",0) | 
|  |  | 
|  | for k in instance.results.keys(): | 
testcases = list(filter(lambda d: not (d.get('testcase') == k and d.get('platform') == p), testcases))
|  | testcase = {"testcase": k, | 
|  | "arch": instance.platform.arch, | 
|  | "platform": p, | 
|  | } | 
|  | if ram_size: | 
|  | testcase["ram_size"] = ram_size | 
|  | if rom_size: | 
|  | testcase["rom_size"] = rom_size | 
|  |  | 
|  | if instance.results[k] in ["PASS"]: | 
|  | testcase["status"] = "passed" | 
|  | if instance.handler: | 
|  | testcase["execution_time"] =  handler_time | 
|  |  | 
|  | elif instance.results[k] in ['FAIL', 'BLOCK'] or instance.status in ["error", "failed", "timeout"]: | 
|  | testcase["status"] = "failed" | 
|  | testcase["reason"] = instance.reason | 
|  | testcase["execution_time"] =  handler_time | 
|  | if os.path.exists(handler_log): | 
|  | testcase["test_output"] = self.process_log(handler_log) | 
|  | elif os.path.exists(device_log): | 
|  | testcase["device_log"] = self.process_log(device_log) | 
|  | else: | 
|  | testcase["build_log"] = self.process_log(build_log) | 
|  | else: | 
|  | testcase["status"] = "skipped" | 
|  | testcase["reason"] = instance.reason | 
|  | testcases.append(testcase) | 
|  |  | 
|  | suites = [ {"testcases": testcases} ] | 
|  | report["testsuites"] = suites | 
|  |  | 
|  | with open(filename, "wt") as json_file: | 
|  | json.dump(report, json_file, indent=4, separators=(',',':')) | 
|  |  | 
|  | def get_testcase(self, identifier): | 
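"""Return every testcase that contains a case matching identifier."""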
|  | results = [] | 
for tc in self.testcases.values():
|  | for case in tc.cases: | 
|  | if case == identifier: | 
|  | results.append(tc) | 
|  | return results | 
|  |  | 
|  | class CoverageTool: | 
|  | """ Base class for every supported coverage tool | 
|  | """ | 
|  |  | 
|  | def __init__(self): | 
|  | self.gcov_tool = None | 
|  | self.base_dir = None | 
|  |  | 
|  | @staticmethod | 
|  | def factory(tool): | 
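"""Return a coverage tool wrapper for 'lcov' or 'gcovr', else None.

Typical wiring (a sketch; the caller fills in both attributes before
generating, and the gcov binary name here is an assumption):

    coverage_tool = CoverageTool.factory("gcovr")
    coverage_tool.gcov_tool = "gcov"
    coverage_tool.base_dir = ZEPHYR_BASE
    coverage_tool.generate(outdir)
"""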
if tool == 'lcov':
t = Lcov()
elif tool == 'gcovr':
t = Gcovr()
|  | else: | 
|  | logger.error("Unsupported coverage tool specified: {}".format(tool)) | 
|  | return None | 
|  |  | 
|  | logger.debug(f"Select {tool} as the coverage tool...") | 
|  | return t | 
|  |  | 
|  | @staticmethod | 
def retrieve_gcov_data(input_file):
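"""Extract the coverage dump embedded in a console log.

Expects a block delimited by GCOV_COVERAGE_DUMP_START and
GCOV_COVERAGE_DUMP_END, with one line per file of the form (path and
payload are hypothetical):

    */home/user/build/zephyr/main.gcda<a1b2c3...

i.e. a leading "*", the target file name, then "<" followed by a hex
dump of that file's contents. Returns {'complete': bool, 'data': dict}.
"""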
|  | logger.debug("Working on %s" % intput_file) | 
|  | extracted_coverage_info = {} | 
|  | capture_data = False | 
|  | capture_complete = False | 
with open(input_file, 'r') as fp:
|  | for line in fp.readlines(): | 
|  | if re.search("GCOV_COVERAGE_DUMP_START", line): | 
|  | capture_data = True | 
|  | continue | 
|  | if re.search("GCOV_COVERAGE_DUMP_END", line): | 
|  | capture_complete = True | 
|  | break | 
|  | # Loop until the coverage data is found. | 
|  | if not capture_data: | 
|  | continue | 
|  | if line.startswith("*"): | 
|  | sp = line.split("<") | 
|  | if len(sp) > 1: | 
|  | # Remove the leading delimiter "*" | 
|  | file_name = sp[0][1:] | 
|  | # Remove the trailing new line char | 
|  | hex_dump = sp[1][:-1] | 
|  | else: | 
|  | continue | 
|  | else: | 
|  | continue | 
|  | extracted_coverage_info.update({file_name: hex_dump}) | 
|  | if not capture_data: | 
|  | capture_complete = True | 
|  | return {'complete': capture_complete, 'data': extracted_coverage_info} | 
|  |  | 
|  | @staticmethod | 
|  | def create_gcda_files(extracted_coverage_info): | 
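"""Write each captured hex dump back out as a binary coverage file.

Keys are target file names and values are hex strings, as produced by
retrieve_gcov_data().
"""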
|  | logger.debug("Generating gcda files") | 
|  | for filename, hexdump_val in extracted_coverage_info.items(): | 
# gcovr fails when kobject_hash coverage data is present, so skip
# it here; this is only a problem with gcovr v4.1
if "kobject_hash" in filename:
filename = filename[:-4] + "gcno"
|  | try: | 
|  | os.remove(filename) | 
|  | except Exception: | 
|  | pass | 
|  | continue | 
|  |  | 
|  | with open(filename, 'wb') as fp: | 
|  | fp.write(bytes.fromhex(hexdump_val)) | 
|  |  | 
|  | def generate(self, outdir): | 
|  | for filename in glob.glob("%s/**/handler.log" % outdir, recursive=True): | 
|  | gcov_data = self.__class__.retrieve_gcov_data(filename) | 
|  | capture_complete = gcov_data['complete'] | 
|  | extracted_coverage_info = gcov_data['data'] | 
|  | if capture_complete: | 
|  | self.__class__.create_gcda_files(extracted_coverage_info) | 
|  | logger.debug("Gcov data captured: {}".format(filename)) | 
|  | else: | 
|  | logger.error("Gcov data capture incomplete: {}".format(filename)) | 
|  |  | 
|  | with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog: | 
|  | ret = self._generate(outdir, coveragelog) | 
|  | if ret == 0: | 
|  | logger.info("HTML report generated: {}".format( | 
|  | os.path.join(outdir, "coverage", "index.html"))) | 
|  |  | 
|  |  | 
|  | class Lcov(CoverageTool): | 
|  |  | 
|  | def __init__(self): | 
|  | super().__init__() | 
|  | self.ignores = [] | 
|  |  | 
|  | def add_ignore_file(self, pattern): | 
|  | self.ignores.append('*' + pattern + '*') | 
|  |  | 
|  | def add_ignore_directory(self, pattern): | 
|  | self.ignores.append('*/' + pattern + '/*') | 
|  |  | 
|  | def _generate(self, outdir, coveragelog): | 
|  | coveragefile = os.path.join(outdir, "coverage.info") | 
|  | ztestfile = os.path.join(outdir, "ztest.info") | 
|  | cmd = ["lcov", "--gcov-tool", self.gcov_tool, | 
|  | "--capture", "--directory", outdir, | 
|  | "--rc", "lcov_branch_coverage=1", | 
|  | "--output-file", coveragefile] | 
|  | cmd_str = " ".join(cmd) | 
|  | logger.debug(f"Running {cmd_str}...") | 
|  | subprocess.call(cmd, stdout=coveragelog) | 
|  | # We want to remove tests/* and tests/ztest/test/* but save tests/ztest | 
|  | subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--extract", | 
|  | coveragefile, | 
|  | os.path.join(self.base_dir, "tests", "ztest", "*"), | 
|  | "--output-file", ztestfile, | 
|  | "--rc", "lcov_branch_coverage=1"], stdout=coveragelog) | 
|  |  | 
|  | if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0: | 
|  | subprocess.call(["lcov", "--gcov-tool", self.gcov_tool, "--remove", | 
|  | ztestfile, | 
|  | os.path.join(self.base_dir, "tests/ztest/test/*"), | 
|  | "--output-file", ztestfile, | 
|  | "--rc", "lcov_branch_coverage=1"], | 
|  | stdout=coveragelog) | 
|  | files = [coveragefile, ztestfile] | 
|  | else: | 
|  | files = [coveragefile] | 
|  |  | 
|  | for i in self.ignores: | 
|  | subprocess.call( | 
|  | ["lcov", "--gcov-tool", self.gcov_tool, "--remove", | 
|  | coveragefile, i, "--output-file", | 
|  | coveragefile, "--rc", "lcov_branch_coverage=1"], | 
|  | stdout=coveragelog) | 
|  |  | 
|  | # The --ignore-errors source option is added to avoid it exiting due to | 
|  | # samples/application_development/external_lib/ | 
|  | return subprocess.call(["genhtml", "--legend", "--branch-coverage", | 
|  | "--ignore-errors", "source", | 
|  | "-output-directory", | 
|  | os.path.join(outdir, "coverage")] + files, | 
|  | stdout=coveragelog) | 
|  |  | 
|  |  | 
|  | class Gcovr(CoverageTool): | 
|  |  | 
|  | def __init__(self): | 
|  | super().__init__() | 
|  | self.ignores = [] | 
|  |  | 
|  | def add_ignore_file(self, pattern): | 
|  | self.ignores.append('.*' + pattern + '.*') | 
|  |  | 
|  | def add_ignore_directory(self, pattern): | 
|  | self.ignores.append(".*/" + pattern + '/.*') | 
|  |  | 
|  | @staticmethod | 
def _interleave_list(prefix, items):
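"""Prefix every item with the given flag, doctest-style:

    _interleave_list("-e", ["foo", "bar"])  ->  ["-e", "foo", "-e", "bar"]
"""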
tuple_list = [(prefix, item) for item in items]
|  | return [item for sublist in tuple_list for item in sublist] | 
|  |  | 
|  | def _generate(self, outdir, coveragelog): | 
|  | coveragefile = os.path.join(outdir, "coverage.json") | 
|  | ztestfile = os.path.join(outdir, "ztest.json") | 
|  |  | 
|  | excludes = Gcovr._interleave_list("-e", self.ignores) | 
|  |  | 
|  | # We want to remove tests/* and tests/ztest/test/* but save tests/ztest | 
|  | cmd = ["gcovr", "-r", self.base_dir, "--gcov-executable", | 
|  | self.gcov_tool, "-e", "tests/*"] + excludes + ["--json", "-o", | 
|  | coveragefile, outdir] | 
|  | cmd_str = " ".join(cmd) | 
|  | logger.debug(f"Running {cmd_str}...") | 
|  | subprocess.call(cmd, stdout=coveragelog) | 
|  |  | 
|  | subprocess.call(["gcovr", "-r", self.base_dir, "--gcov-executable", | 
|  | self.gcov_tool, "-f", "tests/ztest", "-e", | 
|  | "tests/ztest/test/*", "--json", "-o", ztestfile, | 
|  | outdir], stdout=coveragelog) | 
|  |  | 
|  | if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0: | 
|  | files = [coveragefile, ztestfile] | 
|  | else: | 
|  | files = [coveragefile] | 
|  |  | 
|  | subdir = os.path.join(outdir, "coverage") | 
|  | os.makedirs(subdir, exist_ok=True) | 
|  |  | 
|  | tracefiles = self._interleave_list("--add-tracefile", files) | 
|  |  | 
|  | return subprocess.call(["gcovr", "-r", self.base_dir, "--html", | 
|  | "--html-details"] + tracefiles + | 
|  | ["-o", os.path.join(subdir, "index.html")], | 
|  | stdout=coveragelog) | 
|  |  | 
|  | class DUT(object): | 
|  | def __init__(self, | 
|  | id=None, | 
|  | serial=None, | 
|  | platform=None, | 
|  | product=None, | 
|  | serial_pty=None, | 
|  | connected=False, | 
|  | pre_script=None, | 
|  | post_script=None, | 
|  | post_flash_script=None, | 
|  | runner=None): | 
|  |  | 
|  | self.serial = serial | 
|  | self.platform = platform | 
|  | self.serial_pty = serial_pty | 
|  | self._counter = Value("i", 0) | 
|  | self._available = Value("i", 1) | 
|  | self.connected = connected | 
|  | self.pre_script = pre_script | 
|  | self.id = id | 
|  | self.product = product | 
|  | self.runner = runner | 
|  | self.fixtures = [] | 
|  | self.post_flash_script = post_flash_script | 
|  | self.post_script = post_script | 
|  | self.probe_id = None | 
|  | self.notes = None | 
|  | self.lock = Lock() | 
|  | self.match = False | 
|  |  | 
|  |  | 
|  | @property | 
|  | def available(self): | 
|  | with self._available.get_lock(): | 
|  | return self._available.value | 
|  |  | 
|  | @available.setter | 
|  | def available(self, value): | 
|  | with self._available.get_lock(): | 
|  | self._available.value = value | 
|  |  | 
|  | @property | 
|  | def counter(self): | 
|  | with self._counter.get_lock(): | 
|  | return self._counter.value | 
|  |  | 
|  | @counter.setter | 
|  | def counter(self, value): | 
|  | with self._counter.get_lock(): | 
|  | self._counter.value = value | 
|  |  | 
|  | def to_dict(self): | 
|  | d = {} | 
|  | exclude = ['_available', '_counter', 'match'] | 
|  | v = vars(self) | 
|  | for k in v.keys(): | 
|  | if k not in exclude and v[k]: | 
|  | d[k] = v[k] | 
|  | return d | 
|  |  | 
|  |  | 
|  | def __repr__(self): | 
|  | return f"<{self.platform} ({self.product}) on {self.serial}>" | 
|  |  | 
|  | class HardwareMap: | 
|  | schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "hwmap-schema.yaml") | 
|  |  | 
|  | manufacturer = [ | 
|  | 'ARM', | 
|  | 'SEGGER', | 
|  | 'MBED', | 
|  | 'STMicroelectronics', | 
|  | 'Atmel Corp.', | 
|  | 'Texas Instruments', | 
|  | 'Silicon Labs', | 
|  | 'NXP Semiconductors', | 
|  | 'Microchip Technology Inc.', | 
|  | 'FTDI', | 
|  | 'Digilent' | 
|  | ] | 
|  |  | 
|  | runner_mapping = { | 
|  | 'pyocd': [ | 
|  | 'DAPLink CMSIS-DAP', | 
|  | 'MBED CMSIS-DAP' | 
|  | ], | 
|  | 'jlink': [ | 
|  | 'J-Link', | 
|  | 'J-Link OB' | 
|  | ], | 
|  | 'openocd': [ | 
|  | 'STM32 STLink', '^XDS110.*', 'STLINK-V3' | 
|  | ], | 
|  | 'dediprog': [ | 
|  | 'TTL232R-3V3', | 
|  | 'MCP2200 USB Serial Port Emulator' | 
|  | ] | 
|  | } | 
|  |  | 
|  | def __init__(self): | 
|  | self.detected = [] | 
|  | self.duts = [] | 
|  |  | 
|  | def add_device(self, serial, platform, pre_script, is_pty): | 
|  | device = DUT(platform=platform, connected=True, pre_script=pre_script) | 
|  |  | 
|  | if is_pty: | 
|  | device.serial_pty = serial | 
|  | else: | 
|  | device.serial = serial | 
|  |  | 
|  | self.duts.append(device) | 
|  |  | 
|  | def load(self, map_file): | 
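"""Load DUTs from a hardware map YAML file, validated against the schema.

A minimal entry looks roughly like this (values are illustrative; only
keys read below are shown):

    - platform: some_board
      id: "000123456789"
      product: DAPLink CMSIS-DAP
      runner: pyocd
      serial: /dev/ttyACM0
      fixtures: []
"""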
|  | hwm_schema = scl.yaml_load(self.schema_path) | 
|  | duts = scl.yaml_load_verify(map_file, hwm_schema) | 
|  | for dut in duts: | 
|  | pre_script = dut.get('pre_script') | 
|  | post_script = dut.get('post_script') | 
|  | post_flash_script = dut.get('post_flash_script') | 
platform = dut.get('platform')
|  | id = dut.get('id') | 
|  | runner = dut.get('runner') | 
|  | serial = dut.get('serial') | 
|  | product = dut.get('product') | 
|  | fixtures = dut.get('fixtures', []) | 
|  | new_dut = DUT(platform=platform, | 
|  | product=product, | 
|  | runner=runner, | 
|  | id=id, | 
|  | serial=serial, | 
|  | connected=serial is not None, | 
|  | pre_script=pre_script, | 
|  | post_script=post_script, | 
|  | post_flash_script=post_flash_script) | 
|  | new_dut.fixtures = fixtures | 
|  | new_dut.counter = 0 | 
|  | self.duts.append(new_dut) | 
|  |  | 
|  | def scan(self, persistent=False): | 
|  | from serial.tools import list_ports | 
|  |  | 
|  | if persistent and platform.system() == 'Linux': | 
|  | # On Linux, /dev/serial/by-id provides symlinks to | 
|  | # '/dev/ttyACMx' nodes using names which are unique as | 
|  | # long as manufacturers fill out USB metadata nicely. | 
|  | # | 
|  | # This creates a map from '/dev/ttyACMx' device nodes | 
|  | # to '/dev/serial/by-id/usb-...' symlinks. The symlinks | 
|  | # go into the hardware map because they stay the same | 
|  | # even when the user unplugs / replugs the device. | 
|  | # | 
|  | # Some inexpensive USB/serial adapters don't result | 
|  | # in unique names here, though, so use of this feature | 
|  | # requires explicitly setting persistent=True. | 
|  | by_id = Path('/dev/serial/by-id') | 
|  | def readlink(link): | 
|  | return str((by_id / link).resolve()) | 
|  |  | 
|  | persistent_map = {readlink(link): str(link) | 
|  | for link in by_id.iterdir()} | 
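# The resulting map looks like (hypothetical device):
# {'/dev/ttyACM0': '/dev/serial/by-id/usb-SEGGER_J-Link_000123456789-if00'}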
|  | else: | 
|  | persistent_map = {} | 
|  |  | 
|  | serial_devices = list_ports.comports() | 
|  | logger.info("Scanning connected hardware...") | 
|  | for d in serial_devices: | 
|  | if d.manufacturer in self.manufacturer: | 
|  |  | 
|  | # TI XDS110 can have multiple serial devices for a single board | 
|  | # assume endpoint 0 is the serial, skip all others | 
|  | if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'): | 
|  | continue | 
|  | s_dev = DUT(platform="unknown", | 
|  | id=d.serial_number, | 
|  | serial=persistent_map.get(d.device, d.device), | 
|  | product=d.product, | 
|  | runner='unknown', | 
|  | connected=True) | 
|  |  | 
for runner, products in self.runner_mapping.items():
|  | if d.product in products: | 
|  | s_dev.runner = runner | 
|  | continue | 
|  | # Try regex matching | 
|  | for p in products: | 
|  | if re.match(p, d.product): | 
|  | s_dev.runner = runner | 
|  |  | 
|  | self.detected.append(s_dev) | 
|  | else: | 
|  | logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d)) | 
|  |  | 
|  | def save(self, hwm_file): | 
|  | # use existing map | 
|  | self.detected.sort(key=lambda x: x.serial or '') | 
|  | if os.path.exists(hwm_file): | 
|  | with open(hwm_file, 'r') as yaml_file: | 
|  | hwm = yaml.load(yaml_file, Loader=SafeLoader) | 
|  | if hwm: | 
|  | hwm.sort(key=lambda x: x['serial'] or '') | 
|  |  | 
|  | # disconnect everything | 
|  | for h in hwm: | 
|  | h['connected'] = False | 
|  | h['serial'] = None | 
|  |  | 
|  | for _detected in self.detected: | 
|  | for h in hwm: | 
|  | if _detected.id == h['id'] and _detected.product == h['product'] and not _detected.match: | 
|  | h['connected'] = True | 
|  | h['serial'] = _detected.serial | 
|  | _detected.match = True | 
|  |  | 
new_duts = [d for d in self.detected if not d.match]
new = [d.to_dict() for d in new_duts]
|  |  | 
|  | if hwm: | 
|  | hwm = hwm + new | 
|  | else: | 
|  | hwm = new | 
|  |  | 
|  | with open(hwm_file, 'w') as yaml_file: | 
|  | yaml.dump(hwm, yaml_file, Dumper=Dumper, default_flow_style=False) | 
|  |  | 
|  | self.load(hwm_file) | 
|  | logger.info("Registered devices:") | 
|  | self.dump() | 
|  |  | 
|  | else: | 
|  | # create new file | 
|  | dl = [] | 
|  | for _connected in self.detected: | 
platform = _connected.platform
|  | id = _connected.id | 
|  | runner = _connected.runner | 
|  | serial = _connected.serial | 
|  | product = _connected.product | 
|  | d = { | 
|  | 'platform': platform, | 
|  | 'id': id, | 
|  | 'runner': runner, | 
|  | 'serial': serial, | 
|  | 'product': product, | 
|  | 'connected': _connected.connected | 
|  | } | 
|  | dl.append(d) | 
|  | with open(hwm_file, 'w') as yaml_file: | 
|  | yaml.dump(dl, yaml_file, Dumper=Dumper, default_flow_style=False) | 
|  | logger.info("Detected devices:") | 
|  | self.dump(detected=True) | 
|  |  | 
def dump(self, filtered=None, header=None, connected_only=False, detected=False):
|  | print("") | 
|  | table = [] | 
|  | if detected: | 
|  | to_show = self.detected | 
|  | else: | 
|  | to_show = self.duts | 
|  |  | 
|  | if not header: | 
|  | header = ["Platform", "ID", "Serial device"] | 
|  | for p in to_show: | 
|  | platform = p.platform | 
|  | connected = p.connected | 
|  | if filtered and platform not in filtered: | 
|  | continue | 
|  |  | 
|  | if not connected_only or connected: | 
|  | table.append([platform, p.id, p.serial]) | 
|  |  | 
|  | print(tabulate(table, headers=header, tablefmt="github")) |