pw_software_update: Create unsigned update bundles

Testing: Includes baseline unit test coverage
Change-Id: Idb0dfed54cb52e600a5d1acb0002bf051225a84d
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/60043
Commit-Queue: Joe Ethier <jethier@google.com>
Reviewed-by: Ali Zhang <alizhang@google.com>
diff --git a/pw_env_setup/BUILD.gn b/pw_env_setup/BUILD.gn
index 572e859..0cd60bd 100644
--- a/pw_env_setup/BUILD.gn
+++ b/pw_env_setup/BUILD.gn
@@ -47,6 +47,7 @@
   "$dir_pw_rpc/py",
   "$dir_pw_snapshot/py:pw_snapshot",
   "$dir_pw_snapshot/py:pw_snapshot_metadata",
+  "$dir_pw_software_update/py",
   "$dir_pw_status/py",
   "$dir_pw_stm32cube_build/py",
   "$dir_pw_symbolizer/py",
diff --git a/pw_software_update/BUILD.gn b/pw_software_update/BUILD.gn
index 59bf165..16ba387 100644
--- a/pw_software_update/BUILD.gn
+++ b/pw_software_update/BUILD.gn
@@ -28,6 +28,7 @@
     "update_bundle.proto",
   ]
   prefix = "pw_software_update"
