pw_software_update: Create unsigned update bundles
Testing: Includes baseline unit test coverage
Change-Id: Idb0dfed54cb52e600a5d1acb0002bf051225a84d
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/60043
Commit-Queue: Joe Ethier <jethier@google.com>
Reviewed-by: Ali Zhang <alizhang@google.com>
diff --git a/pw_env_setup/BUILD.gn b/pw_env_setup/BUILD.gn
index 572e859..0cd60bd 100644
--- a/pw_env_setup/BUILD.gn
+++ b/pw_env_setup/BUILD.gn
@@ -47,6 +47,7 @@
"$dir_pw_rpc/py",
"$dir_pw_snapshot/py:pw_snapshot",
"$dir_pw_snapshot/py:pw_snapshot_metadata",
+ "$dir_pw_software_update/py",
"$dir_pw_status/py",
"$dir_pw_stm32cube_build/py",
"$dir_pw_symbolizer/py",
diff --git a/pw_software_update/BUILD.gn b/pw_software_update/BUILD.gn
index 59bf165..16ba387 100644
--- a/pw_software_update/BUILD.gn
+++ b/pw_software_update/BUILD.gn
@@ -28,6 +28,7 @@
"update_bundle.proto",
]
prefix = "pw_software_update"
+ python_package = "py"
}
pw_source_set("update_bundle") {
diff --git a/pw_software_update/py/BUILD.gn b/pw_software_update/py/BUILD.gn
new file mode 100644
index 0000000..bc76091
--- /dev/null
+++ b/pw_software_update/py/BUILD.gn
@@ -0,0 +1,39 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import("//build_overrides/pigweed.gni")
+
+import("$dir_pw_build/python.gni")
+
+pw_python_package("py") {
+ generate_setup = {
+ metadata = {
+ name = "pw_software_update"
+ version = "0.0.1"
+ }
+ options = {
+ }
+ }
+ sources = [
+ "pw_software_update/__init__.py",
+ "pw_software_update/metadata.py",
+ "pw_software_update/update_bundle.py",
+ ]
+ tests = [
+ "metadata_test.py",
+ "update_bundle_test.py",
+ ]
+ pylintrc = "$dir_pigweed/.pylintrc"
+ proto_library = "..:protos"
+}
diff --git a/pw_software_update/py/metadata_test.py b/pw_software_update/py/metadata_test.py
new file mode 100644
index 0000000..b78d4ff
--- /dev/null
+++ b/pw_software_update/py/metadata_test.py
@@ -0,0 +1,47 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Unit tests for pw_software_update/metadata.py."""
+
+import unittest
+
+from pw_software_update import metadata
+from pw_software_update.tuf_pb2 import HashFunction
+
+
+class GenTargetsMetadataTest(unittest.TestCase):
+ """Test the generation of targets metadata."""
+ def test_multiple_targets(self):
+ """Checks that multiple targets generates multiple TargetFiles."""
+ target_payloads = {
+ 'foo': b'\x1e\xe7',
+ 'bar': b'\x12\x34',
+ }
+ targets_metadata = metadata.gen_targets_metadata(
+ target_payloads, (HashFunction.SHA256, ))
+ self.assertEqual(2, len(targets_metadata.target_files))
+
+
+class GenHashesTest(unittest.TestCase):
+ """Test the generation of hashes."""
+ def test_sha256(self):
+ """Checks that SHA256 hashes are computed and stored properly."""
+ data = b'\x1e\xe7'
+ sha256_hash = metadata.gen_hashes(data, (HashFunction.SHA256, ))[0]
+ self.assertEqual(
+ '9f36ce605a3b28110d2a25ec36bdfff86059086cbd53c9efc1428ef01070515d',
+ sha256_hash.hash.hex())
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pw_software_update/py/pw_software_update/__init__.py b/pw_software_update/py/pw_software_update/__init__.py
new file mode 100644
index 0000000..a643e27
--- /dev/null
+++ b/pw_software_update/py/pw_software_update/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""pw_software_update"""
diff --git a/pw_software_update/py/pw_software_update/metadata.py b/pw_software_update/py/pw_software_update/metadata.py
new file mode 100644
index 0000000..21fdefe
--- /dev/null
+++ b/pw_software_update/py/pw_software_update/metadata.py
@@ -0,0 +1,61 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Facilities for generating TUF target metadata."""
+
+import hashlib
+from typing import Dict, Iterable
+
+from pw_software_update.tuf_pb2 import (CommonMetadata, Hash, HashFunction,
+ TargetFile, TargetsMetadata)
+
+HASH_FACTORIES = {
+ HashFunction.SHA256: hashlib.sha256,
+}
+DEFAULT_HASHES = (HashFunction.SHA256, )
+
+
+def gen_targets_metadata(
+ target_payloads: Dict[str, bytes],
+ hash_funcs: Iterable['HashFunction.V'] = DEFAULT_HASHES
+) -> TargetsMetadata:
+ """Generates TargetsMetadata the given target payloads."""
+ target_files = []
+ for target_file_name, target_payload in target_payloads.items():
+ target_files.append(
+ TargetFile(file_name=target_file_name,
+ length=len(target_payload),
+ hashes=gen_hashes(target_payload, hash_funcs)))
+
+ return TargetsMetadata(common_metadata=gen_commmon_metadata(),
+ target_files=target_files)
+
+
+def gen_hashes(data: bytes,
+ hash_funcs: Iterable['HashFunction.V']) -> Iterable[Hash]:
+ """Computes all the specified hashes over the data."""
+ result = []
+ for func in hash_funcs:
+ if func == HashFunction.UNKNOWN_HASH_FUNCTION:
+ raise ValueError(
+ 'UNKNOWN_HASH_FUNCTION cannot be used to generate hashes.')
+ digest = HASH_FACTORIES[func](data).digest()
+ result.append(Hash(function=func, hash=digest))
+
+ return result
+
+
+def gen_commmon_metadata() -> CommonMetadata:
+ """Generates CommonMetadata."""
+ # TODO(jethier): Figure out where common metadata should actually come from.
+ return CommonMetadata(spec_version="0.0.0", version=0)
diff --git a/pw_software_update/py/pw_software_update/update_bundle.py b/pw_software_update/py/pw_software_update/update_bundle.py
new file mode 100644
index 0000000..ae6887d
--- /dev/null
+++ b/pw_software_update/py/pw_software_update/update_bundle.py
@@ -0,0 +1,137 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Facilities for generating and serializing update bundles."""
+
+import argparse
+import logging
+from pathlib import Path
+from typing import Dict, Iterable, Optional, Tuple
+
+from pw_software_update import metadata
+from pw_software_update.tuf_pb2 import SignedTargetsMetadata
+from pw_software_update.update_bundle_pb2 import UpdateBundle
+
+_LOG = logging.getLogger(__package__)
+
+
+def gen_unsigned_update_bundle(
+ tuf_repo: Path,
+ exclude: Iterable[Path] = tuple(),
+ remap_paths: Optional[Dict[Path, str]] = None) -> UpdateBundle:
+ """Given a set of targets, generates an unsigned UpdateBundle.
+
+ Args:
+ tuf_repo: Path to a directory which will be ingested as a TUF repository.
+ exclude: Iterable of paths in tuf_repo to exclude from the UpdateBundle.
+ remap_paths: Dict mapping paths in tuf_repo to new target file names.
+
+ The input directory will be treated as a TUF repository for the purposes of
+ building an UpdateBundle instance. Each file in the input directory will be
+ read in as a target file, unless its path (relative to the TUF repo root) is
+ among the excludes.
+
+ Default behavior is to treat TUF repo root-relative paths as the strings to
+ use as targets file names, but remapping can be used to change a target file
+ name to any string. If some remappings are provided but a file is found that
+ does not have a remapping, a warning will be logged. If a remap is declared
+ for a file that does not exist, FileNotFoundError will be raised.
+ """
+ if not tuf_repo.is_dir():
+ raise ValueError('TUF repository must be a directory.')
+ target_payloads = {}
+ for path in tuf_repo.glob('*'):
+ rel_path = path.relative_to(tuf_repo)
+ if rel_path in exclude:
+ continue
+ target_file_name = str(rel_path)
+ if remap_paths:
+ if rel_path in remap_paths:
+ target_file_name = remap_paths[rel_path]
+ else:
+ _LOG.warning('Some remaps defined, but not "%s"',
+ target_file_name)
+ target_payloads[target_file_name] = path.read_bytes()
+
+ if remap_paths is not None:
+ for original_path, new_target_file_name in remap_paths.items():
+ if new_target_file_name not in target_payloads:
+ raise FileNotFoundError(
+ f'Unable to remap "{original_path}" to'
+ f' "{new_target_file_name}"; file not found in TUF'
+ ' repository.')
+
+ targets_metadata = metadata.gen_targets_metadata(target_payloads)
+ unsigned_targets_metadata = SignedTargetsMetadata(
+ serialized_targets_metadata=targets_metadata.SerializeToString())
+ return UpdateBundle(
+ targets_metadata=dict(targets=unsigned_targets_metadata),
+ target_payloads=target_payloads)
+
+
+def parse_remap_arg(remap_arg: str) -> Tuple[Path, str]:
+ """Parse the string passed in to the remap argument.
+
+ Remap strings take the following form:
+ "<ORIGINAL FILENAME> > <NEW TARGET PATH>"
+
+ For example:
+ "fw_images/main_image.bin > main"
+ """
+ try:
+ original_path, new_target_file_name = remap_arg.split('>')
+ return Path(original_path.strip()), new_target_file_name.strip()
+ except ValueError as err:
+ raise ValueError('Path remaps must be strings of the form:\n'
+ ' "<ORIGINAL PATH> > <NEW TARGET PATH>"') from err
+
+
+def parse_args():
+ """Parse CLI arguments."""
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument('-t',
+ '--tuf-repo',
+ type=Path,
+ help='Directory to ingest as TUF repository')
+ parser.add_argument('-o',
+ '--out',
+ type=Path,
+ help='Output path for serialized UpdateBundle')
+ parser.add_argument('-e'
+ '--exclude',
+ type=Path,
+ nargs='+',
+ help='Exclude a path from the TUF repository')
+ parser.add_argument('-r',
+ '--remap',
+ type=str,
+ nargs='+',
+ help='Remap a path to a custom target file name')
+ return parser.parse_args()
+
+
+def main(tuf_repo: Path, out: Path, exclude: Iterable[Path],
+ remap: Iterable[str]) -> None:
+ """Generates an UpdateBundle and serializes it to disk."""
+ remap_paths = {}
+ for remap_arg in remap:
+ path, new_target_file_name = parse_remap_arg(remap_arg)
+ remap_paths[path] = new_target_file_name
+
+ bundle = gen_unsigned_update_bundle(tuf_repo, exclude, remap_paths)
+ out.write_bytes(bundle.SerializeToString())
+
+
+if __name__ == '__main__':
+ logging.basicConfig()
+ main(**vars(parse_args()))
diff --git a/pw_software_update/py/update_bundle_test.py b/pw_software_update/py/update_bundle_test.py
new file mode 100644
index 0000000..8954117
--- /dev/null
+++ b/pw_software_update/py/update_bundle_test.py
@@ -0,0 +1,142 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Unit tests for pw_software_update/update_bundle.py."""
+
+from pathlib import Path
+import tempfile
+import unittest
+
+from pw_software_update import update_bundle
+
+
+class GenUnsignedUpdateBundleTest(unittest.TestCase):
+ """Test the generation of unsigned update bundles."""
+ def test_bundle_generation(self):
+ """Tests basic creation of an UpdateBundle from temp dir."""
+ foo_bytes = b'\xf0\x0b\xa4'
+ bar_bytes = b'\x0b\xa4\x99'
+ baz_bytes = b'\xba\x59\x06'
+ qux_bytes = b'\x8a\xf3\x12'
+ with tempfile.TemporaryDirectory() as tempdir_name:
+ temp_root = Path(tempdir_name)
+ (temp_root / 'foo.bin').write_bytes(foo_bytes)
+ (temp_root / 'bar.bin').write_bytes(bar_bytes)
+ (temp_root / 'baz.bin').write_bytes(baz_bytes)
+ (temp_root / 'qux.exe').write_bytes(qux_bytes)
+ bundle = update_bundle.gen_unsigned_update_bundle(temp_root)
+
+ self.assertEqual(foo_bytes, bundle.target_payloads['foo.bin'])
+ self.assertEqual(bar_bytes, bundle.target_payloads['bar.bin'])
+ self.assertEqual(baz_bytes, bundle.target_payloads['baz.bin'])
+ self.assertEqual(qux_bytes, bundle.target_payloads['qux.exe'])
+
+ def test_excludes(self):
+ """Checks that excludes are excluded from update bundles."""
+ foo_bytes = b'\xf0\x0b\xa4'
+ bar_bytes = b'\x0b\xa4\x99'
+ baz_bytes = b'\xba\x59\x06'
+ qux_bytes = b'\x8a\xf3\x12'
+ with tempfile.TemporaryDirectory() as tempdir_name:
+ temp_root = Path(tempdir_name)
+ (temp_root / 'foo.bin').write_bytes(foo_bytes)
+ (temp_root / 'bar.bin').write_bytes(bar_bytes)
+ (temp_root / 'baz.bin').write_bytes(baz_bytes)
+ (temp_root / 'qux.exe').write_bytes(qux_bytes)
+ bundle = update_bundle.gen_unsigned_update_bundle(
+ temp_root, exclude=(Path('foo.bin'), Path('baz.bin')))
+
+ self.assertNotIn('foo.bin', bundle.target_payloads)
+ self.assertEqual(bar_bytes, bundle.target_payloads['bar.bin'])
+ self.assertNotIn(
+ 'baz.bin',
+ bundle.target_payloads,
+ )
+ self.assertEqual(qux_bytes, bundle.target_payloads['qux.exe'])
+
+ def test_excludes_and_remapping(self):
+ """Checks that remapping works, even in combination with excludes."""
+ foo_bytes = b'\x12\xab\x34'
+ bar_bytes = b'\xcd\x56\xef'
+ baz_bytes = b'\xa1\xb2\xc3'
+ qux_bytes = b'\x1f\x2e\x3d'
+ remap_paths = {
+ Path('foo.bin'): 'main',
+ Path('bar.bin'): 'backup',
+ Path('baz.bin'): 'tertiary',
+ }
+ with tempfile.TemporaryDirectory() as tempdir_name:
+ temp_root = Path(tempdir_name)
+ (temp_root / 'foo.bin').write_bytes(foo_bytes)
+ (temp_root / 'bar.bin').write_bytes(bar_bytes)
+ (temp_root / 'baz.bin').write_bytes(baz_bytes)
+ (temp_root / 'qux.exe').write_bytes(qux_bytes)
+ bundle = update_bundle.gen_unsigned_update_bundle(
+ temp_root,
+ exclude=(Path('qux.exe'), ),
+ remap_paths=remap_paths)
+
+ self.assertEqual(foo_bytes, bundle.target_payloads['main'])
+ self.assertEqual(bar_bytes, bundle.target_payloads['backup'])
+ self.assertEqual(baz_bytes, bundle.target_payloads['tertiary'])
+ self.assertNotIn('qux.exe', bundle.target_payloads)
+
+ def test_incomplete_remapping_logs(self):
+ """Checks that incomplete remappings log warnings."""
+ foo_bytes = b'\x12\xab\x34'
+ bar_bytes = b'\xcd\x56\xef'
+ remap_paths = {Path('foo.bin'): 'main'}
+ with tempfile.TemporaryDirectory() as tempdir_name:
+ temp_root = Path(tempdir_name)
+ (temp_root / 'foo.bin').write_bytes(foo_bytes)
+ (temp_root / 'bar.bin').write_bytes(bar_bytes)
+ with self.assertLogs(level='WARNING') as log:
+ update_bundle.gen_unsigned_update_bundle(
+ temp_root,
+ exclude=(Path('qux.exe'), ),
+ remap_paths=remap_paths)
+ self.assertIn('Some remaps defined, but not "bar.bin"',
+ log.output[0])
+
+ def test_remap_of_missing_file(self):
+ """Checks that remapping a missing file raises an error."""
+ foo_bytes = b'\x12\xab\x34'
+ remap_paths = {
+ Path('foo.bin'): 'main',
+ Path('bar.bin'): 'backup',
+ }
+ with tempfile.TemporaryDirectory() as tempdir_name:
+ temp_root = Path(tempdir_name)
+ (temp_root / 'foo.bin').write_bytes(foo_bytes)
+ with self.assertRaises(FileNotFoundError):
+ update_bundle.gen_unsigned_update_bundle(
+ temp_root, remap_paths=remap_paths)
+
+
+class ParseRemapArgTest(unittest.TestCase):
+ """Test the parsing of remap argument strings."""
+ def test_valid_arg(self):
+ """Checks that valid remap strings are parsed correctly."""
+ original_path, new_target_file_name = update_bundle.parse_remap_arg(
+ 'foo.bin > main')
+ self.assertEqual(Path('foo.bin'), original_path)
+ self.assertEqual('main', new_target_file_name)
+
+ def test_invalid_arg_raises(self):
+ """Checks that invalid remap string raise an error."""
+ with self.assertRaises(ValueError):
+ update_bundle.parse_remap_arg('foo.bin main')
+
+
+if __name__ == '__main__':
+ unittest.main()