| # Copyright 2021 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Upload update bundles to a GCS bucket for signing. |
| |
| This module does not implement any actual signing logic; that is left up to the |
| remote signing service. This module takes care of uploading bundles to GCS for |
| signing, waiting the the signed version to appear, and downloading the signed |
| bundle from the output bucket. It can be used either as an entry point by |
| invoking it as a runnable module and providing all the necessary arguments (run |
| the tool with --help for details), or as a library by instantiating |
| RemoteSignClient and calling its sign() method. |
| |
| The expected API for the remote signing service consists of the following: |
| |
| - A pair of GCS buckets. One bucket to serve as a queue of update bundles |
| to be signed, and the other bucket to serve as the output area where |
| signed bundles are deposited. |
| |
| - Three artifacts should be placed into the input queue bucket: |
| 1. The update bundle to be signed |
| 2. A signing request file whose name ends with "signing_request.json" |
| 3. A public builder key named "<signing_request_name>.public_key.pem" |
| |
| Builder keys are used to generate intermediate signatures for the signing |
| request. Specifically, a private builder key is used to generate an |
| intermediate signature for both the update bundle to be signed, and the signing |
| request file. These signatures are then added to the GCS blob metadata for |
| their respective blobs. The corresponding public builder key is uploaded |
| alongside the signing request. |
| |
| The signing service should be set up to trigger whenever a new signing request |
| file is added anywhere inside the input queue bucket. The signing request file |
| is a JSON file with the following fields: |
| |
| remote_signing_key_name: A string that should correspond to the name of a |
| signing key known to the remote signing service. |
| |
| bundle_path: The path (relative to GCS input bucket root) to an update |
| bundle to be signed by the remote signing service. |
| |
| bundle_public_key_path: The path (relative to GCS input bucket root) to a |
| public builder key .pem file corresponding to the private builder key |
| that was used to sign the update bundle and signing request file. |
| |
| output_bucket: Name of the output GCS bucket into which the remote signing |
| service should place signed artifacts. |
| |
| output_path: The path (relative to the GCS output bucket root) at which the |
| signed update bundle should be deposited by the remote signing service. |
| |
| On the remote side, the signing service is expected to check the public builder |
| key against its list of allowed builder keys. Provided the key is found in the |
| allow list, the signing service should use it to verify the intermediate |
| signatures of both the update bundle to be signed and the signing request file. |
| If the key is not found in the allow list, or if the signature on the update |
| bundle or signing request file do not match the public builder key, the signing |
| service should reject the signing request. |
| |
| If the builder public key is found in the allow list and the intermediate |
| signatures are verified, the signing service should produce a signed version of |
| the update bundle, and place it the GCS output bucket at the specified path. |
| |
| In order to authenticate to GCS, the "Application Default Credentials" will be |
| used. This assumes remote_sign.py is running in an environment that provides |
| such credentials. See the Google cloud platform documentation for details: |
| |
| https://cloud.google.com/docs/authentication/production |
| |
| For development purposes, it is possible to provide an alternate method of |
| authenticating to GCS. The alternate authentication method should be a Python |
| module that is importable in the running Python environment. The module should |
| define a 'get_credentials()' function that takes no arguments and returns an |
| instance of google.auth.credentials.Credentials. |
| """ |
| import argparse |
| import base64 |
| import importlib |
| import json |
| from pathlib import Path |
| import time |
| from typing import Dict, Union |
| |
| from cryptography.hazmat import backends |
| from cryptography.hazmat.primitives import hashes, serialization |
| from cryptography.hazmat.primitives.asymmetric import ed25519, padding, rsa |
| from google.auth.credentials import Credentials # type: ignore |
| from google.cloud import storage # type: ignore |
| from google.cloud.storage.bucket import Bucket # type: ignore |
| |
| DEFAULT_TIMEOUT_S = 600 |
| |
| PathOrBytes = Union[Path, bytes] |
| |
| |
| def _parse_args(): |
| """Parse CLI aguments.""" |
| parser = argparse.ArgumentParser(description=__doc__) |
| parser.add_argument('--project', |
| help='GCP project that owns storage buckets.') |
| parser.add_argument('--input-bucket', |
| help='GCS bucket used as a signing queue') |
| parser.add_argument('--output-bucket', |
| help='GCS bucket to watch for signed bundles') |
| parser.add_argument('--bundle', |
| type=Path, |
| help='Update bundle to upload for signing') |
| parser.add_argument('--out', |
| type=Path, |
| help='Path to which to download signed bundle') |
| parser.add_argument( |
| '--signing-key-name', |
| help='Name of signing key remote signing service should use') |
| parser.add_argument( |
| '--builder-key', |
| type=Path, |
| help='Path to builder private key for intermediate signatures') |
| parser.add_argument('--builder-public-key', |
| type=Path, |
| help='Path to builder public key') |
| parser.add_argument( |
| '--bundle-blob-name', |
| default=None, |
| help='Path in the input bucket at which to upload bundle') |
| parser.add_argument( |
| '--request-blob-name', |
| default=None, |
| help='Path in the input bucket at which to put signing request') |
| parser.add_argument('--signed-bundle-blob-name', |
| default=None, |
| help='Path in the output bucket for the signed bundle') |
| parser.add_argument( |
| '--dev-gcs-auth-module-override', |
| default=None, |
| help='Developer use only; custom auth module to use with GCS.') |
| parser.add_argument( |
| '--timeout', |
| type=int, |
| default=DEFAULT_TIMEOUT_S, |
| help='Seconds to wait for signed bundle to appeaer before giving up.') |
| |
| return parser.parse_args() |
| |
| |
| class BlobExistsError(Exception): |
| """Raised if the blob to be uploaded already exists in the input bucket.""" |
| |
| |
| class RemoteSignClient(): |
| """GCS client for use in remote signing.""" |
| def __init__(self, input_bucket: Bucket, output_bucket: Bucket): |
| # "Application Default Credentials" are used implicitly when None is |
| # passed to Client() as credentials. See the cloud docs for details: |
| # https://cloud.google.com/docs/authentication/production |
| self._input_bucket = input_bucket |
| self._output_bucket = output_bucket |
| |
| @classmethod |
| def from_names(cls, |
| project_name: str, |
| input_bucket_name: str, |
| output_bucket_name: str, |
| gcs_credentials: Credentials = None): |
| storage_client = storage.Client(project=project_name, |
| credentials=gcs_credentials) |
| return cls(input_bucket=storage_client.bucket(input_bucket_name), |
| output_bucket=storage_client.bucket(output_bucket_name)) |
| |
| def sign(self, |
| bundle: Path, |
| signing_key_name: str, |
| builder_key: Path, |
| builder_public_key: Path, |
| bundle_blob_name: str = None, |
| request_blob_name: str = None, |
| signed_bundle_blob_name: str = None, |
| request_overrides: Dict = None, |
| timeout_s: int = DEFAULT_TIMEOUT_S) -> bytes: |
| """Upload file to GCS and download signed counterpart when ready. |
| |
| Args: |
| bundle: Path object for an UpdateBundle to upload for signing. |
| signing_key_name: Name of remote signing key to use for signing. |
| builder_key: Path to builder private key for intermediate signature. |
| builder_public_key: Path to corresponding builder public key. |
| bundle_blob_name: GCS path at which to upload bundle to sign. |
| request_blob_name: GCS path at which to upload request file. |
| signed_bundle_blob_name: GCS path in output bucket to request. |
| request_overrides: Dict of signing request JSON keys and values to |
| add to the signing requests. If this dict contains any keys whose |
| values are already in the signing request, the existing values |
| will be overwritten by the ones passed in here. |
| timeout_s: Maximum seconds to wait for output before failing. |
| """ |
| if bundle_blob_name is None: |
| bundle_blob_name = bundle.name |
| |
| if request_blob_name is None: |
| request_blob_name = f't{time.time()}_signing_request.json' |
| |
| if not request_blob_name.endswith('signing_request.json'): |
| raise ValueError(f'Signing request blob name {request_blob_name}' |
| ' does not end with "signing_request.json".') |
| |
| request_name = request_blob_name[:-5] # strip the ".json" |
| builder_public_key_blob_name = f'{request_name}.publickey.pem' |
| |
| if signed_bundle_blob_name is None: |
| signed_bundle_blob_name = f'{bundle.name}.signed' |
| |
| signing_request = { |
| 'remote_signing_key_names': [signing_key_name], |
| 'bundle_path': bundle_blob_name, |
| 'bundle_public_key_path': builder_public_key_blob_name, |
| 'output_bucket': self._output_bucket.name, |
| 'output_path': signed_bundle_blob_name, |
| } |
| |
| if request_overrides is not None: |
| signing_request.update(request_overrides) |
| |
| builder_public_key_blob = self._input_bucket.blob( |
| builder_public_key_blob_name) |
| bundle_blob = self._input_bucket.blob(bundle_blob_name) |
| request_blob = self._input_bucket.blob(request_blob_name) |
| |
| for blob in (builder_public_key_blob, bundle_blob, request_blob): |
| if blob.exists(): |
| raise BlobExistsError( |
| f'A blob named "{blob}" already exists in the input bucket.' |
| ' A unique blob name is required for uploading.') |
| |
| builder_public_key_blob.upload_from_filename(str(builder_public_key)) |
| |
| bundle_blob.metadata = { |
| 'signature': |
| self._get_builder_signature(bundle, builder_key).decode('ascii') |
| } |
| bundle_blob.upload_from_filename(str(bundle)) |
| |
| encoded_json = bytes(json.dumps(signing_request), 'utf-8') |
| request_blob.metadata = { |
| 'signature': |
| self._get_builder_signature(encoded_json, |
| builder_key).decode('ascii') |
| } |
| |
| # Despite its name, the upload_from_string() method can take either a |
| # str or bytes object; here we already pre-encoded the string in utf-8. |
| request_blob.upload_from_string(encoded_json) |
| |
| return self._wait_for_blob(signed_bundle_blob_name, |
| timeout_s=timeout_s) |
| |
| def _wait_for_blob(self, |
| blob_name, |
| interval: int = 1, |
| max_tries: int = None, |
| timeout_s: int = DEFAULT_TIMEOUT_S) -> storage.Blob: |
| """Wait for a specific blob to appear in the output bucket. |
| |
| Args: |
| blob_name: Name of the blob to wait for. |
| interval: Time (seconds) to wait between checks for blob's existence. |
| max_tries: Number of times to check for the blob before failing. |
| timeout_s: Maximum seconds to keep watching before failing. |
| """ |
| blob = self._output_bucket.blob(blob_name) |
| end_time = time.time() + timeout_s |
| tries = 0 |
| while (max_tries is None or tries < max_tries): |
| if time.time() > end_time: |
| raise FileNotFoundError( |
| 'Timed out while waiting for signed blob.') |
| if blob.exists(): |
| return blob |
| tries += 1 |
| time.sleep(interval) |
| |
| raise FileNotFoundError( |
| 'Too many retries while waiting for signed blob.') |
| |
| @staticmethod |
| def _get_builder_signature(data: PathOrBytes, key: Path) -> bytes: |
| """Generate a base64-encided builder signature for file. |
| |
| In order for the remote signing system to have some level of trust in |
| the artifacts it's signing, an intermediate signature is used to verify |
| that the artifacts came from an approved builder. |
| """ |
| if isinstance(data, Path): |
| data = data.read_bytes() |
| |
| private_key = serialization.load_pem_private_key( |
| key.read_bytes(), None, backends.default_backend()) |
| |
| if isinstance(private_key, ed25519.Ed25519PrivateKey): |
| signature = private_key.sign(data) |
| |
| elif isinstance(private_key, rsa.RSAPrivateKey): |
| signature = private_key.sign( |
| data, # type: ignore |
| padding=padding.PSS(mgf=padding.MGF1(hashes.SHA256()), |
| salt_length=padding.PSS.MAX_LENGTH), |
| algorithm=hashes.SHA256()) |
| |
| else: |
| raise TypeError(f'Key {private_key} has unsupported type' |
| f' ({type(private_key)}). Valid types are' |
| ' Ed25519PrivateKey and RSAPrivateKey.') |
| |
| return base64.b64encode(signature) |
| |
| |
| def _credentials_from_module(module_name: str) -> Credentials: |
| """Return GCS Credential from the named auth module. |
| |
| The module name should correspond to a module that's importable in the |
| running Python environment. It must define a get_credentials() function |
| that takes no args and returns a Credentials instance. |
| """ |
| auth_module = importlib.import_module(module_name) |
| return auth_module.get_credentials() # type: ignore |
| |
| |
| def main( # pylint: disable=too-many-arguments |
| project: str, input_bucket: str, output_bucket: str, bundle: Path, |
| out: Path, signing_key_name: str, builder_key: Path, |
| builder_public_key: Path, bundle_blob_name: str, |
| request_blob_name: str, signed_bundle_blob_name: str, |
| dev_gcs_auth_module_override: str, timeout: int) -> None: |
| """Send bundle for remote signing and write signed counterpart to disk. |
| |
| Args: |
| project: Project name for GCS project containing signing bucket pair. |
| input_bucket: Name of GCS bucket to deposit to-be-signed artifacts in. |
| output_bucket: Name of GCS bucket to watch for signed artifacts. |
| bundle: Update bundle to be signed. |
| out: Path to which to download signed version of bundle. |
| signing_key_name: Name of key the remote signing service should use. |
| bundle_blob_name: Path in input bucket to upload bundle to. |
| request_blob_name: Path in input bucket to upload signing request to. |
| signed_bundle_blob_name: Output bucket path for signed bundle. |
| dev_gcs_auth_module_override: For developer use only; optional module |
| to use to generate GCS client credentials. Must be importable in |
| the running Python environment, and must define a get_credentials() |
| function that takes no args and returns a Credentials instance. |
| timeout: Seconds to wait for signed bundle before giving up. |
| """ |
| credentials = None |
| if dev_gcs_auth_module_override is not None: |
| credentials = _credentials_from_module(dev_gcs_auth_module_override) |
| |
| remote_sign_client = RemoteSignClient.from_names( |
| project_name=project, |
| input_bucket_name=input_bucket, |
| output_bucket_name=output_bucket, |
| gcs_credentials=credentials) |
| |
| signed_bundle = remote_sign_client.sign(bundle, |
| signing_key_name, |
| builder_key, |
| builder_public_key, |
| bundle_blob_name, |
| request_blob_name, |
| signed_bundle_blob_name, |
| timeout_s=timeout) |
| |
| out.write_bytes(signed_bundle.download_as_bytes()) |
| |
| |
| if __name__ == '__main__': |
| main(**vars(_parse_args())) |