| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2024 Intel Corporation |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| """ |
| This script converts memory footprint data prepared by `./footprint/scripts/track.py` |
| into JSON files compatible with the Twister report schema, making them ready for upload |
| to the same ElasticSearch data storage together with other Twister reports |
| for analysis, visualization, etc. |
| |
| The memory footprint input data files (rom.json, ram.json) are expected in directories |
| structured as 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' under the input path(s). |
| The BOARD name itself can be in HWMv2 format as 'BOARD/SOC' or 'BOARD/SOC/VARIANT' |
| with the corresponding sub-directories. |
| |
| For example, an input path `./**/*v3.6.0-rc3-*/footprints/**/frdm_k64f/` will be |
| expanded by bash to all sub-directories with the 'footprints' data of the `v3.6.0-rc3` |
| release commits collected for the `frdm_k64f` board. |
| Note: for the above example to work the bash recursive globbing should be active: |
| `shopt -s globstar`. |
| |
| The output `twister_footprint.json` files will be placed into the same directories |
| as the corresponding input files. |
| |
| In a Twister report a test instance has either a long or a short name; each needs the test |
| suite name from the test configuration yaml file. |
| This script has a `--test-name` parameter to customize how to compose test names |
| from the plan.txt columns, including an additional (last) one with an explicit |
| test suite name ('dot separated' format). |
| """ |
| |
| from __future__ import annotations |
| |
| from datetime import datetime, timezone |
| import argparse |
| import os |
| import sys |
| import re |
| import csv |
| import logging |
| import json |
| from git import Repo |
| from git.exc import BadName |
| |
| |
# Captures the 12 hex digit commit SHA from a version string ending with '-g<sha>'.
VERSION_COMMIT_RE = re.compile(r".*-g([a-f0-9]{12})$")

# Names assigned, in order, to the cells of each execution plan (CSV) row.
PLAN_HEADERS = [
    'name',
    'feature',
    'board',
    'application',
    'options',
    'suite_name',
]

# Test suite configuration file name, selected by the first component
# of the application path.
TESTSUITE_FILENAME = {
    'tests': 'testcase.yaml',
    'samples': 'sample.yaml',
}

# Footprint data file to read for each memory kind stored in the report.
FOOTPRINT_FILES = {
    'ROM': 'rom.json',
    'RAM': 'ram.json',
}

# Default name of the report file created in each input directory.
RESULT_FILENAME = 'twister_footprint.json'

# How many trailing path components are tried as a board name
# ('BOARD', 'BOARD/SOC', 'BOARD/SOC/VARIANT').
HWMv2_LEVELS = 3
| |
# Module-wide logger; stays None until init_logs() assigns it.
logger = None

# Level name -> (logging level, number of '-v' occurrences selecting it).
LOG_LEVELS = {
    'DEBUG': (logging.DEBUG, 3),
    'INFO': (logging.INFO, 2),
    'WARNING': (logging.WARNING, 1),
    'ERROR': (logging.ERROR, 0),
}
| |
| |
def init_logs(logger_name=''):
    """Create the module-level `logger` and make it print to stdout.

    The initial level is taken from the LOG_LEVEL environment variable when it
    holds one of the LOG_LEVELS names; otherwise ERROR is used.

    Args:
        logger_name: name passed to `logging.getLogger()`; the default ''
            selects the root logger.
    """
    global logger

    requested = os.environ.get('LOG_LEVEL', 'ERROR')
    # Unknown names fall back to ERROR.
    level = LOG_LEVELS.get(requested, (logging.ERROR,))[0]

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)-8s - %(message)s'))

    logger = logging.getLogger(logger_name)
    logger.setLevel(level)
    logger.addHandler(stdout_handler)
