| # Copyright (c) 2022 Nordic Semiconductor ASA |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| import argparse |
| import os |
| import re |
| import shutil |
| import sys |
| import textwrap |
| from pathlib import Path |
| from urllib.parse import urlparse |
| |
| from west.commands import WestCommand |
| from zephyr_ext_common import ZEPHYR_BASE |
| |
| sys.path.append(os.fspath(Path(__file__).parent.parent)) |
| import zephyr_module |
| |
| |
| class Blobs(WestCommand): |
| DEFAULT_LIST_FMT = '{module} {status} {path} {type} {abspath}' |
| |
| def __init__(self): |
| super().__init__( |
| 'blobs', |
| # Keep this in sync with the string in west-commands.yml. |
| 'work with binary blobs', |
| 'Work with binary blobs', |
| accepts_unknown_args=False, |
| ) |
| |
| def do_add_parser(self, parser_adder): |
| parser = parser_adder.add_parser( |
| self.name, |
| help=self.help, |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| description=self.description, |
| epilog=textwrap.dedent(f'''\ |
| FORMAT STRINGS |
| -------------- |
| |
| Blobs are listed using a Python 3 format string. Arguments |
| to the format string are accessed by name. |
| |
| The default format string is: |
| |
| "{self.DEFAULT_LIST_FMT}" |
| |
| The following arguments are available: |
| |
| - module: name of the module that contains this blob |
| - abspath: blob absolute path |
| - status: short status (A: present, M: hash failure, D: not present) |
| - path: blob local path from <module>/zephyr/blobs/ |
| - sha256: blob SHA256 hash in hex |
| - type: type of blob |
| - version: version string |
| - license_path: path to the license file for the blob |
| - license-abspath: absolute path to the license file for the blob |
| - click-through: need license click-through or not |
| - uri: URI to the remote location of the blob |
| - description: blob text description |
| - doc-url: URL to the documentation for this blob |
| '''), |
| ) |
| |
| # Remember to update west-completion.bash if you add or remove |
| # flags |
| parser.add_argument( |
| 'subcmd', nargs=1, choices=['list', 'fetch', 'clean'], help='sub-command to execute' |
| ) |
| |
| parser.add_argument( |
| 'modules', |
| metavar='MODULE', |
| nargs='*', |
| help='''zephyr modules to operate on; |
| all modules will be used if not given''', |
| ) |
| |
| group = parser.add_argument_group('west blob list options') |
| group.add_argument( |
| '-f', |
| '--format', |
| help='''format string to use to list each blob; |
| see FORMAT STRINGS below''', |
| ) |
| |
| group = parser.add_argument_group('west blobs fetch options') |
| group.add_argument( |
| '-l', |
| '--allow-regex', |
| help='''Regex pattern to apply to the blob local path. |
| Only local paths matching this regex will be fetched. |
| Note that local paths are relative to the module directory''', |
| ) |
| group.add_argument( |
| '-a', |
| '--auto-accept', |
| action='store_true', |
| help='''auto accept license if the fetching needs click-through''', |
| ) |
| group.add_argument( |
| '--cache-dirs', |
| help='''Semicolon-separated list of directories to search for cached |
| blobs before downloading. Cache files may use the original |
| filename or be suffixed with `.<sha256>`.''', |
| ) |
| group.add_argument( |
| '--auto-cache', |
| help='''Path to a directory that is automatically populated when a blob |
| is downloaded. Cached blobs are stored using the original |
| filename suffixed with `.<sha256>`.''', |
| ) |
| |
| return parser |
| |
| def get_blobs(self, args): |
| blobs = [] |
| modules = args.modules |
| all_modules = zephyr_module.parse_modules(ZEPHYR_BASE, self.manifest) |
| all_names = [m.meta.get('name', None) for m in all_modules] |
| |
| unknown = set(modules) - set(all_names) |
| |
| if len(unknown): |
| self.die(f'Unknown module(s): {unknown}') |
| |
| for module in all_modules: |
| # Filter by module |
| module_name = module.meta.get('name', None) |
| if len(modules) and module_name not in modules: |
| continue |
| |
| blobs += zephyr_module.process_blobs(module.project, module.meta) |
| |
| return blobs |
| |
| def list(self, args): |
| blobs = self.get_blobs(args) |
| fmt = args.format or self.DEFAULT_LIST_FMT |
| for blob in blobs: |
| self.inf(fmt.format(**blob)) |
| |
| def ensure_folder(self, path): |
| path.parent.mkdir(parents=True, exist_ok=True) |
| |
| def handle_auto_cache(self, blob, auto_cache_dir) -> Path: |
| """ |
| This function guarantees that a given blob exists in the auto-cache. |
| It first checks whether the blob is already present. If so, it |
| returns the path of this cached blob. If the blob is not yet cached, |
| the blob is downloaded into the auto-cache directory and the path of |
| the freshly cached blob is returned. |
| """ |
| cached_blob = self.get_cached_blob(blob, [auto_cache_dir]) |
| if cached_blob: |
| return cached_blob |
| name = Path(blob['path']).name |
| sha256 = blob['sha256'] |
| self.download_blob(blob, auto_cache_dir / f'{name}.{sha256}') |
| cached_blob = self.get_cached_blob(blob, [auto_cache_dir]) |
| assert cached_blob, f'Blob {name} still not cached in auto-cache.' |
| return cached_blob |
| |
| def get_cached_blob(self, blob, cache_dirs: list) -> Path | None: |
| """ |
| Look for a cached blob in the provided cache directories. |
| A blob may be stored using either its original name or suffixed with |
| its SHA256 hash (e.g. "<name>.<sha256>"). |
| Return the first matching path, or None if not found. |
| """ |
| name = Path(blob['path']).name |
| sha256 = blob["sha256"] |
| candidate_names = [ |
| f"{name}.{sha256}", # suffixed version |
| name, # original blob name |
| ] |
| |
| for cache_dir in cache_dirs: |
| if not cache_dir.exists(): |
| continue |
| for name in candidate_names: |
| candidate_path = cache_dir / name |
| if ( |
| zephyr_module.get_blob_status(candidate_path, sha256) |
| == zephyr_module.BLOB_PRESENT |
| ): |
| return candidate_path |
| return None |
| |
| def download_blob(self, blob, path): |
| '''Download a blob from its url to a given path.''' |
| url = blob['url'] |
| scheme = urlparse(url).scheme |
| self.dbg(f'Fetching blob from url {url} with {scheme} to path: {path}') |
| import fetchers |
| |
| fetcher = fetchers.get_fetcher_cls(scheme) |
| self.dbg(f'Found fetcher: {fetcher}') |
| inst = fetcher() |
| self.ensure_folder(path) |
| inst.fetch(url, path) |
| |
| def fetch_blob(self, args, blob): |
| """ |
| Ensures that the specified blob is available at its path. |
| If caching is enabled and the blob exists in the cache, it is copied |
| from there. Otherwise, the blob is downloaded from its URL and placed |
| at the target path. |
| """ |
| path = Path(blob['abspath']) |
| |
| # collect existing cache dirs specified as args, otherwise from west config |
| cache_dirs = args.cache_dirs |
| auto_cache_dir = args.auto_cache |
| if self.has_config: |
| if cache_dirs is None: |
| cache_dirs = self.config.get('blobs.cache-dirs') |
| if auto_cache_dir is None: |
| auto_cache_dir = self.config.get('blobs.auto-cache') |
| |
| # expand user home for each cache directory |
| if auto_cache_dir is not None: |
| auto_cache_dir = Path(auto_cache_dir).expanduser() |
| if cache_dirs is not None: |
| cache_dirs = [Path(p).expanduser() for p in cache_dirs.split(';') if p] |
| |
| # search for cached blob in the cache directories |
| cached_blob = self.get_cached_blob(blob, cache_dirs or []) |
| |
| # If blob is not found in cache directories: Use auto-cache if enabled |
| if not cached_blob and auto_cache_dir: |
| cached_blob = self.handle_auto_cache(blob, auto_cache_dir) |
| |
| # Copy blob if it is cached, otherwise download it |
| if cached_blob: |
| self.dbg(f'Copy cached blob: {cached_blob}') |
| self.ensure_folder(path) |
| shutil.copy(cached_blob, path) |
| else: |
| self.download_blob(blob, path) |
| |
| # Compare the checksum of a file we've just downloaded |
| # to the digest in blob metadata, warn user if they differ. |
| def verify_blob(self, blob) -> bool: |
| self.dbg(f"Verifying blob {blob['module']}: {blob['abspath']}") |
| |
| status = zephyr_module.get_blob_status(blob['abspath'], blob['sha256']) |
| if status == zephyr_module.BLOB_OUTDATED: |
| self.err( |
| textwrap.dedent( |
| f'''\ |
| The checksum of the downloaded file does not match that |
| in the blob metadata: |
| - if it is not certain that the download was successful, |
| try running 'west blobs fetch {blob['module']}' |
| to re-download the file |
| - if the error persists, please consider contacting |
| the maintainers of the module so that they can check |
| the corresponding blob metadata |
| |
| Module: {blob['module']} |
| Blob: {blob['path']} |
| URL: {blob['url']} |
| Info: {blob['description']}''' |
| ) |
| ) |
| return False |
| return True |
| |
| def fetch(self, args): |
| bad_checksum_count = 0 |
| blobs = self.get_blobs(args) |
| for blob in blobs: |
| if blob['status'] == zephyr_module.BLOB_PRESENT: |
| self.dbg(f"Blob {blob['module']}: {blob['abspath']} is up to date") |
| continue |
| |
| # if args.allow_regex is set, use it to filter the blob by path |
| if args.allow_regex and not re.match(args.allow_regex, blob['path']): |
| self.dbg( |
| f"Blob {blob['module']}: {blob['abspath']} does not match regex " |
| f"'{args.allow_regex}', skipping" |
| ) |
| continue |
| self.inf(f"Fetching blob {blob['module']}: {blob['abspath']}") |
| |
| if blob['click-through'] and not args.auto_accept: |
| while True: |
| user_input = input( |
| "For this blob, need to read and accept " |
| "license to continue. Read it?\n" |
| "Please type 'y' or 'n' and press enter to confirm: " |
| ) |
| if user_input.upper() == "Y" or user_input.upper() == "N": |
| break |
| |
| if user_input.upper() != "Y": |
| self.wrn('Skip fetching this blob.') |
| continue |
| |
| with open(blob['license-abspath'], encoding="utf-8") as license_file: |
| license_content = license_file.read() |
| print(license_content) |
| |
| while True: |
| user_input = input( |
| "Accept license to continue?\n" |
| "Please type 'y' or 'n' and press enter to confirm: " |
| ) |
| if user_input.upper() == "Y" or user_input.upper() == "N": |
| break |
| |
| if user_input.upper() != "Y": |
| self.wrn('Skip fetching this blob.') |
| continue |
| |
| self.fetch_blob(args, blob) |
| if not self.verify_blob(blob): |
| bad_checksum_count += 1 |
| |
| if bad_checksum_count: |
| self.err(f"{bad_checksum_count} blobs have bad checksums") |
| sys.exit(os.EX_DATAERR) |
| |
| def clean(self, args): |
| blobs = self.get_blobs(args) |
| for blob in blobs: |
| if blob['status'] == zephyr_module.BLOB_NOT_PRESENT: |
| self.dbg(f"Blob {blob['module']}: {blob['abspath']} not in filesystem") |
| continue |
| self.inf(f"Deleting blob {blob['module']}: {blob['status']} {blob['abspath']}") |
| blob['abspath'].unlink() |
| |
| def do_run(self, args, _): |
| self.dbg(f"subcmd: '{args.subcmd[0]}' modules: {args.modules}") |
| |
| subcmd = getattr(self, args.subcmd[0]) |
| |
| if args.subcmd[0] != 'list' and args.format is not None: |
| self.die('unexpected --format argument; this is a "west blobs list" option') |
| |
| subcmd(args) |