Implement download targets

This adds code to build a downloadable target's URL, download its
content, and write it to a local file.

Change-Id: Ie2c9238278ff735cc2f35c2ad08c10595fc3694f
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/qg/+/126224
Reviewed-by: Erik Gilling <konkers@google.com>
Commit-Queue: Alexei Frolov <frolv@google.com>
diff --git a/examples/simple_project/qg.toml b/examples/simple_project/qg.toml
index 7326390..2a59c73 100644
--- a/examples/simple_project/qg.toml
+++ b/examples/simple_project/qg.toml
@@ -20,6 +20,11 @@
 digest = "sha256:9f7ab348bfb8dbac3f3d70ce591f6fa6992524b0f98d756bc1c3820cd113926d"
 
 [[targets.cipd.variants]]
+match = { os = "macos", arch = "arm64" }
+url_parameters = { platform = "mac-arm64" }
+digest = "sha256:b93a7e45af7b5b062b4b3b97b3e560c45a4a451e9b479cf40773722ecc97000a"
+
+[[targets.cipd.variants]]
 match = { os = "windows", arch = "x64" }
 url_parameters = { platform = "windows-amd64" }
 digest = "sha256:c98d59b02a9251b24f4466a2ce61a870df0416482bbe2c5011abdbde7c96c4dc"
diff --git a/qg/Cargo.toml b/qg/Cargo.toml
index def40d8..f242e21 100644
--- a/qg/Cargo.toml
+++ b/qg/Cargo.toml
@@ -20,6 +20,8 @@
 tokio = { version = "1.21.2", features = ["full"] }
 toml = "0.5.9"
 nom = "7.1.2"
+reqwest = "0.11.13"
+sha2 = "0.10.6"
 
 [dependencies.rustpython]
 git = "https://github.com/RustPython/RustPython"
diff --git a/qg/src/digest.rs b/qg/src/digest.rs
index abaa34d..bc0b1c2 100644
--- a/qg/src/digest.rs
+++ b/qg/src/digest.rs
@@ -14,6 +14,8 @@
 
 use std::str::FromStr;
 
+use sha2::Digest as Sha2Digest;
+
 use crate::Error;
 
 #[derive(Debug, PartialEq)]
@@ -40,12 +42,63 @@
         }
 
         match *parts.first().expect("guaranteed to have two elements") {
-            "sha256" => Ok(Self::Sha256(parts[1].to_string())),
+            "sha256" => {
+                let hash = parts[1];
+                if !hash.chars().all(|c| c.is_ascii_hexdigit()) {
+                    // TODO(frolv): Return a dedicated error for an invalid sha256 digest.
+                    return Err(Error::GenericErrorPlaceholder);
+                }
+                Ok(Self::Sha256(hash.to_string()))
+            }
             _ => Err(Error::GenericErrorPlaceholder),
         }
     }
 }
 
+impl ExpectedDigest {
+    pub fn verifier(&self) -> Verifier {
+        match self {
+            Self::Ignore => Verifier::Ignore,
+            Self::Sha256(s) => Verifier::Sha256 {
+                checksum: sha2::Sha256::new(),
+                expected: s.as_str(),
+            },
+        }
+    }
+}
+
+pub enum Verifier<'a> {
+    Ignore,
+
+    Sha256 {
+        checksum: sha2::Sha256,
+        expected: &'a str,
+    },
+}
+
+impl<'a> Verifier<'a> {
+    pub fn update(&mut self, bytes: &[u8]) {
+        match self {
+            Self::Ignore => (),
+            Self::Sha256 { checksum, .. } => checksum.update(bytes),
+        }
+    }
+
+    pub fn reset(&mut self) {
+        match self {
+            Self::Ignore => (),
+            Self::Sha256 { checksum, .. } => checksum.reset(),
+        }
+    }
+
+    pub fn verify(self) -> bool {
+        match self {
+            Self::Ignore => true,
+            Self::Sha256 { checksum, expected } => format!("{:x}", checksum.finalize()) == expected,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -53,8 +106,10 @@
     #[test]
     fn test_parse_valid() {
         assert_eq!(
-            "sha256:abcdef".parse::<ExpectedDigest>().unwrap(),
-            ExpectedDigest::Sha256("abcdef".into())
+            "sha256:0123456789abcdefABCDEF"
+                .parse::<ExpectedDigest>()
+                .unwrap(),
+            ExpectedDigest::Sha256("0123456789abcdefABCDEF".into())
         );
         assert_eq!(
             "ignore".parse::<ExpectedDigest>().unwrap(),
@@ -92,5 +147,11 @@
             "sha256:abc:def".parse::<ExpectedDigest>().unwrap_err(),
             Error::GenericErrorPlaceholder,
         ));
+        assert!(matches!(
+            "sha256:0123456789abcdefABCDEFg"
+                .parse::<ExpectedDigest>()
+                .unwrap_err(),
+            Error::GenericErrorPlaceholder,
+        ));
     }
 }
diff --git a/qg/src/download.rs b/qg/src/download.rs
index b735785..a1cb0e9 100644
--- a/qg/src/download.rs
+++ b/qg/src/download.rs
@@ -12,7 +12,25 @@
 // License for the specific language governing permissions and limitations under
 // the License.
 
-use std::path::PathBuf;
+use std::{
+    collections::HashMap,
+    path::{Path, PathBuf},
+};
+
+use tokio::{
+    fs::{self, File},
+    io::{AsyncSeekExt, AsyncWriteExt},
+    sync::mpsc,
+};
+
+use crate::{
+    digest::ExpectedDigest,
+    executor::{ExecutionContext, ExecutionStatus, ExecutionStatusMsg},
+    target::{Download, DownloadVariant},
+    Error, Result,
+};
+
+const MAX_DOWNLOAD_ATTEMPTS: u32 = 5;
 
 /// The file format of a download target.
 #[derive(Debug)]
@@ -21,3 +39,216 @@
     /// should be downloaded.
     Binary(PathBuf),
 }