| |
def set_verbose(verbose: int):
    """Set the level of the module-level `logger` from the '-v' count.

    The count is mapped to a logging level through the verbosity numbers
    stored in LOG_LEVELS. Values outside the known range are clamped, so any
    count above the highest verbosity selects the most verbose level and any
    negative count selects the least verbose one.

    The level is set unconditionally: `verbose=0` selects the level that
    LOG_LEVELS associates with verbosity 0.

    Args:
        verbose: number of '-v' occurrences on the command line.
    """
    level_by_verbosity = {verbosity: level for level, verbosity in LOG_LEVELS.values()}
    # Clamp to the real key range. The highest valid key is
    # max(level_by_verbosity), not len(level_by_verbosity): clamping to the
    # length made '-vvvv' (and above) raise KeyError.
    lowest, highest = min(level_by_verbosity), max(level_by_verbosity)
    verbose = max(lowest, min(verbose, highest))
    logger.setLevel(level_by_verbosity[verbose])
| |
| |
def parse_args():
    """Define the command line interface and parse the process arguments.

    The module docstring is used verbatim as the description in the help output.

    Returns:
        The argparse namespace with `input_paths`, `plan`, `output_fname`,
        `zephyr_base`, `test_name`, `testsuite_check` and `verbose`.
    """
    cli = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        allow_abbrev=False,
    )

    # (flags, add_argument() options) pairs, registered in this order.
    arguments = [
        (('input_paths',), {
            'metavar': 'INPUT_PATHS',
            'nargs': '+',
            'help': "Directories with the memory footprint data to convert. "
                    "Each directory must have 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' path structure.",
        }),
        (('-p', '--plan'), {
            'metavar': 'PLAN_FILE_CSV',
            'required': True,
            'help': "An execution plan (CSV file) with details of what footprint applications "
                    "and platforms were chosen to generate the input data. "
                    "It is also applied to filter input directories and check their names.",
        }),
        (('-o', '--output-fname'), {
            'metavar': 'OUTPUT_FNAME',
            'required': False,
            'default': RESULT_FILENAME,
            'help': "Destination JSON file name to create at each of INPUT_PATHS. "
                    "Default: '%(default)s'",
        }),
        (('-z', '--zephyr_base'), {
            'metavar': 'ZEPHYR_BASE',
            'required': False,
            'default': os.environ.get('ZEPHYR_BASE'),
            'help': "Zephyr code base path to use instead of the current ZEPHYR_BASE environment variable. "
                    "The script needs Zephyr repository there to read SHA and commit time of builds. "
                    "Current default: '%(default)s'",
        }),
        (('--test-name',), {
            'choices': ['application/suite_name', 'suite_name', 'application', 'name.feature'],
            'default': 'name.feature',
            'help': "How to compose Twister test instance names using plan.txt columns. "
                    "Default: '%(default)s'",
        }),
        (('--no-testsuite-check',), {
            'dest': 'testsuite_check',
            'action': 'store_false',
            'help': "Don't check for applications' testsuite configs in ZEPHYR_BASE.",
        }),
        (('-v', '--verbose'), {
            'required': False,
            'action': 'count',
            'default': 0,
            'help': "Increase the logging level for each occurrence. Default level: ERROR",
        }),
    ]

    for flags, options in arguments:
        cli.add_argument(*flags, **options)

    return cli.parse_args()
| |
| |
def read_plan(fname: str) -> dict[str, dict]:
    """Read the execution plan CSV file.

    The cells of each row are assigned, in order, to the PLAN_HEADERS names;
    a row shorter than PLAN_HEADERS simply lacks the trailing keys.
    Blank rows are ignored.

    Args:
        fname: path of the plan CSV file.

    Returns:
        A dict mapping 'name/feature/board' to the row's dict. When several
        rows share the same key the last one wins.

    Raises:
        OSError: the file can not be opened.
        KeyError: a non-blank row has fewer than three cells.
    """
    plan = {}
    # newline='' lets the csv module do its own line ending handling.
    with open(fname, newline='') as plan_file:
        for row in csv.reader(plan_file):
            if not any(cell.strip() for cell in row):
                # csv.reader yields an empty list for a blank line.
                continue
            entry = dict(zip(PLAN_HEADERS, row))
            plan[f"{entry['name']}/{entry['feature']}/{entry['board']}"] = entry
    return plan