+  python_package = "py"
 }
 
 pw_source_set("update_bundle") {
diff --git a/pw_software_update/py/BUILD.gn b/pw_software_update/py/BUILD.gn
new file mode 100644
index 0000000..bc76091
--- /dev/null
+++ b/pw_software_update/py/BUILD.gn
@@ -0,0 +1,39 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import("//build_overrides/pigweed.gni")
+
+import("$dir_pw_build/python.gni")
+
+pw_python_package("py") {
+  generate_setup = {
+    metadata = {
+      name = "pw_software_update"
+      version = "0.0.1"
+    }
+    options = {
+    }
+  }
+  sources = [
+    "pw_software_update/__init__.py",
+    "pw_software_update/metadata.py",
+    "pw_software_update/update_bundle.py",
+  ]
+  tests = [
+    "metadata_test.py",
+    "update_bundle_test.py",
+  ]
+  pylintrc = "$dir_pigweed/.pylintrc"
+  proto_library = "..:protos"
+}
diff --git a/pw_software_update/py/metadata_test.py b/pw_software_update/py/metadata_test.py
new file mode 100644
index 0000000..b78d4ff
--- /dev/null
+++ b/pw_software_update/py/metadata_test.py
@@ -0,0 +1,47 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Unit tests for pw_software_update/metadata.py."""
+
+import unittest
+
+from pw_software_update import metadata
+from pw_software_update.tuf_pb2 import HashFunction
+
+
+class GenTargetsMetadataTest(unittest.TestCase):
+    """Tests metadata.gen_targets_metadata."""
+    def test_multiple_targets(self):
+        """Checks that two target payloads yield two TargetFiles."""
+        target_payloads = {
+            'foo': b'\x1e\xe7',
+            'bar': b'\x12\x34',
+        }
+        targets_metadata = metadata.gen_targets_metadata(
+            target_payloads, (HashFunction.SHA256, ))
+        self.assertEqual(2, len(targets_metadata.target_files))
+
+
+class GenHashesTest(unittest.TestCase):
+    """Tests metadata.gen_hashes."""
+    def test_sha256(self):
+        """Checks the SHA256 hash of a known payload against its hex digest."""
+        data = b'\x1e\xe7'
+        sha256_hash = metadata.gen_hashes(data, (HashFunction.SHA256, ))[0]
+        self.assertEqual(
+            '9f36ce605a3b28110d2a25ec36bdfff86059086cbd53c9efc1428ef01070515d',
+            sha256_hash.hash.hex())
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/pw_software_update/py/pw_software_update/__init__.py b/pw_software_update/py/pw_software_update/__init__.py
new file mode 100644
index 0000000..a643e27
--- /dev/null
+++ b/pw_software_update/py/pw_software_update/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""pw_software_update"""
diff --git a/pw_software_update/py/pw_software_update/metadata.py b/pw_software_update/py/pw_software_update/metadata.py
new file mode 100644
index 0000000..21fdefe
--- /dev/null
+++ b/pw_software_update/py/pw_software_update/metadata.py
@@ -0,0 +1,61 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Facilities for generating TUF target metadata."""
+
+import hashlib
+from typing import Dict, Iterable
+
+from pw_software_update.tuf_pb2 import (CommonMetadata, Hash, HashFunction,
+                                        TargetFile, TargetsMetadata)
+
+HASH_FACTORIES = {  # Maps each supported HashFunction to its hashlib factory.
+    HashFunction.SHA256: hashlib.sha256,
+}
+DEFAULT_HASHES = (HashFunction.SHA256, )  # gen_targets_metadata's default.
+
+
+def gen_targets_metadata(
+    target_payloads: Dict[str, bytes],
+    hash_funcs: Iterable['HashFunction.V'] = DEFAULT_HASHES
+) -> TargetsMetadata:
+    """Generates TargetsMetadata with one TargetFile per target payload."""
+    hash_funcs = tuple(hash_funcs)  # A one-shot iterator must serve all targets.
+    target_files = [
+        TargetFile(file_name=name,
+                   length=len(payload),
+                   hashes=gen_hashes(payload, hash_funcs))
+        for name, payload in target_payloads.items()
+    ]
+    return TargetsMetadata(common_metadata=gen_commmon_metadata(),
+                           target_files=target_files)
+
+
+def gen_hashes(data: bytes,
+               hash_funcs: Iterable['HashFunction.V']) -> Iterable[Hash]:
+    """Returns one Hash per requested hash function, computed over data."""
+    hashes = []
+    for hash_func in hash_funcs:
+        if hash_func == HashFunction.UNKNOWN_HASH_FUNCTION:
+            raise ValueError(
+                'UNKNOWN_HASH_FUNCTION cannot be used to generate hashes.')
+        hasher = HASH_FACTORIES[hash_func](data)
+        hashes.append(Hash(function=hash_func, hash=hasher.digest()))
+
+    return hashes
+
+
+def gen_commmon_metadata() -> CommonMetadata:
+    """Generates CommonMetadata with a hard-coded spec_version and version."""
+    # TODO(jethier): Figure out where common metadata should actually come from.
+    return CommonMetadata(spec_version="0.0.0", version=0)
diff --git a/pw_software_update/py/pw_software_update/update_bundle.py b/pw_software_update/py/pw_software_update/update_bundle.py
new file mode 100644
index 0000000..ae6887d
--- /dev/null
+++ b/pw_software_update/py/pw_software_update/update_bundle.py
@@ -0,0 +1,137 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Facilities for generating and serializing update bundles."""
+
+import argparse
+import logging
+from pathlib import Path
+from typing import Dict, Iterable, Optional, Tuple
+
+from pw_software_update import metadata
+from pw_software_update.tuf_pb2 import SignedTargetsMetadata
+from pw_software_update.update_bundle_pb2 import UpdateBundle
+
+_LOG = logging.getLogger(__package__)
+
+
+def gen_unsigned_update_bundle(
+        tuf_repo: Path,
+        exclude: Iterable[Path] = tuple(),
+        remap_paths: Optional[Dict[Path, str]] = None) -> UpdateBundle:
+    """Given a set of targets, generates an unsigned UpdateBundle.
+
+    Args:
+      tuf_repo: Path to a directory which will be ingested as a TUF repository.
+      exclude: Iterable of paths in tuf_repo to exclude from the UpdateBundle.
+      remap_paths: Dict mapping paths in tuf_repo to new target file names.
+
+    Raises:
+      ValueError: tuf_repo is not a directory.
+      FileNotFoundError: A remap names a path that was not ingested.
+
+    Each regular file directly under tuf_repo is read in as a target file,
+    unless its root-relative path is excluded. That path is the target file
+    name unless remapped; if remaps are given, files lacking one log a warning.
+    """
+    if not tuf_repo.is_dir():
+        raise ValueError('TUF repository must be a directory.')
+    excluded = frozenset(exclude)  # Also keeps one-shot iterables reusable.
+    remap_paths = remap_paths or {}
+    target_payloads = {}
+    ingested = set()
+    for path in sorted(tuf_repo.glob('*')):  # Sorted for reproducible output.
+        rel_path = path.relative_to(tuf_repo)
+        if rel_path in excluded:
+            continue
+        if not path.is_file():
+            _LOG.warning('Skipping "%s": not a regular file', rel_path)
+            continue
+        ingested.add(rel_path)
+        target_file_name = remap_paths.get(rel_path, str(rel_path))
+        if remap_paths and rel_path not in remap_paths:
+            _LOG.warning('Some remaps defined, but not "%s"',
+                         target_file_name)
+        target_payloads[target_file_name] = path.read_bytes()
+
+    for original_path, new_target_file_name in remap_paths.items():
+        if original_path not in ingested:  # Target names alone can collide.
+            raise FileNotFoundError(
+                f'Unable to remap "{original_path}" to'
+                f' "{new_target_file_name}"; file not found in TUF'
+                ' repository.')
+
+    targets_metadata = metadata.gen_targets_metadata(target_payloads)
+    unsigned_targets_metadata = SignedTargetsMetadata(
+        serialized_targets_metadata=targets_metadata.SerializeToString())
+    return UpdateBundle(
+        targets_metadata=dict(targets=unsigned_targets_metadata),
+        target_payloads=target_payloads)
+
+
+def parse_remap_arg(remap_arg: str) -> Tuple[Path, str]:
+    """Splits a remap argument into its original path and new target name.
+
+    Remap strings take the following form:
+      "<ORIGINAL PATH> > <NEW TARGET PATH>"
+
+    For example:
+      "fw_images/main_image.bin > main"
+    """
+    try:
+        source, target = (part.strip() for part in remap_arg.split('>'))
+    except ValueError as err:
+        raise ValueError('Path remaps must be strings of the form:\n'
+                         '  "<ORIGINAL PATH> > <NEW TARGET PATH>"') from err
+    return Path(source), target
+
+
+def parse_args() -> argparse.Namespace:
+    """Parse CLI arguments into a namespace whose fields match main()."""
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument('-t',
+                        '--tuf-repo',
+                        type=Path,
+                        help='Directory to ingest as TUF repository')
+    parser.add_argument('-o',
+                        '--out',
+                        type=Path,
+                        help='Output path for serialized UpdateBundle')
+    parser.add_argument('-e',
+                        '--exclude',
+                        type=Path,
+                        nargs='+',
+                        help='Exclude a path from the TUF repository')
+    parser.add_argument('-r',
+                        '--remap',
+                        type=str,
+                        nargs='+',
+                        help='Remap a path to a custom target file name')
+    return parser.parse_args()
+
+
+def main(tuf_repo: Path, out: Path, exclude: Optional[Iterable[Path]],
+         remap: Optional[Iterable[str]]) -> None:
+    """Generates an UpdateBundle and serializes it to disk.
+
+    exclude and remap may be None, which is what argparse yields when the
+    corresponding option is omitted; both are then treated as empty.
+    """
+    remap_paths = dict(parse_remap_arg(arg) for arg in remap or ())
+    bundle = gen_unsigned_update_bundle(tuf_repo, exclude or (), remap_paths)
+    out.write_bytes(bundle.SerializeToString())
+
+
+if __name__ == '__main__':
+    logging.basicConfig()
+    main(**vars(parse_args()))
diff --git a/pw_software_update/py/update_bundle_test.py b/pw_software_update/py/update_bundle_test.py
new file mode 100644
index 0000000..8954117
--- /dev/null
+++ b/pw_software_update/py/update_bundle_test.py
@@ -0,0 +1,142 @@
+# Copyright 2021 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Unit tests for pw_software_update/update_bundle.py."""
+
+from pathlib import Path
+import tempfile
+import unittest
+
+from pw_software_update import update_bundle
+
+
+class GenUnsignedUpdateBundleTest(unittest.TestCase):
+    """Tests gen_unsigned_update_bundle against temporary directories."""
+    def test_bundle_generation(self):
+        """Checks that every file in the temp dir becomes a target payload."""
+        foo_bytes = b'\xf0\x0b\xa4'
+        bar_bytes = b'\x0b\xa4\x99'
+        baz_bytes = b'\xba\x59\x06'
+        qux_bytes = b'\x8a\xf3\x12'
+        with tempfile.TemporaryDirectory() as tempdir_name:
+            temp_root = Path(tempdir_name)
+            (temp_root / 'foo.bin').write_bytes(foo_bytes)
+            (temp_root / 'bar.bin').write_bytes(bar_bytes)
+            (temp_root / 'baz.bin').write_bytes(baz_bytes)
+            (temp_root / 'qux.exe').write_bytes(qux_bytes)
+            bundle = update_bundle.gen_unsigned_update_bundle(temp_root)
+
+        self.assertEqual(foo_bytes, bundle.target_payloads['foo.bin'])
+        self.assertEqual(bar_bytes, bundle.target_payloads['bar.bin'])
+        self.assertEqual(baz_bytes, bundle.target_payloads['baz.bin'])
+        self.assertEqual(qux_bytes, bundle.target_payloads['qux.exe'])
+
+    def test_excludes(self):
+        """Checks that excluded paths are left out of the update bundle."""
+        foo_bytes = b'\xf0\x0b\xa4'
+        bar_bytes = b'\x0b\xa4\x99'
+        baz_bytes = b'\xba\x59\x06'
+        qux_bytes = b'\x8a\xf3\x12'
+        with tempfile.TemporaryDirectory() as tempdir_name:
+            temp_root = Path(tempdir_name)
+            (temp_root / 'foo.bin').write_bytes(foo_bytes)
+            (temp_root / 'bar.bin').write_bytes(bar_bytes)
+            (temp_root / 'baz.bin').write_bytes(baz_bytes)
+            (temp_root / 'qux.exe').write_bytes(qux_bytes)
+            bundle = update_bundle.gen_unsigned_update_bundle(
+                temp_root, exclude=(Path('foo.bin'), Path('baz.bin')))
+
+        self.assertNotIn('foo.bin', bundle.target_payloads)
+        self.assertEqual(bar_bytes, bundle.target_payloads['bar.bin'])
+        self.assertNotIn(
+            'baz.bin',
+            bundle.target_payloads,
+        )
+        self.assertEqual(qux_bytes, bundle.target_payloads['qux.exe'])
+
+    def test_excludes_and_remapping(self):
+        """Checks that remapping works, even in combination with excludes."""
+        foo_bytes = b'\x12\xab\x34'
+        bar_bytes = b'\xcd\x56\xef'
+        baz_bytes = b'\xa1\xb2\xc3'
+        qux_bytes = b'\x1f\x2e\x3d'
+        remap_paths = {
+            Path('foo.bin'): 'main',
+            Path('bar.bin'): 'backup',
+            Path('baz.bin'): 'tertiary',
+        }
+        with tempfile.TemporaryDirectory() as tempdir_name:
+            temp_root = Path(tempdir_name)
+            (temp_root / 'foo.bin').write_bytes(foo_bytes)
+            (temp_root / 'bar.bin').write_bytes(bar_bytes)
+            (temp_root / 'baz.bin').write_bytes(baz_bytes)
+            (temp_root / 'qux.exe').write_bytes(qux_bytes)
+            bundle = update_bundle.gen_unsigned_update_bundle(
+                temp_root,
+                exclude=(Path('qux.exe'), ),
+                remap_paths=remap_paths)
+
+        self.assertEqual(foo_bytes, bundle.target_payloads['main'])
+        self.assertEqual(bar_bytes, bundle.target_payloads['backup'])
+        self.assertEqual(baz_bytes, bundle.target_payloads['tertiary'])
+        self.assertNotIn('qux.exe', bundle.target_payloads)
+
+    def test_incomplete_remapping_logs(self):
+        """Checks that a file without a remap logs a warning."""
+        foo_bytes = b'\x12\xab\x34'
+        bar_bytes = b'\xcd\x56\xef'
+        remap_paths = {Path('foo.bin'): 'main'}
+        with tempfile.TemporaryDirectory() as tempdir_name:
+            temp_root = Path(tempdir_name)
+            (temp_root / 'foo.bin').write_bytes(foo_bytes)
+            (temp_root / 'bar.bin').write_bytes(bar_bytes)
+            with self.assertLogs(level='WARNING') as log:
+                update_bundle.gen_unsigned_update_bundle(
+                    temp_root,
+                    exclude=(Path('qux.exe'), ),  # No such file here.
+                    remap_paths=remap_paths)
+                self.assertIn('Some remaps defined, but not "bar.bin"',
+                              log.output[0])
+
+    def test_remap_of_missing_file(self):
+        """Checks that remapping a missing file raises FileNotFoundError."""
+        foo_bytes = b'\x12\xab\x34'
+        remap_paths = {
+            Path('foo.bin'): 'main',
+            Path('bar.bin'): 'backup',
+        }
+        with tempfile.TemporaryDirectory() as tempdir_name:
+            temp_root = Path(tempdir_name)
+            (temp_root / 'foo.bin').write_bytes(foo_bytes)
+            with self.assertRaises(FileNotFoundError):
+                update_bundle.gen_unsigned_update_bundle(
+                    temp_root, remap_paths=remap_paths)
+
+
+class ParseRemapArgTest(unittest.TestCase):
+    """Tests update_bundle.parse_remap_arg."""
+    def test_valid_arg(self):
+        """Checks that a valid remap string is split into path and name."""
+        original_path, new_target_file_name = update_bundle.parse_remap_arg(
+            'foo.bin > main')
+        self.assertEqual(Path('foo.bin'), original_path)
+        self.assertEqual('main', new_target_file_name)
+
+    def test_invalid_arg_raises(self):
+        """Checks that an invalid remap string raises ValueError."""
+        with self.assertRaises(ValueError):
+            update_bundle.parse_remap_arg('foo.bin main')
+
+
+if __name__ == '__main__':
+    unittest.main()