Implement download targets
This adds code to build a downloadable target's URL, download its
content, and write it to a local file.
Change-Id: Ie2c9238278ff735cc2f35c2ad08c10595fc3694f
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/qg/+/126224
Reviewed-by: Erik Gilling <konkers@google.com>
Commit-Queue: Alexei Frolov <frolv@google.com>
diff --git a/examples/simple_project/qg.toml b/examples/simple_project/qg.toml
index 7326390..2a59c73 100644
--- a/examples/simple_project/qg.toml
+++ b/examples/simple_project/qg.toml
@@ -20,6 +20,11 @@
digest = "sha256:9f7ab348bfb8dbac3f3d70ce591f6fa6992524b0f98d756bc1c3820cd113926d"
[[targets.cipd.variants]]
+match = { os = "macos", arch = "arm64" }
+url_parameters = { platform = "mac-arm64" }
+digest = "sha256:b93a7e45af7b5b062b4b3b97b3e560c45a4a451e9b479cf40773722ecc97000a"
+
+[[targets.cipd.variants]]
match = { os = "windows", arch = "x64" }
url_parameters = { platform = "windows-amd64" }
digest = "sha256:c98d59b02a9251b24f4466a2ce61a870df0416482bbe2c5011abdbde7c96c4dc"
diff --git a/qg/Cargo.toml b/qg/Cargo.toml
index def40d8..f242e21 100644
--- a/qg/Cargo.toml
+++ b/qg/Cargo.toml
@@ -20,6 +20,8 @@
tokio = { version = "1.21.2", features = ["full"] }
toml = "0.5.9"
nom = "7.1.2"
+reqwest = "0.11.13"
+sha2 = "0.10.6"
[dependencies.rustpython]
git = "https://github.com/RustPython/RustPython"
diff --git a/qg/src/digest.rs b/qg/src/digest.rs
index abaa34d..bc0b1c2 100644
--- a/qg/src/digest.rs
+++ b/qg/src/digest.rs
@@ -14,6 +14,8 @@
use std::str::FromStr;
+use sha2::Digest as Sha2Digest;
+
use crate::Error;
#[derive(Debug, PartialEq)]
@@ -40,12 +42,63 @@
}
match *parts.first().expect("guaranteed to have two elements") {
- "sha256" => Ok(Self::Sha256(parts[1].to_string())),
+ "sha256" => {
+ let hash = parts[1];
+ if !hash.chars().all(|c| c.is_ascii_hexdigit()) {
+ // TODO(frolv): Invalid sha256 digest.
+ return Err(Error::GenericErrorPlaceholder);
+ }
+ Ok(Self::Sha256(hash.to_string()))
+ }
_ => Err(Error::GenericErrorPlaceholder),
}
}
}
+impl ExpectedDigest {
+ pub fn verifier(&self) -> Verifier {
+ match self {
+ Self::Ignore => Verifier::Ignore,
+ Self::Sha256(s) => Verifier::Sha256 {
+ checksum: sha2::Sha256::new(),
+ expected: s.as_str(),
+ },
+ }
+ }
+}
+
+pub enum Verifier<'a> {
+ Ignore,
+
+ Sha256 {
+ checksum: sha2::Sha256,
+ expected: &'a str,
+ },
+}
+
+impl<'a> Verifier<'a> {
+ pub fn update(&mut self, bytes: &[u8]) {
+ match self {
+ Self::Ignore => (),
+ Self::Sha256 { checksum, .. } => checksum.update(bytes),
+ }
+ }
+
+ pub fn reset(&mut self) {
+ match self {
+ Self::Ignore => (),
+ Self::Sha256 { checksum, .. } => checksum.reset(),
+ }
+ }
+
+ pub fn verify(self) -> bool {
+ match self {
+ Self::Ignore => true,
+ Self::Sha256 { checksum, expected } => format!("{:x}", checksum.finalize()) == expected,
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -53,8 +106,10 @@
#[test]
fn test_parse_valid() {
assert_eq!(
- "sha256:abcdef".parse::<ExpectedDigest>().unwrap(),
- ExpectedDigest::Sha256("abcdef".into())
+ "sha256:0123456789abcdefABCDEF"
+ .parse::<ExpectedDigest>()
+ .unwrap(),
+ ExpectedDigest::Sha256("0123456789abcdefABCDEF".into())
);
assert_eq!(
"ignore".parse::<ExpectedDigest>().unwrap(),
@@ -92,5 +147,11 @@
"sha256:abc:def".parse::<ExpectedDigest>().unwrap_err(),
Error::GenericErrorPlaceholder,
));
+ assert!(matches!(
+ "sha256:0123456789abcdefABCDEFg"
+ .parse::<ExpectedDigest>()
+ .unwrap_err(),
+ Error::GenericErrorPlaceholder,
+ ));
}
}
diff --git a/qg/src/download.rs b/qg/src/download.rs
index b735785..a1cb0e9 100644
--- a/qg/src/download.rs
+++ b/qg/src/download.rs
@@ -12,7 +12,25 @@
// License for the specific language governing permissions and limitations under
// the License.
-use std::path::PathBuf;
+use std::{
+ collections::HashMap,
+ path::{Path, PathBuf},
+};
+
+use tokio::{
+ fs::{self, File},
+ io::{AsyncSeekExt, AsyncWriteExt},
+ sync::mpsc,
+};
+
+use crate::{
+ digest::ExpectedDigest,
+ executor::{ExecutionContext, ExecutionStatus, ExecutionStatusMsg},
+ target::{Download, DownloadVariant},
+ Error, Result,
+};
+
+const MAX_DOWNLOAD_ATTEMPTS: u32 = 5;
/// The file format of a download target.
#[derive(Debug)]
@@ -21,3 +39,216 @@
/// should be downloaded.
Binary(PathBuf),
}
+
+/// Returns the URL of a download target, filling in any parameter substitutions.
+fn download_url(metadata: &Download, variant: Option<&DownloadVariant>) -> Result<String> {
+ let mut params: HashMap<&str, &str> = metadata
+ .url_parameters
+ .iter()
+ .map(|(k, v)| (k.as_str(), v.as_str()))
+ .collect();
+ if let Some(variant) = variant {
+ params.extend(
+ variant
+ .url_parameters
+ .iter()
+ .map(|(k, v)| (k.as_str(), v.as_str())),
+ );
+ }
+
+ metadata.url.substitute(¶ms)
+}
+
+pub(crate) struct Downloader<'a> {
+ context: &'a ExecutionContext,
+ metadata: &'a Download,
+ status_tx: &'a mpsc::UnboundedSender<ExecutionStatusMsg>,
+}
+
+enum Status {
+ Retry,
+ Failure(Error),
+}
+
+impl<'a> Downloader<'a> {
+ /// Instantiates a downloader for a target within the context of an
+ /// [`Executor`](crate::executor::Executor) run.
+ ///
+ /// # Errors
+ /// If `context.target` is not a downloadable target, returns an error.
+ pub fn new(
+ context: &'a ExecutionContext,
+ metadata: &'a Download,
+ status_tx: &'a mpsc::UnboundedSender<ExecutionStatusMsg>,
+ ) -> Self {
+ Self {
+ context,
+ metadata,
+ status_tx,
+ }
+ }
+
+ /// Run the target to completion, downloading the file to its output directory.
+ pub async fn run(&self) -> Result<()> {
+ let status = match self.run_result().await {
+ Ok(_) => ExecutionStatus::Complete,
+ Err(e) => ExecutionStatus::Failed(e.to_string()),
+ };
+
+ // TODO(frolv): Remove this and use the return value of this function.
+ self.status_tx
+ .send(ExecutionStatusMsg {
+ name: self.context.target.full_name(),
+ status,
+ })
+ .map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))
+ }
+
+ async fn run_result(&self) -> Result<()> {
+ let variant = self
+ .metadata
+ .variants
+ .iter()
+ .find(|v| v.matches.matches(&self.context.target_platform));
+
+ let url = download_url(self.metadata, variant)?;
+
+ let digest = match variant {
+ Some(v) => v.digest.as_ref().or(self.metadata.digest.as_ref()),
+ None => self.metadata.digest.as_ref(),
+ };
+ let Some(digest) = digest else {
+ return Err(Error::StringErrorPlaceholder("no digest provided".into()));
+ };
+
+ let tmpfile_path = self.temporary_file();
+ let mut tmpfile = File::create(&tmpfile_path).await?;
+
+ self.retry_download(&url, digest, &mut tmpfile).await?;
+
+ match &self.metadata.format {
+ Format::Binary(bin_name) => {
+ self.handle_binary_file(&tmpfile, &tmpfile_path, bin_name)
+ .await
+ }
+ }
+ }
+
+ /// Attempt to download from the URL to `tmpfile` several times unless a fatal error occurs.
+ async fn retry_download(
+ &self,
+ url: &str,
+ digest: &ExpectedDigest,
+ tmpfile: &mut File,
+ ) -> Result<()> {
+ for _attempt in 0..MAX_DOWNLOAD_ATTEMPTS {
+ tmpfile.rewind().await?;
+
+ match self.run_download(url, digest, tmpfile).await {
+ Ok(()) => return Ok(()),
+ Err(Status::Retry) => continue,
+ Err(Status::Failure(e)) => return Err(e),
+ };
+ }
+
+ Err(Error::StringErrorPlaceholder(format!(
+ "failed to download after {} attempts",
+ MAX_DOWNLOAD_ATTEMPTS,
+ )))
+ }
+
+ async fn run_download(
+ &self,
+ url: &str,
+ digest: &ExpectedDigest,
+ tmpfile: &mut File,
+ ) -> std::result::Result<(), Status> {
+ let download_error_to_status = |e: reqwest::Error| {
+ // TODO(frolv): Expand this with more error cases and logging.
+ if e.is_status() {
+ Status::Failure(Error::StringErrorPlaceholder(format!(
+ "failed to download: {e}",
+ )))
+ } else {
+ Status::Retry
+ }
+ };
+
+ let mut verifier = digest.verifier();
+
+ let mut response = reqwest::get(url).await.map_err(download_error_to_status)?;
+ let maybe_length = response.content_length();
+ let mut downloaded_size = 0u64;
+
+ // Stream the chunks of the file, updating the checksum and sending
+ // progress reports.
+ loop {
+ let chunk = match response.chunk().await {
+ Ok(Some(c)) => c,
+ Ok(None) => break,
+ Err(e) => return Err(download_error_to_status(e)),
+ };
+
+ tmpfile
+ .write_all(&chunk)
+ .await
+ .map_err(|e| Status::Failure(e.into()))?;
+ verifier.update(&chunk);
+ downloaded_size += chunk.len() as u64;
+
+ if let Some(len) = maybe_length {
+ // TODO(frolv): Create a context API for sending progress updates.
+ self.status_tx
+ .send(ExecutionStatusMsg {
+ name: self.context.target.full_name(),
+ status: ExecutionStatus::InProgress {
+ current: downloaded_size,
+ total: len,
+ unit: "B",
+ },
+ })
+ .map_err(|e| {
+ Status::Failure(Error::StringErrorPlaceholder(format!(
+ "error sending status: {e}"
+ )))
+ })?;
+ }
+ }
+
+ if !verifier.verify() {
+ return Err(Status::Failure(Error::StringErrorPlaceholder(
+ "digest of downloaded file does not match expected".into(),
+ )));
+ }
+
+ Ok(())
+ }
+
+ /// Makes `tmpfile` executable and renames it to its final binary path.
+ async fn handle_binary_file(
+ &self,
+ tmpfile: &File,
+ tmpfile_path: &Path,
+ bin_name: &Path,
+ ) -> Result<()> {
+ // TODO(frolv): Handle non-UNIX platforms.
+ if cfg!(unix) {
+ use std::os::unix::prelude::PermissionsExt;
+ let mut permissions = tmpfile.metadata().await?.permissions();
+ permissions.set_mode(0o755);
+ tmpfile.set_permissions(permissions).await?;
+ }
+
+ let download_path = self.context.output_dir.join(bin_name);
+ fs::rename(&tmpfile_path, &download_path).await?;
+ Ok(())
+ }
+
+ /// Returns the path to a temporary file to which downloaded content can be
+ /// written.
+ fn temporary_file(&self) -> PathBuf {
+ match &self.metadata.format {
+ Format::Binary(bin_name) => self.context.work_dir.join(bin_name),
+ }
+ }
+}
diff --git a/qg/src/executor.rs b/qg/src/executor.rs
index 0603483..0efc07d 100644
--- a/qg/src/executor.rs
+++ b/qg/src/executor.rs
@@ -21,10 +21,16 @@
use futures::future::join_all;
use tokio::{sync::mpsc, task::JoinHandle};
-use crate::{registry::Dependency, Error, Project, Result, Target};
+use crate::{
+ download::Downloader,
+ platform::Platform,
+ registry::Dependency,
+ target::{Download, Metadata},
+ Error, Project, Result, Target,
+};
#[cfg(test)]
-use crate::target::{Fake, Metadata};
+use crate::target::Fake;
#[derive(Debug, Eq, PartialEq)]
pub enum ExecutionStatus {
@@ -47,11 +53,10 @@
#[derive(Debug)]
pub struct ExecutionContext {
- target: Arc<Target>,
- #[allow(unused)]
- output_dir: PathBuf,
- #[allow(unused)]
- work_dir: PathBuf,
+ pub target: Arc<Target>,
+ pub target_platform: Platform,
+ pub output_dir: PathBuf,
+ pub work_dir: PathBuf,
}
pub(crate) struct Executor<'a> {
@@ -206,7 +211,9 @@
// TODO(konkers): Instead of returning errors immediately, errors should
// be aggregated and returned once all running tasks have completed.
ExecutionStatus::Failed(error_str) => {
- return Err(Error::StringErrorPlaceholder(error_str));
+ return Err(Error::StringErrorPlaceholder(format!(
+ "Error when executing target: {error_str}",
+ )));
}
// Ignore InProgress events for now.
@@ -240,6 +247,8 @@
self.dispatch_tx
.send(ExecutionContext {
target,
+ // TODO(frolv): Allow setting platforms dynamically.
+ target_platform: Platform::current(),
// TODO(konkers): Add canonical location for `output_dir` and `work_dir`.
output_dir: "".into(),
work_dir: "".into(),
@@ -282,6 +291,15 @@
fake::run(target, metadata, &self.status_tx).await
}
+ async fn handle_download_target(
+ &self,
+ context: &ExecutionContext,
+ metadata: &Download,
+ ) -> Result<()> {
+ let downloader = Downloader::new(context, metadata, &self.status_tx);
+ downloader.run().await
+ }
+
async fn run(self) {
loop {
let Ok(context) = self.dispatch_rx.recv().await else {
@@ -292,7 +310,11 @@
let res = match context.target.metadata() {
#[cfg(test)]
Metadata::Fake(fake) => self.handle_fake_target(&context.target, fake).await,
- _ => self.send_status(
+
+ Metadata::Download(download) => {
+ self.handle_download_target(&context, download).await
+ }
+ Metadata::Cipd(_) | Metadata::DepOnly => self.send_status(
&context.target,
ExecutionStatus::Failed("Unsupported target type".into()),
),
diff --git a/qg/src/platform.rs b/qg/src/platform.rs
index 6925311..c5bb56d 100644
--- a/qg/src/platform.rs
+++ b/qg/src/platform.rs
@@ -17,6 +17,60 @@
use crate::Error;
#[derive(Debug)]
+pub struct Platform {
+ pub system: System,
+ pub architecture: Architecture,
+}
+
+impl Platform {
+ #[must_use]
+ pub(crate) fn current() -> Self {
+ Self::new(Self::target_system(), Self::target_architecture())
+ }
+
+ #[must_use]
+ pub fn new(system: System, architecture: Architecture) -> Self {
+ Platform {
+ system,
+ architecture,
+ }
+ }
+
+ #[must_use]
+ #[cfg(target_os = "linux")]
+ fn target_system() -> System {
+ System::Linux
+ }
+
+ #[must_use]
+ #[cfg(target_os = "macos")]
+ fn target_system() -> System {
+ System::MacOs
+ }
+
+ #[must_use]
+ #[cfg(target_os = "windows")]
+ fn target_system() -> System {
+ System::Windows
+ }
+
+ #[must_use]
+ #[cfg(target_arch = "x86_64")]
+ fn target_architecture() -> Architecture {
+ Architecture::X64
+ }
+
+ #[must_use]
+ #[cfg(any(
+ all(target_arch = "arm", target_pointer_width = "64"),
+ target_arch = "aarch64"
+ ))]
+ fn target_architecture() -> Architecture {
+ Architecture::Arm64
+ }
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum System {
Linux,
MacOs,
@@ -36,7 +90,7 @@
}
}
-#[derive(Debug)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Architecture {
X64,
Arm64,
diff --git a/qg/src/registry.rs b/qg/src/registry.rs
index 50dd38c..9942547 100644
--- a/qg/src/registry.rs
+++ b/qg/src/registry.rs
@@ -316,7 +316,6 @@
///
/// # Errors
/// Returns an error if `target_name` does not name a valid target.
- #[allow(unused)]
pub fn transitive_dependencies(&self, target_name: &str) -> Result<Dependencies> {
let node_id = self.get_node_id(target_name)?;
let dfs = Dfs::new(&self.dependency_graph, node_id);
diff --git a/qg/src/target.rs b/qg/src/target.rs
index 28619b1..c4bd141 100644
--- a/qg/src/target.rs
+++ b/qg/src/target.rs
@@ -16,9 +16,10 @@
use std::path::{Path, PathBuf};
use crate::digest::ExpectedDigest;
+use crate::platform::{self, Platform};
use crate::project::manifest;
use crate::util::StringSub;
-use crate::{download, platform, Error, Result};
+use crate::{download, Error, Result};
/// A source of targets.
#[derive(Debug)]
@@ -110,7 +111,7 @@
#[derive(Debug)]
pub struct DownloadVariant {
pub matches: VariantMatch,
- url_parameters: HashMap<String, String>,
+ pub url_parameters: HashMap<String, String>,
pub digest: Option<ExpectedDigest>,
}
@@ -120,6 +121,25 @@
arch: Option<platform::Architecture>,
}
+impl VariantMatch {
+ #[must_use]
+ pub fn matches(&self, platform: &Platform) -> bool {
+ if let Some(sys) = &self.system {
+ if platform.system != *sys {
+ return false;
+ }
+ }
+
+ if let Some(arch) = &self.arch {
+ if platform.architecture != *arch {
+ return false;
+ }
+ }
+
+ true
+ }
+}
+
impl TryFrom<manifest::DownloadablePackage> for Download {
type Error = Error;
@@ -313,6 +333,8 @@
#[cfg(test)]
mod tests {
+ use crate::platform::{Architecture, System};
+
use super::*;
#[test]
@@ -482,4 +504,52 @@
assert_eq!(fake.duration_ticks, 42);
}
+
+ #[test]
+ fn variant_matches() {
+ assert!(VariantMatch {
+ system: Some(System::Linux),
+ arch: Some(Architecture::X64),
+ }
+ .matches(&Platform::new(System::Linux, Architecture::X64)));
+
+ assert!(VariantMatch {
+ system: None,
+ arch: Some(Architecture::X64),
+ }
+ .matches(&Platform::new(System::Linux, Architecture::X64)));
+
+ assert!(VariantMatch {
+ system: Some(System::Linux),
+ arch: None,
+ }
+ .matches(&Platform::new(System::Linux, Architecture::X64)));
+
+ assert!(VariantMatch {
+ system: None,
+ arch: None,
+ }
+ .matches(&Platform::new(System::Linux, Architecture::X64)));
+ }
+
+ #[test]
+ fn variant_doesnt_match() {
+ assert!(!VariantMatch {
+ system: Some(System::MacOs),
+ arch: Some(Architecture::X64),
+ }
+ .matches(&Platform::new(System::Linux, Architecture::X64)));
+
+ assert!(!VariantMatch {
+ system: Some(System::MacOs),
+ arch: None,
+ }
+ .matches(&Platform::new(System::Linux, Architecture::X64)));
+
+ assert!(!VariantMatch {
+ system: None,
+ arch: Some(Architecture::Arm64),
+ }
+ .matches(&Platform::new(System::Linux, Architecture::X64)));
+ }
}
diff --git a/qg/src/util.rs b/qg/src/util.rs
index 549363c..af907fb 100644
--- a/qg/src/util.rs
+++ b/qg/src/util.rs
@@ -115,13 +115,17 @@
}
pub fn substitute<'a>(&self, vars: &HashMap<&'a str, &'a str>) -> Result<String> {
+ self.substitute_cb(|var| vars.get(var).copied())
+ }
+
+ pub fn substitute_cb<'a>(&self, get_var: impl Fn(&str) -> Option<&'a str>) -> Result<String> {
let mut s = String::new();
for frag in &self.fragments {
match frag {
StringFragment::Literal(lit) => s.push_str(lit),
StringFragment::Variable(var) => {
- let Some(&value) = vars.get(var.as_str()) else {
+ let Some(value) = get_var(var.as_str()) else {
// TODO(frolv): no value provided for `var`.
return Err(Error::GenericErrorPlaceholder);
};