| #!/usr/bin/env python3 |
| # Copyright (c) 2020 Project CHIP Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Utilities to flash or erase a device.""" |
| |
| import argparse |
| import pathlib |
| import errno |
| import locale |
| import os |
| import stat |
| import subprocess |
| import sys |
| import textwrap |
| |
# Options that can be used to configure a `Flasher` object (as dictionary
# keys) and/or be passed as command line options.
#
# Layout: {group: {option: specification}}. Each group becomes an option
# group in the command-line help message. A specification holds 'help' and
# 'default', optionally 'alias' (extra flag spellings), and optionally
# 'argparse' (extra keyword arguments for `add_argument()`); an option
# without an 'argparse' entry gets no command line flag.

OPTIONS = {
    # Properties used in flashing operations.
    'configuration': {
        # Verbosity levels:
        #   0  - error message
        #   1  - action to be taken
        #   2  - results of action, even if successful
        #   3+ - details
        'verbose': {
            'help': 'Report more verbosely',
            'default': 0,
            'alias': ['-v'],
            'argparse': {'action': 'count'},
        },
    },

    # Operations that Flasher.action() or the function interface
    # flash_command() will perform.
    'operations': {
        'erase': {
            'help': 'Erase device',
            'default': False,
            'argparse': {'action': 'store_true'},
        },
        'application': {
            'help': 'Flash an image',
            'default': None,
            'argparse': {'metavar': 'FILE', 'type': pathlib.Path},
        },
        'verify_application': {
            'help': 'Verify the image after flashing',
            'default': False,
            'argparse': {'action': 'store_true'},
        },
        # 'reset' is a three-way switch: None means that action() resets the
        # device if and only if an application image is flashed. 'skip_reset'
        # writes to the same attribute so that it can be forced to False.
        'reset': {
            'help': 'Reset device after flashing',
            'default': None,  # None = Reset iff application was flashed.
            'argparse': {'action': 'store_true'},
        },
        'skip_reset': {
            'help': 'Do not reset device after flashing',
            'default': None,  # None = Reset iff application was flashed.
            'argparse': {'dest': 'reset', 'action': 'store_false'},
        },
    },

    # Internal properties; with no 'argparse' entry they have no command
    # line options.
    'internal': {
        'platform': {
            'help': 'Short name of the current platform',
            'default': None,
        },
        'module': {
            'help': 'Invoking Python module, for generating scripts',
            'default': None,
        },
    },
}
| |
| |
class Flasher:
    """Manage flashing.

    Base class holding option values, option metadata and an argument
    parser. `actions()` is not implemented here and raises
    `NotImplementedError`; `locate_tool()`, `_postprocess_argv()` and
    `_platform_wrapper_args()` are hooks with default implementations.
    """

    def __init__(self, **options):
        """Initialize state and register the global `OPTIONS`.

        Args:
            **options: Initial option values; each becomes an attribute of
                `self.option` and takes precedence over the option default.
        """
        # An integer giving the current Flasher status.
        # 0 if OK, and normally an errno value if positive.
        self.err = 0

        # Namespace of option values.
        self.option = argparse.Namespace(**options)

        # Namespace of option metadata. This contains the option specification
        # information one level down from `define_options()`, i.e. without the
        # group; the keys are mostly the same as those of `self.option`.
        # (Exceptions include options with no metadata and only defined when
        # constructing the Flasher, and options where different command line
        # options (`info` keys) affect a single attribute (e.g. `reset` and
        # `skip-reset` have distinct `info` entries but one option).)
        self.info = argparse.Namespace()

        # `argv[0]` from the most recent call to parse_argv(); that is,
        # the path used to invoke the script. This is used to find files
        # relative to the script.
        self.argv0 = None

        # Argument parser for `parse_argv()`. Normally defines command-line
        # options for most of the `self.option` keys.
        # `platform` has no command line option and no default has been
        # applied yet, so it only exists if the caller passed it in; use
        # getattr() so that constructing without it does not raise.
        platform = getattr(self.option, 'platform', None)
        self.parser = argparse.ArgumentParser(
            description='Flash {} device'.format(platform or 'a'))

        # Argument parser groups.
        self.group = {}

        # Construct the global options for all Flasher()s.
        self.define_options(OPTIONS)

    def define_options(self, options):
        """Define options, including setting defaults and argument parsing.

        Args:
            options: Mapping of group name to a mapping of option name to
                option specification (same layout as `OPTIONS`).

        Returns:
            self, to allow call chaining.
        """
        for group, group_options in options.items():
            if group not in self.group:
                self.group[group] = self.parser.add_argument_group(group)
            for key, info in group_options.items():
                setattr(self.info, key, info)
                # Options without an `argparse` entry are metadata only.
                if 'argparse' not in info:
                    continue
                argument = info['argparse']
                # Several command line options may write to one attribute.
                attribute = argument.get('dest', key)
                # Set default value, unless a value is already present
                # (e.g. supplied to the constructor).
                if attribute not in self.option:
                    setattr(self.option, attribute, info['default'])
                # Add command line argument; accept both `--a_b` and `--a-b`.
                names = ['--' + key]
                if '_' in key:
                    names.append('--' + key.replace('_', '-'))
                if 'alias' in info:
                    names += info['alias']
                self.group[group].add_argument(
                    *names,
                    help=info['help'],
                    default=getattr(self.option, attribute),
                    **argument)
        return self

    def status(self):
        """Return the current error code."""
        return self.err

    def actions(self):
        """Perform actions on the device according to self.option."""
        raise NotImplementedError()

    def log(self, level, *args):
        """Optionally log a message to stderr.

        The message is printed only if the `verbose` option is at least
        `level`.
        """
        if self.option.verbose >= level:
            print(*args, file=sys.stderr)

    def run_tool(self,
                 tool,
                 arguments,
                 options=None,
                 name=None,
                 pass_message=None,
                 fail_message=None,
                 fail_level=0,
                 capture_output=False):
        """Run an external tool.

        Args:
            tool: Name of the tool; its metadata is read from `self.info`
                and its executable from the option of the same name.
            arguments: Template (see `format_command()`) for the arguments
                substituted for `()` in the tool's command template.
            options: Optional mapping of extra values for this call only.
            name: Description used in log messages; default 'Run <tool>'.
            pass_message: Message logged at level 2 on success.
            fail_message: Message logged at `fail_level` on failure.
            fail_level: Verbosity level for failure messages.
            capture_output: If true, capture stdout and stderr.

        Returns:
            With `capture_output`, the `subprocess.CompletedProcess`, or
            None if the tool failed. Otherwise, self.

        Raises:
            FileNotFoundError: The command could not be started even though
                `verify_tool()` reported the tool as usable.
        """
        if name is None:
            name = 'Run ' + tool
        self.log(1, name)

        # Merge per-call options into a copy, so that they apply to this
        # invocation only and do not leak into `self.option`.
        option_map = dict(vars(self.option))
        if options:
            option_map.update(options)
        if not option_map.get(tool):
            # Remember the located tool for subsequent calls.
            located = self.locate_tool(tool)
            setattr(self.option, tool, located)
            option_map[tool] = located
        arguments = self.format_command(arguments, opt=option_map)
        tool_info = getattr(self.info, tool)
        command_template = tool_info.get('command', ['{' + tool + '}', ()])
        command = self.format_command(command_template, arguments, option_map)
        self.log(3, 'Execute:', *command)

        # This is also the value returned if the command fails.
        result = None if capture_output else self
        try:
            if capture_output:
                result = subprocess.run(
                    command,
                    check=True,
                    encoding=locale.getpreferredencoding(),
                    capture_output=True)
            else:
                subprocess.check_call(command)
        except subprocess.CalledProcessError as exception:
            self.err = exception.returncode
            if capture_output:
                self.log(fail_level, '--- stdout ---')
                self.log(fail_level, exception.stdout)
                self.log(fail_level, '--- stderr ---')
                self.log(fail_level, exception.stderr)
                self.log(fail_level, '---')
        except FileNotFoundError as exception:
            self.err = exception.errno
            if self.err == errno.ENOENT:
                # This likely means that the program was not found.
                # But if it seems OK, rethrow the exception.
                if self.verify_tool(tool):
                    raise

        if self.err:
            self.log(fail_level, fail_message or ('FAILED: ' + name))
        else:
            self.log(2, pass_message or (name + ' complete'))
        return result

    def locate_tool(self, tool):
        """Called to find an undefined tool. (Override in platform.)"""
        return tool

    def verify_tool(self, tool):
        """Run a command to verify that an external tool is available.

        The command is the `verify` entry of the tool's metadata; if there
        is none, the tool is assumed to be available. `self.err` is set to
        the outcome of the verification command.

        Prints a configurable error and returns False if not.
        """
        tool_info = getattr(self.info, tool)
        command_template = tool_info.get('verify')
        if not command_template:
            return True
        command = self.format_command(command_template, opt=vars(self.option))
        try:
            self.err = subprocess.call(command)
        except OSError as ex:
            self.err = ex.errno
        if self.err:
            note = tool_info.get('error', 'Unable to execute {tool}.')
            # Merge into one mapping so that an option that happens to be
            # named `tool` cannot cause a duplicate keyword argument.
            fields = dict(vars(self.option), tool=tool)
            note = textwrap.dedent(note).format(**fields)
            # textwrap.fill only handles single paragraphs:
            note = '\n\n'.join(textwrap.fill(p) for p in note.split('\n\n'))
            print(note, file=sys.stderr)
            return False
        return True

    def format_command(self, template, args=None, opt=None):
        """Construct a tool command line.

        This provides a few conveniences over a simple list of fixed strings,
        that in most cases eliminates any need for custom code to build a tool
        command line. In this description, φ(τ) is the result of formatting a
        template τ.

            template ::= list | () | str | dict

        Typically the caller provides a list, and `format_command()` returns a
        formatted list. The results of formatting sub-elements get interpolated
        into the end result.

            list    ::= [τ₀, …, τₙ]
                    ↦ φ(τ₀) + … + φ(τₙ)

        An empty tuple returns the supplied `args`. Typically this would be
        used for things like subcommands or file names at the end of a command.

            ()      ↦ args or []

        Formatting a string uses the Python string formatter with the `opt`
        map as arguments. Typically used to interpolate an option value into
        the command line, e.g. ['--flag', '{flag}'] or ['--flag={flag}'].
        A `pathlib.Path` is converted to a string and treated the same way.

            str     ::= σ
                    ↦ [σ.format_map(opt)]

        A dictionary element provides a convenience feature. For any dictionary
        template, if it contains an optional 'expand' key that tests true, the
        result is recursively passed to format_command(); otherwise it is taken
        as is.

        The simplest case is an option propagated to the tool command line,
        as a single option if the value is exactly boolean True or as an
        option-argument pair if otherwise set.

            optional ::= {'optional': name}
                    ↦ ['--name'] if opt[name] is True
                      ['--name', opt[name]] if opt[name] tests true
                      [] otherwise

        A dictionary with an 'option' can insert command line arguments based
        on the value of an option. The 'result' is optional and defaults to
        the option value itself, and 'else' defaults to nothing.

            option  ::= {'option': name, 'result': ρ, 'else': δ}
                    ↦ ρ if opt[name]
                      δ otherwise

        A dictionary with a 'match' key returns a result comparing the value of
        an option against a 'test' list of tuples. The 'else' is optional and
        defaults to nothing.

            match   ::= {'match': name, 'test': [(σᵢ, ρᵢ), …], 'else': ρ}
                    ↦ ρᵢ if opt[name]==σᵢ
                      ρ otherwise

        Raises:
            ValueError: The template is not one of the forms above.
        """
        if opt is None:
            opt = {}
        if isinstance(template, (str, pathlib.Path)):
            return [str(template).format_map(opt)]
        if isinstance(template, list):
            result = []
            for element in template:
                result += self.format_command(element, args, opt)
            return result
        if template == ():
            return args or []
        if not isinstance(template, dict):
            raise ValueError('Unknown: {}'.format(template))

        if 'optional' in template:
            name = template['optional']
            value = opt.get(name)
            if value is True:
                result = ['--' + name]
            elif value:
                result = ['--' + name, value]
            else:
                result = []
        elif 'option' in template:
            value = opt.get(template['option'])
            if value:
                result = template.get('result', value)
            else:
                result = template.get('else')
        elif 'match' in template:
            # Compare the *value* of the named option, not the name itself.
            value = opt.get(template['match'])
            for compare, result in template['test']:
                if value == compare:
                    break
            else:
                result = template.get('else')
        else:
            raise ValueError('Unknown: {}'.format(template))

        if result and template.get('expand'):
            result = self.format_command(result, args, opt)
        elif result is None:
            result = []
        elif not isinstance(result, list):
            result = [result]
        return result

    def parse_argv(self, argv):
        """Handle command line options.

        Parsed values are stored directly into `self.option`.

        Returns:
            self, to allow call chaining.
        """
        self.argv0 = argv[0]
        self.parser.parse_args(argv[1:], namespace=self.option)
        self._postprocess_argv()
        return self

    def _postprocess_argv(self):
        """Called after parse_argv() for platform-specific processing."""

    def flash_command(self, argv):
        """Perform device actions according to the command line.

        Returns:
            The status code after parsing `argv` and running `actions()`.
        """
        return self.parse_argv(argv).actions().status()

    def _platform_wrapper_args(self, args):
        """Called from make_wrapper() to optionally manipulate arguments."""

    def make_wrapper(self, argv):
        """Generate script to flash a device.

        The generated script is a minimal wrapper around `flash_command()`,
        containing any option values that differ from the class defaults.

        Args:
            argv: Command line, including the required `--output FILENAME`.

        Returns:
            0 on success, 1 if the script could not be written.
        """

        # Note: this modifies the argument parser, so the same Flasher instance
        # should not be used for both parse_argv() and make_wrapper().
        self.parser.description = 'Generate a flashing script.'
        self.parser.add_argument(
            '--output',
            metavar='FILENAME',
            required=True,
            help='flashing script name')
        self.argv0 = argv[0]
        args = self.parser.parse_args(argv[1:])

        # Give platform-specific code a chance to manipulate the arguments
        # for the wrapper script.
        self._platform_wrapper_args(args)

        # Find any option values that differ from the class defaults.
        # These will be inserted into the wrapper script.
        defaults = []
        for key, value in vars(args).items():
            if key in self.option and value != getattr(self.option, key):
                if isinstance(value, pathlib.Path):
                    # Paths are resolved relative to the generated script.
                    defaults.append(' {}: os.path.join(os.path.dirname(sys.argv[0]), {}),'.format(
                        repr(key), repr(str(value))))
                else:
                    defaults.append(' {}: {},'.format(repr(key), repr(value)))

        script = """
            import sys
            import os.path

            DEFAULTS = {{
            {defaults}
            }}

            import {module}

            if __name__ == '__main__':
                sys.exit({module}.Flasher(**DEFAULTS).flash_command(sys.argv))
        """

        script = ('#!/usr/bin/env python3' + textwrap.dedent(script).format(
            module=self.option.module, defaults='\n'.join(defaults)))

        # rwxr-xr-x: the generated script is meant to be executed directly.
        mode = (stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR
                | stat.S_IXGRP | stat.S_IRGRP
                | stat.S_IXOTH | stat.S_IROTH)
        try:
            # Python source is UTF-8 by default, independent of the locale.
            with open(args.output, 'w', encoding='utf-8') as script_file:
                script_file.write(script)
            os.chmod(args.output, mode)
        except OSError as exception:
            print(exception, file=sys.stderr)
            return 1
        return 0