blob: bd8490a11baf12d35a9af528f80e6fbd2585c3f7 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2024 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
"""
This script converts memory footprint data prepared by `./footprint/scripts/track.py`
into a JSON files compatible with Twister report schema making them ready for upload
to the same ElasticSearch data storage together with other Twister reports
for analysis, visualization, etc.
The memory footprint input data files (rom.json, ram.json) are expected in directories
sturctured as 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' under the input path(s).
The BOARD name itself can be in HWMv2 format as 'BOARD/SOC' or 'BOARD/SOC/VARIANT'
with the corresponding sub-directories.
For example, an input path `./**/*v3.6.0-rc3-*/footprints/**/frdm_k64f/` will be
expanded by bash to all sub-directories with the 'footprints' data `v3.6.0-rc3`
release commits collected for `frdm_k64f` board.
Note: for the above example to work the bash recursive globbing should be active:
`shopt -s globstar`.
The output `twister_footprint.json` files will be placed into the same directories
as the corresponding input files.
In Twister report a test instance has either long or short name, each needs test
suite name from the test configuration yaml file.
This scripts has `--test-name` parameter to customize how to compose test names
from the plan.txt columns including an additional (last) one whth explicit
test suite name ('dot separated' format).
"""
from __future__ import annotations
from datetime import datetime, timezone
import argparse
import os
import sys
import re
import csv
import logging
import json
from git import Repo
from git.exc import BadName
VERSION_COMMIT_RE = re.compile(r".*-g([a-f0-9]{12})$")
PLAN_HEADERS = ['name', 'feature', 'board', 'application', 'options', 'suite_name']
TESTSUITE_FILENAME = { 'tests': 'testcase.yaml', 'samples': 'sample.yaml' }
FOOTPRINT_FILES = { 'ROM': 'rom.json', 'RAM': 'ram.json' }
RESULT_FILENAME = 'twister_footprint.json'
HWMv2_LEVELS = 3
logger = None
LOG_LEVELS = {
'DEBUG': (logging.DEBUG, 3),
'INFO': (logging.INFO, 2),
'WARNING': (logging.WARNING, 1),
'ERROR': (logging.ERROR, 0)
}
def init_logs(logger_name=''):
global logger
log_level = os.environ.get('LOG_LEVEL', 'ERROR')
log_level = LOG_LEVELS[log_level][0] if log_level in LOG_LEVELS else logging.ERROR
console = logging.StreamHandler(sys.stdout)
console.setFormatter(logging.Formatter('%(asctime)s - %(levelname)-8s - %(message)s'))
logger = logging.getLogger(logger_name)
logger.setLevel(log_level)
logger.addHandler(console)
def set_verbose(verbose: int):
levels = { lvl[1]: lvl[0] for lvl in LOG_LEVELS.values() }
if verbose > len(levels):
verbose = len(levels)
if verbose <= 0:
verbose = 0
logger.setLevel(levels[verbose])
def parse_args():
parser = argparse.ArgumentParser(allow_abbrev=False,
formatter_class=argparse.RawDescriptionHelpFormatter,
description=__doc__)
parser.add_argument('input_paths', metavar='INPUT_PATHS', nargs='+',
help="Directories with the memory footprint data to convert. "
"Each directory must have 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' path structure.")
parser.add_argument('-p', '--plan', metavar='PLAN_FILE_CSV', required=True,
help="An execution plan (CSV file) with details of what footprint applications "
"and platforms were chosen to generate the input data. "
"It is also applied to filter input directories and check their names.")
parser.add_argument('-o', '--output-fname', metavar='OUTPUT_FNAME', required=False,
default=RESULT_FILENAME,
help="Destination JSON file name to create at each of INPUT_PATHS. "
"Default: '%(default)s'")
parser.add_argument('-z', '--zephyr_base', metavar='ZEPHYR_BASE', required=False,
default = os.environ.get('ZEPHYR_BASE'),
help="Zephyr code base path to use instead of the current ZEPHYR_BASE environment variable. "
"The script needs Zephyr repository there to read SHA and commit time of builds. "
"Current default: '%(default)s'")
parser.add_argument("--test-name",
choices=['application/suite_name', 'suite_name', 'application', 'name.feature'],
default='name.feature',
help="How to compose Twister test instance names using plan.txt columns. "
"Default: '%(default)s'" )
parser.add_argument("--no-testsuite-check",
dest='testsuite_check', action="store_false",
help="Don't check for applications' testsuite configs in ZEPHYR_BASE.")
parser.add_argument('-v', '--verbose', required=False, action='count', default=0,
help="Increase the logging level for each occurrence. Default level: ERROR")
return parser.parse_args()
def read_plan(fname: str) -> list[dict]:
plan = []
with open(fname) as plan_file:
plan_rows = csv.reader(plan_file)
plan_vals = [ dict(zip(PLAN_HEADERS, row)) for row in plan_rows ]
plan = { f"{p['name']}/{p['feature']}/{p['board']}" : p for p in plan_vals }
return plan
def get_id_from_path(plan, in_path, max_levels=HWMv2_LEVELS):
data_id = {}
(in_path, data_id['board']) = os.path.split(in_path)
if not data_id['board']:
# trailing '/'
(in_path, data_id['board']) = os.path.split(in_path)
for _ in range(max_levels):
(in_path, data_id['feature']) = os.path.split(in_path)
(c_head, data_id['app']) = os.path.split(in_path)
(c_head, data_id['version']) = os.path.split(c_head)
if not all(data_id.values()):
# incorrect plan id
return None
if f"{data_id['app']}/{data_id['feature']}/{data_id['board']}" in plan:
return data_id
else:
# try with HWMv2 board name one more level deep
data_id['board'] = f"{data_id['feature']}/{data_id['board']}"
# not found
return {}
def main():
errors = 0
converted = 0
skipped = 0
filtered = 0
run_date = datetime.now(timezone.utc).isoformat(timespec='seconds')
init_logs()
args = parse_args()
set_verbose(args.verbose)
if not args.zephyr_base:
logging.error("ZEPHYR_BASE is not defined.")
sys.exit(1)
zephyr_base = os.path.abspath(args.zephyr_base)
zephyr_base_repo = Repo(zephyr_base)
logging.info(f"scanning {len(args.input_paths)} directories ...")
logging.info(f"use plan '{args.plan}'")
plan = read_plan(args.plan)
test_name_sep = '/' if '/' in args.test_name else '.'
test_name_parts = args.test_name.split(test_name_sep)
for report_path in args.input_paths:
logging.info(f"convert {report_path}")
# print(p)
p_head = os.path.normcase(report_path)
p_head = os.path.normpath(p_head)
if not os.path.isdir(p_head):
logging.error(f"not a directory '{p_head}'")
errors += 1
continue
data_id = get_id_from_path(plan, p_head)
if data_id is None:
logging.warning(f"skipped '{report_path}' - not a correct report directory")
skipped += 1
continue
elif not data_id:
logging.info(f"filtered '{report_path}' - not in the plan")
filtered += 1
continue
r_plan = f"{data_id['app']}/{data_id['feature']}/{data_id['board']}"
if 'suite_name' in test_name_parts and 'suite_name' not in plan[r_plan]:
logging.info(f"filtered '{report_path}' - no Twister suite name in the plan.")
filtered += 1
continue
suite_name = test_name_sep.join([plan[r_plan][n] if n in plan[r_plan] else '' for n in test_name_parts])
# Just some sanity checks of the 'application' in the current ZEPHYR_BASE
if args.testsuite_check:
suite_type = plan[r_plan]['application'].split('/')
if len(suite_type) and suite_type[0] in TESTSUITE_FILENAME:
suite_conf_name = TESTSUITE_FILENAME[suite_type[0]]
else:
logging.error(f"unknown app type to get configuration in '{report_path}'")
errors += 1
continue
suite_conf_fname = os.path.join(zephyr_base, plan[r_plan]['application'], suite_conf_name)
if not os.path.isfile(suite_conf_fname):
logging.error(f"test configuration not found for '{report_path}' at '{suite_conf_fname}'")
errors += 1
continue
# Check SHA presence in the current ZEPHYR_BASE
sha_match = VERSION_COMMIT_RE.search(data_id['version'])
version_sha = sha_match.group(1) if sha_match else data_id['version']
try:
git_commit = zephyr_base_repo.commit(version_sha)
except BadName:
logging.error(f"SHA:'{version_sha}' is not found in ZEPHYR_BASE for '{report_path}'")
errors += 1
continue
# Compose twister_footprint.json record - each application (test suite) will have its own
# simplified header with options, SHA, etc.
res = {}
res['environment'] = {
'zephyr_version': data_id['version'],
'commit_date':
git_commit.committed_datetime.astimezone(timezone.utc).isoformat(timespec='seconds'),
'run_date': run_date,
'options': {
'testsuite_root': [ plan[r_plan]['application'] ],
'build_only': True,
'create_rom_ram_report': True,
'footprint_report': 'all',
'platform': [ plan[r_plan]['board'] ]
}
}
test_suite = {
'name': suite_name,
'arch': None,
'platform': plan[r_plan]['board'],
'status': 'passed',
'footprint': {}
}
for k,v in FOOTPRINT_FILES.items():
footprint_fname = os.path.join(report_path, v)
try:
with open(footprint_fname, "rt") as footprint_json:
logger.debug(f"reading {footprint_fname}")
test_suite['footprint'][k] = json.load(footprint_json)
except FileNotFoundError:
logger.warning(f"{report_path} missing {v}")
res['testsuites'] = [test_suite]
report_fname = os.path.join(report_path, args.output_fname)
with open(report_fname, "wt") as json_file:
logger.debug(f"writing {report_fname}")
json.dump(res, json_file, indent=4, separators=(',',':'))
converted += 1
logging.info(f'found={len(args.input_paths)}, converted={converted}, '
f'skipped={skipped}, filtered={filtered}, errors={errors}')
sys.exit(errors != 0)
if __name__ == '__main__':
main()