| |
| |
def get_id_from_path(plan, in_path, max_levels=HWMv2_LEVELS):
    """Split a report directory path into its plan identifiers.

    The path is read from its end as 'VERSION/APP/FEATURE/BOARD'. When
    'APP/FEATURE/BOARD' is not a key of `plan`, the component taken as the
    feature is prepended to the board name and the split is retried one
    level up, at most `max_levels` attempts in total.

    Args:
        plan: container of 'name/feature/board' keys (tested with `in`).
        in_path: report directory path; a single trailing separator is tolerated.
        max_levels: maximum number of attempts, i.e. board name components.

    Returns:
        A dict with 'board', 'feature', 'app' and 'version' when the id is in
        the plan; None when any of these components is empty; an empty dict
        when no attempt matched the plan.
    """
    parent, board = os.path.split(in_path)
    if not board:
        # The path ended with a separator: the board is one component up.
        parent, board = os.path.split(parent)

    for _ in range(max_levels):
        parent, feature = os.path.split(parent)
        above_app, app = os.path.split(parent)
        version = os.path.split(above_app)[1]

        if not (board and feature and app and version):
            # The path is too short to hold all four components.
            return None

        if f"{app}/{feature}/{board}" in plan:
            return {'board': board, 'feature': feature, 'app': app, 'version': version}

        # Retry with a HWMv2 board name that is one level deeper.
        board = f"{feature}/{board}"

    # Not in the plan.
    return {}
| |
| |
def _load_footprints(report_path):
    """Read the footprint JSON files present in `report_path`, keyed as in FOOTPRINT_FILES."""
    footprints = {}
    for kind, fname in FOOTPRINT_FILES.items():
        footprint_fname = os.path.join(report_path, fname)
        try:
            with open(footprint_fname, "rt", encoding="utf-8") as footprint_json:
                logger.debug(f"reading {footprint_fname}")
                footprints[kind] = json.load(footprint_json)
        except FileNotFoundError:
            # A missing file is only reported; the other one is still used.
            logger.warning(f"{report_path} missing {fname}")
    return footprints


def _compose_report(plan_entry, zephyr_version, commit_date, run_date, suite_name, footprint):
    """Build the report record: a simplified environment header and a single test suite."""
    return {
        'environment': {
            'zephyr_version': zephyr_version,
            'commit_date': commit_date,
            'run_date': run_date,
            'options': {
                'testsuite_root': [plan_entry['application']],
                'build_only': True,
                'create_rom_ram_report': True,
                'footprint_report': 'all',
                'platform': [plan_entry['board']],
            },
        },
        'testsuites': [{
            'name': suite_name,
            'arch': None,
            'platform': plan_entry['board'],
            'status': 'passed',
            'footprint': footprint,
        }],
    }