+
+/// Returns the URL of a download target, filling in any parameter substitutions.
+fn download_url(metadata: &Download, variant: Option<&DownloadVariant>) -> Result<String> {
+    let mut params: HashMap<&str, &str> = metadata
+        .url_parameters
+        .iter()
+        .map(|(k, v)| (k.as_str(), v.as_str()))
+        .collect();
+    if let Some(variant) = variant {
+        params.extend(
+            variant
+                .url_parameters
+                .iter()
+                .map(|(k, v)| (k.as_str(), v.as_str())),
+        );
+    }
+
+    metadata.url.substitute(&params)
+}
+
+pub(crate) struct Downloader<'a> {
+    context: &'a ExecutionContext,
+    metadata: &'a Download,
+    status_tx: &'a mpsc::UnboundedSender<ExecutionStatusMsg>,
+}
+
+enum Status {
+    Retry,
+    Failure(Error),
+}
+
+impl<'a> Downloader<'a> {
+    /// Instantiates a downloader for a target within the context of an
+    /// [`Executor`](crate::executor::Executor) run.
+    ///
+    /// `metadata` is expected to be the download metadata of `context.target`;
+    /// this is not checked here, and construction itself cannot fail.
+    pub fn new(
+        context: &'a ExecutionContext,
+        metadata: &'a Download,
+        status_tx: &'a mpsc::UnboundedSender<ExecutionStatusMsg>,
+    ) -> Self {
+        Self {
+            context,
+            metadata,
+            status_tx,
+        }
+    }
+
+    /// Run the target to completion, downloading the file to its output directory.
+    pub async fn run(&self) -> Result<()> {
+        let status = match self.run_result().await {
+            Ok(_) => ExecutionStatus::Complete,
+            Err(e) => ExecutionStatus::Failed(e.to_string()),
+        };
+
+        // TODO(frolv): Remove this and use the return value of this function.
+        self.status_tx
+            .send(ExecutionStatusMsg {
+                name: self.context.target.full_name(),
+                status,
+            })
+            .map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))
+    }
+
+    async fn run_result(&self) -> Result<()> {
+        let variant = self
+            .metadata
+            .variants
+            .iter()
+            .find(|v| v.matches.matches(&self.context.target_platform));
+
+        let url = download_url(self.metadata, variant)?;
+
+        let digest = match variant {
+            Some(v) => v.digest.as_ref().or(self.metadata.digest.as_ref()),
+            None => self.metadata.digest.as_ref(),
+        };
+        let Some(digest) = digest else {
+            return Err(Error::StringErrorPlaceholder("no digest provided".into()));
+        };
+
+        let tmpfile_path = self.temporary_file();
+        let mut tmpfile = File::create(&tmpfile_path).await?;
+
+        self.retry_download(&url, digest, &mut tmpfile).await?;
+
+        match &self.metadata.format {
+            Format::Binary(bin_name) => {
+                self.handle_binary_file(&tmpfile, &tmpfile_path, bin_name)
+                    .await
+            }
+        }
+    }
+
+    /// Tries up to `MAX_DOWNLOAD_ATTEMPTS` times to fetch `url` into `tmpfile`; fatal errors abort.
+    async fn retry_download(
+        &self,
+        url: &str,
+        digest: &ExpectedDigest,
+        tmpfile: &mut File,
+    ) -> Result<()> {
+        for _attempt in 0..MAX_DOWNLOAD_ATTEMPTS {
+            tmpfile.rewind().await?;
+
+            match self.run_download(url, digest, tmpfile).await {
+                Ok(()) => return Ok(()),
+                Err(Status::Retry) => continue,
+                Err(Status::Failure(e)) => return Err(e),
+            };
+        }
+
+        Err(Error::StringErrorPlaceholder(format!(
+            "failed to download after {} attempts",
+            MAX_DOWNLOAD_ATTEMPTS,
+        )))
+    }
+
+    async fn run_download(
+        &self,
+        url: &str,
+        digest: &ExpectedDigest,
+        tmpfile: &mut File,
+    ) -> std::result::Result<(), Status> {
+        let download_error_to_status = |e: reqwest::Error| {
+            // TODO(frolv): Expand this with more error cases and logging.
+            if e.is_status() {
+                Status::Failure(Error::StringErrorPlaceholder(format!(
+                    "failed to download: {e}",
+                )))
+            } else {
+                Status::Retry
+            }
+        };
+
+        let mut verifier = digest.verifier();
+
+        let mut response = reqwest::get(url).await.map_err(download_error_to_status)?;
+        let maybe_length = response.content_length();
+        let mut downloaded_size = 0u64;
+
+        // Stream the chunks of the file, updating the checksum and sending
+        // progress reports.
+        loop {
+            let chunk = match response.chunk().await {
+                Ok(Some(c)) => c,
+                Ok(None) => break,
+                Err(e) => return Err(download_error_to_status(e)),
+            };
+
+            tmpfile
+                .write_all(&chunk)
+                .await
+                .map_err(|e| Status::Failure(e.into()))?;
+            verifier.update(&chunk);
+            downloaded_size += chunk.len() as u64;
+
+            if let Some(len) = maybe_length {
+                // TODO(frolv): Create a context API for sending progress updates.
+                self.status_tx
+                    .send(ExecutionStatusMsg {
+                        name: self.context.target.full_name(),
+                        status: ExecutionStatus::InProgress {
+                            current: downloaded_size,
+                            total: len,
+                            unit: "B",
+                        },
+                    })
+                    .map_err(|e| {
+                        Status::Failure(Error::StringErrorPlaceholder(format!(
+                            "error sending status: {e}"
+                        )))
+                    })?;
+            }
+        }
+
+        if !verifier.verify() {
+            return Err(Status::Failure(Error::StringErrorPlaceholder(
+                "digest of downloaded file does not match expected".into(),
+            )));
+        }
+
+        Ok(())
+    }
+
+    /// Makes `tmpfile` executable and renames it to its final binary path.
+    async fn handle_binary_file(
+        &self,
+        tmpfile: &File,
+        tmpfile_path: &Path,
+        bin_name: &Path,
+    ) -> Result<()> {
+        // TODO(frolv): Handle non-UNIX platforms.
+        if cfg!(unix) {
+            use std::os::unix::prelude::PermissionsExt;
+            let mut permissions = tmpfile.metadata().await?.permissions();
+            permissions.set_mode(0o755);
+            tmpfile.set_permissions(permissions).await?;
+        }
+
+        let download_path = self.context.output_dir.join(bin_name);
+        fs::rename(&tmpfile_path, &download_path).await?;
+        Ok(())
+    }
+
+    /// Returns the path to a temporary file to which downloaded content can be
+    /// written.
+    fn temporary_file(&self) -> PathBuf {
+        match &self.metadata.format {
+            Format::Binary(bin_name) => self.context.work_dir.join(bin_name),
+        }
+    }
+}
diff --git a/qg/src/executor.rs b/qg/src/executor.rs
index 0603483..0efc07d 100644
--- a/qg/src/executor.rs
+++ b/qg/src/executor.rs
@@ -21,10 +21,16 @@
 use futures::future::join_all;
 use tokio::{sync::mpsc, task::JoinHandle};
 
