# blob: c72f2776d3ab31810a75784bf833a240bfa2b8d2 [file] [log] [blame]
# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Software update related operations.
Learn more at: pigweed.dev/pw_software_update
"""
import argparse
import os
import sys
from pathlib import Path
from pw_software_update import (dev_sign, keys, metadata, root_metadata,
update_bundle)
from pw_software_update.tuf_pb2 import (RootMetadata, SignedRootMetadata,
TargetsMetadata)
from pw_software_update.update_bundle_pb2 import UpdateBundle
def inspect_bundle_handler(arg) -> None:
    """Prints bundle contents.

    Reads the serialized UpdateBundle stored at ``arg.pathname`` and prints,
    in order: the targets metadata found under the 'targets' key, one entry
    per target payload (name, length, and the first 32 bytes both as a bytes
    repr and as hex), and finally the root metadata.

    An IOError raised while doing so is printed instead of propagated.
    """
    try:
        bundle = UpdateBundle.FromString(arg.pathname.read_bytes())

        # Decode before printing the header so a decode failure emits nothing.
        serialized_targets = (
            bundle.targets_metadata['targets'].serialized_targets_metadata)
        decoded_targets = TargetsMetadata.FromString(serialized_targets)
        print('Targets Metadata:')
        print('=================')
        print(decoded_targets)

        print('\nTarget Files:')
        print('=============')
        payloads = bundle.target_payloads
        for position, (name, contents) in enumerate(payloads.items(),
                                                    start=1):
            head = contents[:32]
            print(f'{position} of {len(payloads)}:')
            print(f' filename: {name}')
            print(f' length: {len(contents)}')
            print(f' ascii contents(first 32 bytes): {head!r}')
            print(f' hex contents(first 32 bytes): {head.hex()}\n')

        # Same ordering as above: decode first, then print.
        decoded_root = RootMetadata.FromString(
            bundle.root_metadata.serialized_root_metadata)
        print('\nRoot Metadata:')
        print('==============')
        print(decoded_root)
    except IOError as error:
        print(error)
def _new_inspect_bundle_parser(subparsers) -> None:
    """Registers the 'inspect-bundle' subcommand on ``subparsers``.

    The subcommand takes a single positional ``pathname`` (converted to a
    Path) and selects ``inspect_bundle_handler`` through the ``func`` default.
    """

    def wide_formatter(prog):
        # Help layout fixed at 200 columns with help text starting by col 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'inspect-bundle',
        description='Outputs contents of bundle',
        formatter_class=wide_formatter,
        help='',
    )
    parser.set_defaults(func=inspect_bundle_handler)
    parser.add_argument('pathname', type=Path, help='Path to bundle')
def sign_bundle_handler(arg) -> None:
    """Handles signing of a bundle.

    Parses the bundle at ``arg.bundle``, signs it with the key bytes read
    from ``arg.key`` via ``dev_sign.sign_update_bundle``, and writes the
    serialized result back to ``arg.bundle``. An IOError is printed rather
    than raised.
    """
    try:
        unsigned_bundle = UpdateBundle.FromString(arg.bundle.read_bytes())
        key_bytes = arg.key.read_bytes()
        signed_bundle = dev_sign.sign_update_bundle(unsigned_bundle,
                                                    key_bytes)
        # The input file is overwritten in place with the signed bundle.
        arg.bundle.write_bytes(signed_bundle.SerializeToString())
    except IOError as error:
        print(error)
def _new_sign_bundle_parser(subparsers) -> None:
    """Registers the 'sign-bundle' subcommand on ``subparsers``.

    Both ``--bundle`` and ``--key`` are mandatory Path options shown under a
    'required arguments' help group. The subcommand selects
    ``sign_bundle_handler`` through the ``func`` default.
    """

    def wide_formatter(prog):
        # Help layout fixed at 200 columns with help text starting by col 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'sign-bundle',
        description='Sign an existing bundle using a development key',
        formatter_class=wide_formatter,
        help='',
    )
    parser.set_defaults(func=sign_bundle_handler)

    required = parser.add_argument_group('required arguments')
    required.add_argument(
        '--bundle',
        help='Bundle to be signed',
        metavar='BUNDLE',
        required=True,
        type=Path,
    )
    required.add_argument(
        '--key',
        help='Bundle signing key',
        metavar='KEY',
        required=True,
        type=Path,
    )
def add_file_to_bundle(bundle: UpdateBundle, file_name: str,
                       file_contents: bytes) -> UpdateBundle:
    """Adds a target file to an existing UpdateBundle.

    The payload is stored under ``file_name`` in ``bundle.target_payloads``
    and a matching entry produced by ``metadata.gen_target_file`` is appended
    to the targets metadata kept under the 'targets' key.

    All duplicate checks and the target-file generation run before the
    payload map or the serialized targets metadata is written, so a failure
    does not leave the payload inserted without its metadata entry.

    Args:
        bundle: The bundle to extend; it is modified in place.
        file_name: Name under which the target file is registered.
        file_contents: Raw bytes of the target file.

    Returns:
        The same ``bundle`` object, now containing the new target file.

    Raises:
        ValueError: If ``file_name`` is already present in the bundle's
            payloads or in its targets metadata.
    """
    if file_name in bundle.target_payloads:
        raise ValueError(f'File name {file_name} already exists in bundle')

    targets_metadata = TargetsMetadata.FromString(
        bundle.targets_metadata['targets'].serialized_targets_metadata)
    if any(target.file_name == file_name
           for target in targets_metadata.target_files):
        raise ValueError(f'File name {file_name} already exists in bundle')

    target_file = metadata.gen_target_file(file_name, file_contents)

    # Every check passed: only now touch the bundle.
    bundle.target_payloads[file_name] = file_contents
    targets_metadata.target_files.append(target_file)
    bundle.targets_metadata['targets'].serialized_targets_metadata = (
        targets_metadata.SerializeToString())
    return bundle
def add_file_to_bundle_handler(arg) -> None:
    """Add a new file to an existing bundle. Updates the targets metadata
    and errors out if the file already exists.

    The target name is ``arg.new_name`` when that is set; otherwise it is the
    base name of ``arg.file`` with its last extension removed. The bundle at
    ``arg.bundle`` is rewritten in place. An IOError is printed rather than
    raised.
    """
    try:
        file_name = (arg.new_name
                     or os.path.splitext(os.path.basename(arg.file))[0])

        existing_bundle = UpdateBundle.FromString(arg.bundle.read_bytes())
        payload = arg.file.read_bytes()
        updated_bundle = add_file_to_bundle(bundle=existing_bundle,
                                            file_name=file_name,
                                            file_contents=payload)

        arg.bundle.write_bytes(updated_bundle.SerializeToString())
    except IOError as error:
        print(error)
def _new_add_file_to_bundle_parser(subparsers) -> None:
    """Registers the 'add-file-to-bundle' subcommand on ``subparsers``.

    ``--bundle`` and ``--file`` are mandatory Path options shown under a
    'required arguments' help group. ``--new-name`` is optional and is
    therefore registered on the parser itself, so the generated help does not
    list it among the required arguments. The subcommand selects
    ``add_file_to_bundle_handler`` through the ``func`` default.
    """

    def wide_formatter(prog):
        # Help layout fixed at 200 columns with help text starting by col 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    add_file_to_bundle_parser = subparsers.add_parser(
        'add-file-to-bundle',
        description='Add a file to an existing bundle',
        formatter_class=wide_formatter,
        help='',
    )
    add_file_to_bundle_parser.set_defaults(func=add_file_to_bundle_handler)

    # Optional: kept out of the 'required arguments' group below.
    add_file_to_bundle_parser.add_argument(
        '--new-name',
        help='Optional new name for target',
        metavar='NEW_NAME',
        required=False,
        type=str,
    )

    required_arguments = add_file_to_bundle_parser.add_argument_group(
        'required arguments')
    required_arguments.add_argument(
        '--bundle',
        help='Path to an existing bundle',
        metavar='BUNDLE',
        required=True,
        type=Path,
    )
    required_arguments.add_argument(
        '--file',
        help='Path to a target file',
        metavar='FILE_PATH',
        required=True,
        type=Path,
    )
def add_root_metadata_to_bundle_handler(arg) -> None:
    """Handles appending of root metadata to a bundle.

    Parses the bundle at ``arg.bundle``, replaces its ``root_metadata`` field
    with the SignedRootMetadata parsed from ``arg.append_root_metadata``, and
    writes the bundle back to ``arg.bundle``. An IOError is printed rather
    than raised.
    """
    try:
        bundle = UpdateBundle.FromString(arg.bundle.read_bytes())
        new_root = SignedRootMetadata.FromString(
            arg.append_root_metadata.read_bytes())
        bundle.root_metadata.CopyFrom(new_root)
        arg.bundle.write_bytes(bundle.SerializeToString())
    except IOError as error:
        print(error)
def _new_add_root_metadata_to_bundle_parser(subparsers) -> None:
    """Registers the 'add-root-metadata-to-bundle' subcommand.

    ``--append-root-metadata`` and ``--bundle`` are mandatory Path options
    shown under a 'required arguments' help group. The subcommand selects
    ``add_root_metadata_to_bundle_handler`` through the ``func`` default.
    """

    def wide_formatter(prog):
        # Help layout fixed at 200 columns with help text starting by col 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'add-root-metadata-to-bundle',
        description='Add root metadata to a bundle',
        formatter_class=wide_formatter,
        help='',
    )
    parser.set_defaults(func=add_root_metadata_to_bundle_handler)

    required = parser.add_argument_group('required arguments')
    required.add_argument(
        '--append-root-metadata',
        help='Path to root metadata',
        metavar='ROOT_METADATA',
        required=True,
        type=Path,
    )
    required.add_argument(
        '--bundle',
        help='Path to bundle',
        metavar='BUNDLE',
        required=True,
        type=Path,
    )
def create_empty_bundle_handler(arg) -> None:
    """Handles the creation of an empty bundle and writes it to disk.

    Calls ``update_bundle.gen_empty_update_bundle`` with
    ``arg.target_metadata_version`` and writes the serialized result to
    ``arg.pathname``. An IOError is printed rather than raised.
    """
    try:
        version = arg.target_metadata_version
        empty_bundle = update_bundle.gen_empty_update_bundle(version)
        serialized = empty_bundle.SerializeToString()
        arg.pathname.write_bytes(serialized)
    except IOError as error:
        print(error)
def _new_create_empty_bundle_parser(subparsers) -> None:
    """Registers the 'create-empty-bundle' subcommand on ``subparsers``.

    Takes a positional ``pathname`` (Path) for the output file and an
    optional integer ``--target-metadata-version`` that defaults to 1. The
    subcommand selects ``create_empty_bundle_handler`` through the ``func``
    default.
    """

    def wide_formatter(prog):
        # Help layout fixed at 200 columns with help text starting by col 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'create-empty-bundle',
        description='Creation of an empty bundle',
        formatter_class=wide_formatter,
        help='',
    )
    parser.set_defaults(func=create_empty_bundle_handler)

    parser.add_argument('pathname',
                        type=Path,
                        help='Path to newly created empty bundle')
    parser.add_argument(
        '--target-metadata-version',
        help='Version number for targets metadata; Defaults to 1',
        metavar='VERSION',
        type=int,
        default=1,
        required=False,
    )
def inspect_root_metadata_handler(arg) -> None:
    """Prints root metadata contents as defined by "RootMetadata" message
    structure in tuf.proto as well as the number of identified signatures.

    The SignedRootMetadata is read from ``arg.pathname``. An IOError is
    printed rather than raised.
    """
    try:
        signed = SignedRootMetadata.FromString(arg.pathname.read_bytes())
        root = RootMetadata.FromString(signed.serialized_root_metadata)
        print(root)

        signature_count = len(signed.signatures)
        print(f'Number of signatures found: {signature_count}')
    except IOError as error:
        print(error)
def _new_inspect_root_metadata_parser(subparsers) -> None:
    """Registers the 'inspect-root-metadata' subcommand on ``subparsers``.

    The subcommand takes a single positional ``pathname`` (converted to a
    Path) and selects ``inspect_root_metadata_handler`` through the ``func``
    default.
    """

    def wide_formatter(prog):
        # Help layout fixed at 200 columns with help text starting by col 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'inspect-root-metadata',
        description='Outputs contents of root metadata',
        formatter_class=wide_formatter,
        help='',
    )
    parser.set_defaults(func=inspect_root_metadata_handler)
    parser.add_argument('pathname', type=Path, help='Path to root metadata')
def sign_root_metadata_handler(arg) -> None:
    """Handler for signing of root metadata.

    Parses the SignedRootMetadata at ``arg.root_metadata``, signs it with the
    key bytes read from ``arg.root_key`` via ``dev_sign.sign_root_metadata``,
    and writes the serialized result back to ``arg.root_metadata``. An
    IOError is printed rather than raised.
    """
    try:
        current = SignedRootMetadata.FromString(
            arg.root_metadata.read_bytes())
        key_bytes = arg.root_key.read_bytes()
        signed = dev_sign.sign_root_metadata(current, key_bytes)
        # The input file is overwritten in place with the signed metadata.
        arg.root_metadata.write_bytes(signed.SerializeToString())
    except IOError as error:
        print(error)
def _new_sign_root_metadata_parser(subparsers) -> None:
    """Registers the 'sign-root-metadata' subcommand on ``subparsers``.

    ``--root-metadata`` and ``--root-key`` are mandatory Path options shown
    under a 'required arguments' help group. The subcommand selects
    ``sign_root_metadata_handler`` through the ``func`` default.
    """

    def wide_formatter(prog):
        # Help layout fixed at 200 columns with help text starting by col 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'sign-root-metadata',
        description='Signing of root metadata',
        formatter_class=wide_formatter,
        help='',
    )
    parser.set_defaults(func=sign_root_metadata_handler)

    required = parser.add_argument_group('required arguments')
    required.add_argument(
        '--root-metadata',
        help='Root metadata to be signed',
        metavar='ROOT_METADATA',
        required=True,
        type=Path,
    )
    required.add_argument(
        '--root-key',
        help='Root signing key',
        metavar='ROOT_KEY',
        required=True,
        type=Path,
    )
def create_root_metadata_handler(arg) -> None:
    """Handler function for creation of root metadata.

    Forwards ``arg.out``, ``arg.append_root_key``, ``arg.append_targets_key``
    and ``arg.version`` (in that order) to ``root_metadata.main``. An IOError
    is printed rather than raised.
    """
    try:
        call_args = (
            arg.out,
            arg.append_root_key,
            arg.append_targets_key,
            arg.version,
        )
        root_metadata.main(*call_args)
        # TODO(eashansingh): Print message that allows user
        # to visualize root metadata with
        # `pw update inspect-root-metadata` command
    except IOError as error:
        print(error)
def _new_create_root_metadata_parser(subparsers) -> None:
    """Registers the 'create-root-metadata' subcommand on ``subparsers``.

    ``--version`` is an optional integer defaulting to 1. The repeatable
    ``--append-root-key`` and ``--append-targets-key`` options (each
    occurrence is appended to a list of Paths) and ``-o``/``--out`` are
    mandatory and shown under a 'required arguments' help group. The
    subcommand selects ``create_root_metadata_handler`` through the ``func``
    default.
    """

    def wide_formatter(prog):
        # Help layout fixed at 200 columns with help text starting by col 100.
        return argparse.HelpFormatter(prog, max_help_position=100, width=200)

    parser = subparsers.add_parser(
        'create-root-metadata',
        description='Creation of root metadata',
        formatter_class=wide_formatter,
        help='',
    )
    parser.set_defaults(func=create_root_metadata_handler)

    parser.add_argument(
        '--version',
        help='Canonical version number for rollback checks; Defaults to 1',
        type=int,
        default=1,
        required=False,
    )

    required = parser.add_argument_group('required arguments')
    required.add_argument(
        '--append-root-key',
        help='Path to root key',
        metavar='ROOT_KEY',
        required=True,
        action='append',
        type=Path,
    )
    required.add_argument(
        '--append-targets-key',
        help='Path to targets key',
        metavar='TARGETS_KEY',
        required=True,
        action='append',
        type=Path,
    )
    required.add_argument(
        '-o',
        '--out',
        help='Path to output file',
        required=True,
        type=Path,
    )
def generate_key_handler(arg) -> None:
    """Handler function for key generation.

    Calls ``keys.gen_ecdsa_keypair`` with ``arg.pathname`` and then prints
    that path as the private key and the same path with '.pub' appended as
    the public key. An IOError is printed rather than raised.
    """
    try:
        key_path = arg.pathname
        keys.gen_ecdsa_keypair(key_path)
        # NOTE(review): the '.pub' location is presumably where
        # gen_ecdsa_keypair writes the public key -- confirm in keys module.
        print(f'Private key: {key_path}')
        print(f'Public key: {key_path}.pub')
    except IOError as error:
        print(error)
def _new_generate_key_parser(subparsers) -> None:
    """Registers the 'generate-key' subcommand on ``subparsers``.

    The subcommand takes a single positional ``pathname`` (converted to a
    Path) and selects ``generate_key_handler`` through the ``func`` default.
    """
    summary = (
        'Generates an ecdsa-sha2-nistp256 signing key pair (private + public)')
    parser = subparsers.add_parser('generate-key',
                                   description=summary,
                                   help='')
    parser.set_defaults(func=generate_key_handler)
    parser.add_argument('pathname',
                        type=Path,
                        help='Path to generated key pair')
def _parse_args() -> argparse.Namespace:
    """Builds the top-level parser with all subcommands and parses argv.

    When no subcommand is given, the returned namespace's ``func`` prints the
    top-level help; otherwise ``func`` is the handler set by the chosen
    subcommand's parser.
    """
    root = argparse.ArgumentParser(
        description='Software update related operations.',
        epilog='Learn more at: pigweed.dev/pw_software_update',
    )

    def show_help(*_args, **_kwargs):
        # Fallback action used when no subcommand sets its own ``func``.
        return root.print_help()

    root.set_defaults(func=show_help)
    subparsers = root.add_subparsers()

    registrars = (
        # Key generation related parsers
        _new_generate_key_parser,
        # Root metadata related parsers
        _new_create_root_metadata_parser,
        _new_sign_root_metadata_parser,
        _new_inspect_root_metadata_parser,
        # Bundle related parsers
        _new_create_empty_bundle_parser,
        _new_add_root_metadata_to_bundle_parser,
        _new_add_file_to_bundle_parser,
        _new_sign_bundle_parser,
        _new_inspect_bundle_parser,
    )
    for register in registrars:
        register(subparsers)

    return root.parse_args()
def _dispatch_command(args) -> None:
    """Invokes the handler stored in ``args.func``, passing ``args`` to it."""
    handler = args.func
    handler(args)
def main() -> int:
    """Software update command-line interface(WIP).

    Parses the command line, runs the selected handler, and returns 0.
    """
    parsed = _parse_args()
    _dispatch_command(parsed)
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == '__main__':
    sys.exit(main())