def main():
    """Convert the footprint data of every input directory into a report file.

    For each input directory the function resolves its plan entry, optionally
    checks that the application's test suite configuration exists in
    ZEPHYR_BASE, looks up the build commit in the ZEPHYR_BASE repository and
    writes the report next to the input files.

    Directories which are not report directories are counted as skipped,
    directories not selected by the plan as filtered. The process exits with
    status 1 when ZEPHYR_BASE or the plan can not be used or when any
    directory failed, and with status 0 otherwise.
    """
    # The top of the file only imports BadName from git.exc.
    from git.exc import InvalidGitRepositoryError, NoSuchPathError

    errors = 0
    converted = 0
    skipped = 0
    filtered = 0

    run_date = datetime.now(timezone.utc).isoformat(timespec='seconds')

    init_logs()
    args = parse_args()
    set_verbose(args.verbose)

    if not args.zephyr_base:
        logger.error("ZEPHYR_BASE is not defined.")
        sys.exit(1)

    zephyr_base = os.path.abspath(args.zephyr_base)
    try:
        zephyr_base_repo = Repo(zephyr_base)
    except (InvalidGitRepositoryError, NoSuchPathError):
        logger.error(f"ZEPHYR_BASE '{zephyr_base}' is not a Git repository.")
        sys.exit(1)

    logger.info(f"scanning {len(args.input_paths)} directories ...")

    logger.info(f"use plan '{args.plan}'")
    try:
        plan = read_plan(args.plan)
    except OSError as err:
        logger.error(f"can't read plan '{args.plan}': {err}")
        sys.exit(1)

    test_name_sep = '/' if '/' in args.test_name else '.'
    test_name_parts = args.test_name.split(test_name_sep)

    for report_path in args.input_paths:
        logger.info(f"convert {report_path}")
        p_head = os.path.normpath(os.path.normcase(report_path))
        if not os.path.isdir(p_head):
            logger.error(f"not a directory '{p_head}'")
            errors += 1
            continue

        data_id = get_id_from_path(plan, p_head)
        if data_id is None:
            logger.warning(f"skipped '{report_path}' - not a correct report directory")
            skipped += 1
            continue
        if not data_id:
            logger.info(f"filtered '{report_path}' - not in the plan")
            filtered += 1
            continue

        plan_entry = plan[f"{data_id['app']}/{data_id['feature']}/{data_id['board']}"]

        # An empty 'suite_name' cell is treated like a missing one: it would
        # otherwise produce a test name with an empty component.
        if 'suite_name' in test_name_parts and not plan_entry.get('suite_name'):
            logger.info(f"filtered '{report_path}' - no Twister suite name in the plan.")
            filtered += 1
            continue

        suite_name = test_name_sep.join(plan_entry.get(part, '') for part in test_name_parts)

        # Just some sanity checks of the 'application' in the current ZEPHYR_BASE
        if args.testsuite_check:
            app_type = plan_entry['application'].split('/')[0]
            suite_conf_name = TESTSUITE_FILENAME.get(app_type)
            if suite_conf_name is None:
                logger.error(f"unknown app type to get configuration in '{report_path}'")
                errors += 1
                continue

            suite_conf_fname = os.path.join(zephyr_base, plan_entry['application'], suite_conf_name)
            if not os.path.isfile(suite_conf_fname):
                logger.error(f"test configuration not found for '{report_path}' at '{suite_conf_fname}'")
                errors += 1
                continue

        # Check SHA presence in the current ZEPHYR_BASE; a version without
        # the '-g<sha>' suffix is looked up as it is.
        sha_match = VERSION_COMMIT_RE.search(data_id['version'])
        version_sha = sha_match.group(1) if sha_match else data_id['version']
        try:
            git_commit = zephyr_base_repo.commit(version_sha)
        except BadName:
            logger.error(f"SHA:'{version_sha}' is not found in ZEPHYR_BASE for '{report_path}'")
            errors += 1
            continue

        commit_date = git_commit.committed_datetime.astimezone(timezone.utc).isoformat(timespec='seconds')

        # Each application (test suite) gets its own report with a simplified
        # header with options, SHA, etc.
        report = _compose_report(plan_entry, data_id['version'], commit_date, run_date,
                                 suite_name, _load_footprints(report_path))

        report_fname = os.path.join(report_path, args.output_fname)
        with open(report_fname, "wt", encoding="utf-8") as json_file:
            logger.debug(f"writing {report_fname}")
            json.dump(report, json_file, indent=4, separators=(',', ':'))

        converted += 1

    logger.info(f'found={len(args.input_paths)}, converted={converted}, '
                f'skipped={skipped}, filtered={filtered}, errors={errors}')
    sys.exit(1 if errors else 0)
| |
| |
# Run the conversion only when this file is executed as a script.
if __name__ == "__main__":
    main()