| # Copyright 2022 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """ |
| Software update related operations. |
| |
| Learn more at: pigweed.dev/pw_software_update |
| |
| """ |
| |
| import argparse |
| import os |
| import sys |
| from pathlib import Path |
| |
| from pw_software_update import (dev_sign, keys, metadata, root_metadata, |
| update_bundle) |
| from pw_software_update.tuf_pb2 import (RootMetadata, SignedRootMetadata, |
| TargetsMetadata) |
| from pw_software_update.update_bundle_pb2 import UpdateBundle |
| |
| |
| def inspect_bundle_handler(arg) -> None: |
| """Prints bundle contents.""" |
| |
| try: |
| bundle = UpdateBundle.FromString(arg.pathname.read_bytes()) |
| signed_targets_metadata = bundle.targets_metadata['targets'] |
| targets_metadata = TargetsMetadata().FromString( |
| signed_targets_metadata.serialized_targets_metadata) |
| print('Targets Metadata:') |
| print('=================') |
| print(targets_metadata) |
| |
| print('\nTarget Files:') |
| print('=============') |
| for i, (name, contents) in enumerate(bundle.target_payloads.items()): |
| print(f'{i+1} of {len(bundle.target_payloads)}:') |
| print(f' filename: {name}') |
| print(f' length: {len(contents)}') |
| |
| first_32_bytes = contents[:32] |
| print(f' ascii contents(first 32 bytes): {first_32_bytes!r}') |
| print(f' hex contents(first 32 bytes): {first_32_bytes.hex()}\n') |
| |
| signed_root_metadata = bundle.root_metadata |
| deserialized_root_metadata = RootMetadata.FromString( |
| signed_root_metadata.serialized_root_metadata) |
| print('\nRoot Metadata:') |
| print('==============') |
| print(deserialized_root_metadata) |
| |
| except IOError as error: |
| print(error) |
| |
| |
| def _new_inspect_bundle_parser(subparsers) -> None: |
| """Parser to handle inspect-bundle subcommand""" |
| |
| formatter_class = lambda prog: argparse.HelpFormatter( |
| prog, max_help_position=100, width=200) |
| inspect_bundle_parser = subparsers.add_parser( |
| 'inspect-bundle', |
| description='Outputs contents of bundle', |
| formatter_class=formatter_class, |
| help="") |
| |
| inspect_bundle_parser.set_defaults(func=inspect_bundle_handler) |
| inspect_bundle_parser.add_argument('pathname', |
| type=Path, |
| help='Path to bundle') |
| |
| |
| def sign_bundle_handler(arg) -> None: |
| """Handles signing of a bundle""" |
| |
| try: |
| signed_bundle = dev_sign.sign_update_bundle( |
| UpdateBundle.FromString(arg.bundle.read_bytes()), |
| arg.key.read_bytes()) |
| arg.bundle.write_bytes(signed_bundle.SerializeToString()) |
| |
| except IOError as error: |
| print(error) |
| |
| |
| def _new_sign_bundle_parser(subparsers) -> None: |
| """Parser for sign-bundle subcommand""" |
| |
| formatter_class = lambda prog: argparse.HelpFormatter( |
| prog, max_help_position=100, width=200) |
| sign_bundle_parser = subparsers.add_parser( |
| 'sign-bundle', |
| description='Sign an existing bundle using a development key', |
| formatter_class=formatter_class, |
| help="", |
| ) |
| |
| sign_bundle_parser.set_defaults(func=sign_bundle_handler) |
| required_arguments = sign_bundle_parser.add_argument_group( |
| 'required arguments') |
| required_arguments.add_argument('--bundle', |
| help='Bundle to be signed', |
| metavar='BUNDLE', |
| required=True, |
| type=Path) |
| required_arguments.add_argument('--key', |
| help='Bundle signing key', |
| metavar='KEY', |
| required=True, |
| type=Path) |
| |
| |
| def add_file_to_bundle(bundle: UpdateBundle, file_name: str, |
| file_contents: bytes) -> UpdateBundle: |
| """Adds a target file represented by file_name and file_contents to an |
| existing UpdateBundle -- bundle and returns the updated UpdateBundle object. |
| """ |
| |
| if not file_name in bundle.target_payloads: |
| bundle.target_payloads[file_name] = file_contents |
| else: |
| raise Exception(f'File name {file_name} already exists in bundle') |
| |
| signed_targets_metadata = bundle.targets_metadata['targets'] |
| targets_metadata = TargetsMetadata().FromString( |
| signed_targets_metadata.serialized_targets_metadata) |
| |
| matching_file_names = list( |
| filter(lambda name: name.file_name == file_name, |
| targets_metadata.target_files)) |
| |
| target_file = metadata.gen_target_file(file_name, file_contents) |
| |
| if not matching_file_names: |
| targets_metadata.target_files.append(target_file) |
| else: |
| raise Exception(f'File name {file_name} already exists in bundle') |
| |
| bundle.targets_metadata['targets'].serialized_targets_metadata =\ |
| targets_metadata.SerializeToString() |
| |
| return bundle |
| |
| |
| def add_file_to_bundle_handler(arg) -> None: |
| """Add a new file to an existing bundle. Updates the targets metadata |
| and errors out if the file already exists. |
| """ |
| |
| try: |
| if not arg.new_name: |
| file_name = os.path.splitext(os.path.basename(arg.file))[0] |
| else: |
| file_name = arg.new_name |
| |
| bundle = UpdateBundle().FromString(arg.bundle.read_bytes()) |
| updated_bundle = add_file_to_bundle( |
| bundle=bundle, |
| file_name=file_name, |
| file_contents=arg.file.read_bytes()) |
| |
| arg.bundle.write_bytes(updated_bundle.SerializeToString()) |
| |
| except (IOError) as error: |
| print(error) |
| |
| |
| def _new_add_file_to_bundle_parser(subparsers) -> None: |
| """Parser for adding file to bundle subcommand""" |
| |
| formatter_class = lambda prog: argparse.HelpFormatter( |
| prog, max_help_position=100, width=200) |
| add_file_to_bundle_parser = subparsers.add_parser( |
| 'add-file-to-bundle', |
| description='Add a file to an existing bundle', |
| formatter_class=formatter_class, |
| help="") |
| add_file_to_bundle_parser.set_defaults(func=add_file_to_bundle_handler) |
| required_arguments = add_file_to_bundle_parser.add_argument_group( |
| 'required arguments') |
| required_arguments.add_argument('--bundle', |
| help='Path to an existing bundle', |
| metavar='BUNDLE', |
| required=True, |
| type=Path) |
| required_arguments.add_argument('--file', |
| help='Path to a target file', |
| metavar='FILE_PATH', |
| required=True, |
| type=Path) |
| required_arguments.add_argument('--new-name', |
| help='Optional new name for target', |
| metavar='NEW_NAME', |
| required=False, |
| type=str) |
| |
| |
| def add_root_metadata_to_bundle_handler(arg) -> None: |
| """Handles appending of root metadata to a bundle""" |
| |
| try: |
| bundle = UpdateBundle().FromString(arg.bundle.read_bytes()) |
| bundle.root_metadata.CopyFrom(SignedRootMetadata().FromString( |
| arg.append_root_metadata.read_bytes())) |
| arg.bundle.write_bytes(bundle.SerializeToString()) |
| |
| except IOError as error: |
| print(error) |
| |
| |
| def _new_add_root_metadata_to_bundle_parser(subparsers) -> None: |
| """Parser for subcommand adding root metadata to bundle""" |
| |
| formatter_class = lambda prog: argparse.HelpFormatter( |
| prog, max_help_position=100, width=200) |
| add_root_metadata_to_bundle_parser = subparsers.add_parser( |
| 'add-root-metadata-to-bundle', |
| description='Add root metadata to a bundle', |
| formatter_class=formatter_class, |
| help="") |
| add_root_metadata_to_bundle_parser.set_defaults( |
| func=add_root_metadata_to_bundle_handler) |
| required_arguments = add_root_metadata_to_bundle_parser.add_argument_group( |
| 'required arguments') |
| required_arguments.add_argument('--append-root-metadata', |
| help='Path to root metadata', |
| metavar='ROOT_METADATA', |
| required=True, |
| type=Path) |
| required_arguments.add_argument('--bundle', |
| help='Path to bundle', |
| metavar='BUNDLE', |
| required=True, |
| type=Path) |
| |
| |
| def create_empty_bundle_handler(arg) -> None: |
| """Handles the creation of an empty bundle and writes it to disc.""" |
| |
| try: |
| bundle = update_bundle.gen_empty_update_bundle( |
| arg.target_metadata_version) |
| arg.pathname.write_bytes(bundle.SerializeToString()) |
| |
| except IOError as error: |
| print(error) |
| |
| |
| def _new_create_empty_bundle_parser(subparsers) -> None: |
| """Parser for creation of an empty bundle.""" |
| |
| formatter_class = lambda prog: argparse.HelpFormatter( |
| prog, max_help_position=100, width=200) |
| create_empty_bundle_parser = subparsers.add_parser( |
| 'create-empty-bundle', |
| description='Creation of an empty bundle', |
| formatter_class=formatter_class, |
| help="") |
| create_empty_bundle_parser.set_defaults(func=create_empty_bundle_handler) |
| create_empty_bundle_parser.add_argument( |
| 'pathname', type=Path, help='Path to newly created empty bundle') |
| create_empty_bundle_parser.add_argument( |
| '--target-metadata-version', |
| help='Version number for targets metadata; Defaults to 1', |
| metavar='VERSION', |
| type=int, |
| default=1, |
| required=False) |
| |
| |
| def inspect_root_metadata_handler(arg) -> None: |
| """Prints root metadata contents as defined by "RootMetadata" message |
| structure in tuf.proto as well as the number of identified signatures. |
| """ |
| |
| try: |
| signed_root_metadata = SignedRootMetadata.FromString( |
| arg.pathname.read_bytes()) |
| |
| deserialized_root_metadata = RootMetadata.FromString( |
| signed_root_metadata.serialized_root_metadata) |
| print(deserialized_root_metadata) |
| |
| print('Number of signatures found:', |
| len(signed_root_metadata.signatures)) |
| |
| except IOError as error: |
| print(error) |
| |
| |
| def _new_inspect_root_metadata_parser(subparsers) -> None: |
| """Parser to handle inspect-root-metadata subcommand""" |
| |
| formatter_class = lambda prog: argparse.HelpFormatter( |
| prog, max_help_position=100, width=200) |
| inspect_root_metadata_parser = subparsers.add_parser( |
| 'inspect-root-metadata', |
| description='Outputs contents of root metadata', |
| formatter_class=formatter_class, |
| help="") |
| |
| inspect_root_metadata_parser.set_defaults( |
| func=inspect_root_metadata_handler) |
| inspect_root_metadata_parser.add_argument('pathname', |
| type=Path, |
| help='Path to root metadata') |
| |
| |
| def sign_root_metadata_handler(arg) -> None: |
| """Handler for signing of root metadata""" |
| |
| try: |
| signed_root_metadata = dev_sign.sign_root_metadata( |
| SignedRootMetadata.FromString(arg.root_metadata.read_bytes()), |
| arg.root_key.read_bytes()) |
| arg.root_metadata.write_bytes(signed_root_metadata.SerializeToString()) |
| |
| except IOError as error: |
| print(error) |
| |
| |
| def _new_sign_root_metadata_parser(subparsers) -> None: |
| """Parser to handle sign-root-metadata subcommand""" |
| |
| formatter_class = lambda prog: argparse.HelpFormatter( |
| prog, max_help_position=100, width=200) |
| sign_root_metadata_parser = subparsers.add_parser( |
| 'sign-root-metadata', |
| description='Signing of root metadata', |
| formatter_class=formatter_class, |
| help="", |
| ) |
| |
| sign_root_metadata_parser.set_defaults(func=sign_root_metadata_handler) |
| required_arguments = sign_root_metadata_parser.add_argument_group( |
| 'required arguments') |
| required_arguments.add_argument('--root-metadata', |
| help='Root metadata to be signed', |
| metavar='ROOT_METADATA', |
| required=True, |
| type=Path) |
| required_arguments.add_argument('--root-key', |
| help='Root signing key', |
| metavar='ROOT_KEY', |
| required=True, |
| type=Path) |
| |
| |
| def create_root_metadata_handler(arg) -> None: |
| """Handler function for creation of root metadata.""" |
| |
| try: |
| root_metadata.main(arg.out, arg.append_root_key, |
| arg.append_targets_key, arg.version) |
| |
| # TODO(eashansingh): Print message that allows user |
| # to visualize root metadata with |
| # `pw update inspect-root-metadata` command |
| |
| except IOError as error: |
| print(error) |
| |
| |
| def _new_create_root_metadata_parser(subparsers) -> None: |
| """Parser to handle create-root-metadata subcommand.""" |
| |
| formatter_class = lambda prog: argparse.HelpFormatter( |
| prog, max_help_position=100, width=200) |
| create_root_metadata_parser = subparsers.add_parser( |
| 'create-root-metadata', |
| description='Creation of root metadata', |
| formatter_class=formatter_class, |
| help='', |
| ) |
| create_root_metadata_parser.set_defaults(func=create_root_metadata_handler) |
| create_root_metadata_parser.add_argument( |
| '--version', |
| help='Canonical version number for rollback checks; Defaults to 1', |
| type=int, |
| default=1, |
| required=False) |
| |
| required_arguments = create_root_metadata_parser.add_argument_group( |
| 'required arguments') |
| required_arguments.add_argument('--append-root-key', |
| help='Path to root key', |
| metavar='ROOT_KEY', |
| required=True, |
| action='append', |
| type=Path) |
| required_arguments.add_argument('--append-targets-key', |
| help='Path to targets key', |
| metavar='TARGETS_KEY', |
| required=True, |
| action='append', |
| type=Path) |
| required_arguments.add_argument('-o', |
| '--out', |
| help='Path to output file', |
| required=True, |
| type=Path) |
| |
| |
| def generate_key_handler(arg) -> None: |
| """ Handler function for key generation""" |
| |
| try: |
| keys.gen_ecdsa_keypair(arg.pathname) |
| print('Private key: ' + str(arg.pathname)) |
| print('Public key: ' + str(arg.pathname) + '.pub') |
| |
| except IOError as error: |
| print(error) |
| |
| |
| def _new_generate_key_parser(subparsers) -> None: |
| """Parser to handle key generation subcommand.""" |
| |
| generate_key_parser = subparsers.add_parser( |
| 'generate-key', |
| description= |
| 'Generates an ecdsa-sha2-nistp256 signing key pair (private + public)', |
| help='') |
| generate_key_parser.set_defaults(func=generate_key_handler) |
| generate_key_parser.add_argument('pathname', |
| type=Path, |
| help='Path to generated key pair') |
| |
| |
| def _parse_args() -> argparse.Namespace: |
| parser_root = argparse.ArgumentParser( |
| description='Software update related operations.', |
| epilog='Learn more at: pigweed.dev/pw_software_update') |
| parser_root.set_defaults( |
| func=lambda *_args, **_kwargs: parser_root.print_help()) |
| |
| subparsers = parser_root.add_subparsers() |
| |
| # Key generation related parsers |
| _new_generate_key_parser(subparsers) |
| |
| # Root metadata related parsers |
| _new_create_root_metadata_parser(subparsers) |
| _new_sign_root_metadata_parser(subparsers) |
| _new_inspect_root_metadata_parser(subparsers) |
| |
| # Bundle related parsers |
| _new_create_empty_bundle_parser(subparsers) |
| _new_add_root_metadata_to_bundle_parser(subparsers) |
| _new_add_file_to_bundle_parser(subparsers) |
| _new_sign_bundle_parser(subparsers) |
| _new_inspect_bundle_parser(subparsers) |
| |
| return parser_root.parse_args() |
| |
| |
| def _dispatch_command(args) -> None: |
| args.func(args) |
| |
| |
| def main() -> int: |
| """Software update command-line interface(WIP).""" |
| _dispatch_command(_parse_args()) |
| return 0 |
| |
| |
| if __name__ == '__main__': |
| sys.exit(main()) |