-use crate::{registry::Dependency, Error, Project, Result, Target};
+use crate::{
+    download::Downloader,
+    platform::Platform,
+    registry::Dependency,
+    target::{Download, Metadata},
+    Error, Project, Result, Target,
+};
 
 #[cfg(test)]
-use crate::target::{Fake, Metadata};
+use crate::target::Fake;
 
 #[derive(Debug, Eq, PartialEq)]
 pub enum ExecutionStatus {
@@ -47,11 +53,10 @@
 
 #[derive(Debug)]
 pub struct ExecutionContext {
-    target: Arc<Target>,
-    #[allow(unused)]
-    output_dir: PathBuf,
-    #[allow(unused)]
-    work_dir: PathBuf,
+    pub target: Arc<Target>,
+    pub target_platform: Platform,
+    pub output_dir: PathBuf,
+    pub work_dir: PathBuf,
 }
 
 pub(crate) struct Executor<'a> {
@@ -206,7 +211,9 @@
                 // TODO(konkers): Instead of returning errors immediately, errors should
                 // be aggregated and returned once all running tasks have completed.
                 ExecutionStatus::Failed(error_str) => {
-                    return Err(Error::StringErrorPlaceholder(error_str));
+                    return Err(Error::StringErrorPlaceholder(format!(
+                        "Error when executing target: {error_str}",
+                    )));
                 }
 
                 // Ignore InProgress events for now.
@@ -240,6 +247,8 @@
         self.dispatch_tx
             .send(ExecutionContext {
                 target,
+                // TODO(frolv): Allow setting platforms dynamically.
+                target_platform: Platform::current(),
                 // TODO(konkers): Add canonical location for `output_dir` and `work_dir`.
                 output_dir: "".into(),
                 work_dir: "".into(),
@@ -282,6 +291,15 @@
         fake::run(target, metadata, &self.status_tx).await
     }
 
+    async fn handle_download_target(
+        &self,
+        context: &ExecutionContext,
+        metadata: &Download,
+    ) -> Result<()> {
+        let downloader = Downloader::new(context, metadata, &self.status_tx);
+        downloader.run().await
+    }
+
     async fn run(self) {
         loop {
             let Ok(context) = self.dispatch_rx.recv().await else {
@@ -292,7 +310,11 @@
             let res = match context.target.metadata() {
                 #[cfg(test)]
                 Metadata::Fake(fake) => self.handle_fake_target(&context.target, fake).await,
-                _ => self.send_status(
+
+                Metadata::Download(download) => {
+                    self.handle_download_target(&context, download).await
+                }
+                Metadata::Cipd(_) | Metadata::DepOnly => self.send_status(
                     &context.target,
                     ExecutionStatus::Failed("Unsupported target type".into()),
                 ),
diff --git a/qg/src/platform.rs b/qg/src/platform.rs
index 6925311..c5bb56d 100644
--- a/qg/src/platform.rs
+++ b/qg/src/platform.rs
@@ -17,6 +17,60 @@
 use crate::Error;
 
 #[derive(Debug)]
+pub struct Platform {
+    pub system: System,
+    pub architecture: Architecture,
+}
+
+impl Platform {
+    #[must_use]
+    pub(crate) fn current() -> Self {
+        Self::new(Self::target_system(), Self::target_architecture())
+    }
+
+    #[must_use]
+    pub fn new(system: System, architecture: Architecture) -> Self {
+        Platform {
+            system,
+            architecture,
+        }
+    }
+
+    #[must_use]
+    #[cfg(target_os = "linux")]
+    fn target_system() -> System {
+        System::Linux
+    }
+
+    #[must_use]
+    #[cfg(target_os = "macos")]
+    fn target_system() -> System {
+        System::MacOs
+    }
+
+    #[must_use]
+    #[cfg(target_os = "windows")]
+    fn target_system() -> System {
+        System::Windows
+    }
+
+    #[must_use]
+    #[cfg(target_arch = "x86_64")]
+    fn target_architecture() -> Architecture {
+        Architecture::X64
+    }
+
+    #[must_use]
+    #[cfg(any(
+        all(target_arch = "arm", target_pointer_width = "64"),
+        target_arch = "aarch64"
+    ))]
+    fn target_architecture() -> Architecture {
+        Architecture::Arm64
+    }
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub enum System {
     Linux,
     MacOs,
@@ -36,7 +90,7 @@
     }
 }
 
-#[derive(Debug)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub enum Architecture {
     X64,
     Arm64,
diff --git a/qg/src/registry.rs b/qg/src/registry.rs
index 50dd38c..9942547 100644
--- a/qg/src/registry.rs
+++ b/qg/src/registry.rs
@@ -316,7 +316,6 @@
     ///
     /// # Errors
     /// Returns an error if `target_name` does not name a valid target.
-    #[allow(unused)]
     pub fn transitive_dependencies(&self, target_name: &str) -> Result<Dependencies> {
         let node_id = self.get_node_id(target_name)?;
         let dfs = Dfs::new(&self.dependency_graph, node_id);
diff --git a/qg/src/target.rs b/qg/src/target.rs
index 28619b1..c4bd141 100644
--- a/qg/src/target.rs
+++ b/qg/src/target.rs
@@ -16,9 +16,10 @@
 use std::path::{Path, PathBuf};
 
 use crate::digest::ExpectedDigest;
+use crate::platform::{self, Platform};
 use crate::project::manifest;
 use crate::util::StringSub;
-use crate::{download, platform, Error, Result};
+use crate::{download, Error, Result};
 
 /// A source of targets.
 #[derive(Debug)]
@@ -110,7 +111,7 @@
 #[derive(Debug)]
 pub struct DownloadVariant {
     pub matches: VariantMatch,
-    url_parameters: HashMap<String, String>,
+    pub url_parameters: HashMap<String, String>,
     pub digest: Option<ExpectedDigest>,
 }
 
@@ -120,6 +121,25 @@
     arch: Option<platform::Architecture>,
 }
 
+impl VariantMatch {
+    #[must_use]
+    pub fn matches(&self, platform: &Platform) -> bool {
+        if let Some(sys) = &self.system {
+            if platform.system != *sys {
+                return false;
+            }
+        }
+
+        if let Some(arch) = &self.arch {
+            if platform.architecture != *arch {
+                return false;
+            }
+        }
+
+        true
+    }
+}
+
 impl TryFrom<manifest::DownloadablePackage> for Download {
     type Error = Error;
 
@@ -313,6 +333,8 @@
 
 #[cfg(test)]
 mod tests {
+    use crate::platform::{Architecture, System};
+
     use super::*;
 
     #[test]
@@ -482,4 +504,52 @@
 
         assert_eq!(fake.duration_ticks, 42);
     }
+
+    #[test]
+    fn variant_matches() {
+        assert!(VariantMatch {
+            system: Some(System::Linux),
+            arch: Some(Architecture::X64),
+        }
+        .matches(&Platform::new(System::Linux, Architecture::X64)));
+
+        assert!(VariantMatch {
+            system: None,
+            arch: Some(Architecture::X64),
+        }
+        .matches(&Platform::new(System::Linux, Architecture::X64)));
+
+        assert!(VariantMatch {
+            system: Some(System::Linux),
+            arch: None,
+        }
+        .matches(&Platform::new(System::Linux, Architecture::X64)));
+
+        assert!(VariantMatch {
+            system: None,
+            arch: None,
+        }
+        .matches(&Platform::new(System::Linux, Architecture::X64)));
+    }
+
+    #[test]
+    fn variant_doesnt_match() {
+        assert!(!VariantMatch {
+            system: Some(System::MacOs),
+            arch: Some(Architecture::X64),
+        }
+        .matches(&Platform::new(System::Linux, Architecture::X64)));
+
+        assert!(!VariantMatch {
+            system: Some(System::MacOs),
+            arch: None,
+        }
+        .matches(&Platform::new(System::Linux, Architecture::X64)));
+
+        assert!(!VariantMatch {
+            system: None,
+            arch: Some(Architecture::Arm64),
+        }
+        .matches(&Platform::new(System::Linux, Architecture::X64)));
+    }
 }
diff --git a/qg/src/util.rs b/qg/src/util.rs
index 549363c..af907fb 100644
--- a/qg/src/util.rs
+++ b/qg/src/util.rs
@@ -115,13 +115,17 @@
     }
 
     pub fn substitute<'a>(&self, vars: &HashMap<&'a str, &'a str>) -> Result<String> {
+        self.substitute_cb(|var| vars.get(var).copied())
+    }
+
+    pub fn substitute_cb<'a>(&self, get_var: impl Fn(&str) -> Option<&'a str>) -> Result<String> {
         let mut s = String::new();
 
         for frag in &self.fragments {
             match frag {
                 StringFragment::Literal(lit) => s.push_str(lit),
                 StringFragment::Variable(var) => {
-                    let Some(&value) = vars.get(var.as_str()) else {
+                    let Some(value) = get_var(var.as_str()) else {
                         // TODO(frolv): no value provided for `var`.
                         return Err(Error::GenericErrorPlaceholder);
                     };