Add target Executor

The Executor schedules a target and its transitive dependencies,
dispatching unblocked targets to a pool of worker tasks over an
async-channel and collecting status updates over a tokio mpsc channel.
A test-only Fake target backend, driven by a tick-based
FakeCoordinator, exercises the scheduling logic, and Registry gains a
transitive_dependencies() DFS iterator to feed the scheduler.
Change-Id: If8e511224515e6b7b87fc69322938d4a6a289611
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/qg/+/126257
Pigweed-Auto-Submit: Erik Gilling <konkers@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/qg/Cargo.toml b/qg/Cargo.toml
index fb4c8cc..def40d8 100644
--- a/qg/Cargo.toml
+++ b/qg/Cargo.toml
@@ -8,6 +8,8 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+async-channel = "1.8.0"
+fixedbitset = "0.4.2"
futures = "0.3.25"
num-traits = "0.2.15"
once_cell = "1.16.0"
diff --git a/qg/src/executor.rs b/qg/src/executor.rs
new file mode 100644
index 0000000..0603483
--- /dev/null
+++ b/qg/src/executor.rs
@@ -0,0 +1,372 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+use std::{
+ collections::{BTreeMap, HashSet},
+ path::PathBuf,
+ sync::Arc,
+};
+
+use futures::future::join_all;
+use tokio::{sync::mpsc, task::JoinHandle};
+
+use crate::{registry::Dependency, Error, Project, Result, Target};
+
+#[cfg(test)]
+use crate::target::{Fake, Metadata};
+
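+/// The execution status of a single target, as reported by a worker.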
+#[derive(Debug, Eq, PartialEq)]
+pub enum ExecutionStatus {
+ #[allow(unused)]
+ InProgress {
+ current: u64,
+ total: u64,
+ unit: &'static str,
+ },
+ #[allow(unused)]
+ Complete,
+ Failed(String),
+}
+
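+/// A status update paired with the full name of the target it describes.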
+#[derive(Debug, Eq, PartialEq)]
+pub struct ExecutionStatusMsg {
+ pub name: String,
+ pub status: ExecutionStatus,
+}
+
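+/// The information a worker needs to execute a single target.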
+#[derive(Debug)]
+pub struct ExecutionContext {
+ target: Arc<Target>,
+ #[allow(unused)]
+ output_dir: PathBuf,
+ #[allow(unused)]
+ work_dir: PathBuf,
+}
+
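+/// Executes a target and its transitive dependencies on a pool of worker
+/// tasks. Targets move from `waiting_targets` to `pending_targets` as
+/// their dependencies complete, and into `completed_targets` once a worker
+/// reports `ExecutionStatus::Complete`.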
+pub(crate) struct Executor<'a> {
+ // Below, `BTreeMap`s are used instead of `HashMap`s to:
+ // 1. allow the executor to remove a single element easily.
+ // 2. give a deterministic execution order.
+
+    // The set of targets that have no unfinished dependencies, have been
+    // dispatched to the workers, and are awaiting status.
+ pending_targets: BTreeMap<String, Arc<Target>>,
+
+ // The set of targets waiting on dependencies to finish.
+ waiting_targets: BTreeMap<String, Arc<Target>>,
+
+    // The set of targets that have completed. A `HashSet` is used here
+    // because ordering does not matter and constant-time lookups are
+    // favored.
+ completed_targets: HashSet<String>,
+
+ dispatch_tx: async_channel::Sender<ExecutionContext>,
+
+ status_rx: mpsc::UnboundedReceiver<ExecutionStatusMsg>,
+
+ project: &'a Project,
+
+ workers: Vec<JoinHandle<()>>,
+}
+
+impl<'a> Executor<'a> {
+ #[allow(unused)]
+ pub fn new(project: &'a Project, num_workers: usize) -> Executor<'a> {
+ let (dispatch_tx, dispatch_rx) = async_channel::unbounded();
+ let (status_tx, status_rx) = mpsc::unbounded_channel();
+
+ let workers = (0..num_workers)
+ .map(|_| Worker::spawn(status_tx.clone(), dispatch_rx.clone()))
+ .collect();
+
+ Executor {
+ pending_targets: BTreeMap::new(),
+ waiting_targets: BTreeMap::new(),
+ completed_targets: HashSet::new(),
+ dispatch_tx,
+ status_rx,
+ project,
+ workers,
+ }
+ }
+
+ #[allow(unused)]
+ pub async fn close(mut self) {
+ // Close the dispatch channel to signal to the workers that the
+ // executor is shutting down.
+ self.dispatch_tx.close();
+
+ // Wait for the workers to terminate.
+ join_all(self.workers).await;
+ }
+
+ /// Returns true if the target needs to run.
+ fn is_target_dirty(&self, target: &Target) -> bool {
+ !self.completed_targets.contains(&target.full_name())
+ }
+
+    /// Returns true if any of the target's dependencies are not completed.
+    ///
+    /// Takes `completed_targets` instead of `&self` to allow it to be used
+    /// in places that hold mutable references to `self`.
+ fn is_target_blocked(target: &Target, completed_targets: &HashSet<String>) -> bool {
+ !target
+ .dependencies()
+ .all(|dep| completed_targets.contains(dep))
+ }
+
+    /// Schedules a target and all its transitive dependencies to be executed.
+ async fn schedule_target(&mut self, target_name: &str) -> Result<()> {
+ let registry = self.project.registry();
+
+ for target in registry.transitive_dependencies(target_name)? {
+ let target = match target {
+ Dependency::Resolved(target) => target,
+ Dependency::Unresolved(name) => {
+ return Err(Error::StringErrorPlaceholder(format!(
+ "Unresolved transitive dependency of {}: {}",
+ target_name, name
+ )));
+ }
+ };
+
+ // Only consider dirty targets.
+ if self.is_target_dirty(target) {
+ if Self::is_target_blocked(target, &self.completed_targets) {
+ // Blocked targets get put in the waiting set.
+ self.waiting_targets
+ .insert(target.full_name(), target.clone());
+ } else {
+                // Unblocked targets are dispatched immediately.
+ self.dispatch_target(target.clone()).await?;
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+    /// Finds all waiting targets whose dependencies are complete
+    /// and dispatches them.
+ async fn dispatch_unblocked_targets(&mut self) -> Result<()> {
+ // Cache targets to dispatch since we can't call async functions in
+ // .retain(). This is a perfect use case for `BTreeMap::drain_filter`
+ // if/when it is stabilized.
+ let mut targets_to_dispatch = Vec::new();
+
+ self.waiting_targets.retain(|_name, target| {
+ if Self::is_target_blocked(target, &self.completed_targets) {
+ // If blocked, keep it in the `waiting_targets` set.
+ true
+ } else {
+ // If not blocked, dispatch the target and remove it from
+ // `waiting_targets`.
+ targets_to_dispatch.push(target.clone());
+ false
+ }
+ });
+
+ // Dispatch newly unblocked targets.
+ for target in targets_to_dispatch {
+ self.dispatch_target(target).await?;
+ }
+
+ Ok(())
+ }
+
+ async fn handle_completed_target(&mut self, target_name: &str) -> Result<()> {
+ // Remove from pending_targets and mark complete.
+ self.pending_targets.remove(target_name);
+ self.completed_targets.insert(target_name.to_string());
+
+ // Dispatch newly unblocked targets.
+ self.dispatch_unblocked_targets().await?;
+
+ Ok(())
+ }
+
+ async fn run_to_completion(&mut self) -> Result<()> {
+ loop {
+ let msg = self.status_rx.recv().await.ok_or_else(|| {
+ Error::StringErrorPlaceholder("Error receiving status messages from workers".into())
+ })?;
+
+ match msg.status {
+ ExecutionStatus::Complete => self.handle_completed_target(&msg.name).await?,
+
+ // TODO(konkers): Instead of returning errors immediately, errors should
+ // be aggregated and returned once all running tasks have completed.
+ ExecutionStatus::Failed(error_str) => {
+ return Err(Error::StringErrorPlaceholder(error_str));
+ }
+
+ // Ignore InProgress events for now.
+ ExecutionStatus::InProgress {
+ current: _current,
+ total: _total,
+ unit: _unit,
+ } => (),
+ }
+
+ // If we have no pending targets at this point, we can't make progress and should exit.
+ if self.pending_targets.is_empty() {
+ break;
+ }
+ }
+
+ Ok(())
+ }
+
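+    /// Executes `target_name` after scheduling it and all of its
+    /// transitive dependencies.
+    ///
+    /// A minimal usage sketch (the project path and target name here are
+    /// illustrative, not real):
+    ///
+    /// ```ignore
+    /// let project = Project::load("./my_project")?;
+    /// let mut executor = Executor::new(&project, 2);
+    /// executor.execute_target("pkg:target").await?;
+    /// executor.close().await;
+    /// ```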
+ #[allow(unused)]
+ pub async fn execute_target(&mut self, target_name: &str) -> Result<()> {
+ self.schedule_target(target_name).await?;
+ self.run_to_completion().await?;
+
+ Ok(())
+ }
+
+ async fn dispatch_target(&mut self, target: Arc<Target>) -> Result<()> {
+ self.pending_targets
+ .insert(target.full_name(), target.clone());
+ self.dispatch_tx
+ .send(ExecutionContext {
+ target,
+ // TODO(konkers): Add canonical location for `output_dir` and `work_dir`.
+ output_dir: "".into(),
+ work_dir: "".into(),
+ })
+ .await
+ .map_err(|e| Error::StringErrorPlaceholder(format!("Error dispatching target: {e}")))
+ }
+}
+
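+/// A worker task that pulls `ExecutionContext`s from the shared dispatch
+/// channel, executes each target, and reports status over `status_tx`.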
+struct Worker {
+ status_tx: mpsc::UnboundedSender<ExecutionStatusMsg>,
+ dispatch_rx: async_channel::Receiver<ExecutionContext>,
+}
+
+impl Worker {
+ fn spawn(
+ status_tx: mpsc::UnboundedSender<ExecutionStatusMsg>,
+ dispatch_rx: async_channel::Receiver<ExecutionContext>,
+ ) -> JoinHandle<()> {
+ let worker = Worker {
+ status_tx,
+ dispatch_rx,
+ };
+ tokio::spawn(worker.run())
+ }
+
+ fn send_status(&self, target: &Target, status: ExecutionStatus) -> Result<()> {
+ self.status_tx
+ .send(ExecutionStatusMsg {
+ name: target.full_name(),
+ status,
+ })
+ .map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))
+ }
+
+ #[cfg(test)]
+ async fn handle_fake_target(&self, target: &Target, metadata: &Fake) -> Result<()> {
+ use crate::fake;
+
+ fake::run(target, metadata, &self.status_tx).await
+ }
+
+ async fn run(self) {
+ loop {
+ let Ok(context) = self.dispatch_rx.recv().await else {
+ // Closing of the dispatch channel indicates that the executor is shutting down.
+ break;
+ };
+
+ let res = match context.target.metadata() {
+ #[cfg(test)]
+ Metadata::Fake(fake) => self.handle_fake_target(&context.target, fake).await,
+ _ => self.send_status(
+ &context.target,
+ ExecutionStatus::Failed("Unsupported target type".into()),
+ ),
+ };
+
+ if let Err(e) = res {
+ // TODO(konkers): Log error.
+ println!("worker terminating: {e}");
+ break;
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+
+ use crate::fake::{Event, FakeCoordinator};
+
+ use super::*;
+
+ #[tokio::test]
+ async fn fake_execution() {
+ FakeCoordinator::get().await.reset_ticks();
+ let join_handle = tokio::spawn(async move {
+ let project = Project::load("./src/test_projects/dependency_test").unwrap();
+ let mut executor = Executor::new(&project, 2);
+ executor.execute_target("dep-test:a").await.unwrap();
+ });
+
+ // Give the executor time to start.
+ tokio::task::yield_now().await;
+
+        // No ticks have been processed, so b and d have been started
+        // and no other progress has been made.
+ assert_eq!(
+ FakeCoordinator::get().await.clone_events(),
+ vec![Event::start(0, "dep-test:b"), Event::start(0, "dep-test:d"),]
+ .into_iter()
+ .collect()
+ );
+
+        // Tick the fake targets 100 times, which is sufficient for them to
+        // complete.
+ for _ in 0..100 {
+ FakeCoordinator::get().await.increment_ticks(1).await;
+ }
+
+ // Wait for the executor to finish.
+ join_handle.await.unwrap();
+
+ assert_eq!(
+ FakeCoordinator::get().await.clone_events(),
+ vec![
+                // b, d, and e are all dispatched, but we only have two
+                // workers, so only b and d are started.
+ Event::start(0, "dep-test:b"),
+ Event::start(0, "dep-test:d"),
+                // Two ticks later b completes, and a tick after that e is
+                // started now that a worker is free.
+ Event::end(2, "dep-test:b"),
+ Event::start(3, "dep-test:e"),
+                // Later, d and e complete, unblocking c, which starts a
+                // tick later.
+ Event::end(4, "dep-test:d"),
+ Event::end(8, "dep-test:e"),
+ Event::start(9, "dep-test:c"),
+                // Once c completes, a is unblocked and starts a tick later.
+ Event::end(12, "dep-test:c"),
+ Event::start(13, "dep-test:a"),
+ Event::end(14, "dep-test:a"),
+ ]
+ .into_iter()
+ .collect()
+ );
+ }
+}
diff --git a/qg/src/fake.rs b/qg/src/fake.rs
new file mode 100644
index 0000000..02bfeb4
--- /dev/null
+++ b/qg/src/fake.rs
@@ -0,0 +1,144 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+use std::collections::BTreeSet;
+
+use once_cell::sync::Lazy;
+use tokio::sync::{mpsc, watch, Mutex, MutexGuard};
+
+use crate::{
+ executor::{ExecutionStatus, ExecutionStatusMsg},
+ target::Fake,
+ Error, Result, Target,
+};
+
+static COORDINATOR: Lazy<Mutex<FakeCoordinator>> = Lazy::new(|| Mutex::new(FakeCoordinator::new()));
+
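+/// A start or end event recorded by a fake target, stamped with the tick
+/// at which it occurred.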
+#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
+pub(crate) struct Event {
+ pub tick: u32,
+ pub target_name: String,
+ pub event_type: String,
+}
+
+impl Event {
+ pub fn start(tick: u32, target_name: &str) -> Event {
+ Event {
+ tick,
+ target_name: target_name.to_string(),
+ event_type: "start".to_string(),
+ }
+ }
+ pub fn end(tick: u32, target_name: &str) -> Event {
+ Event {
+ tick,
+ target_name: target_name.to_string(),
+ event_type: "end".to_string(),
+ }
+ }
+}
+
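+/// Coordinates fake targets through a global tick counter. Tests advance
+/// time with `increment_ticks()`; fake targets observe the counter through
+/// a `watch` channel and record `Event`s as they start and finish.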
+pub(crate) struct FakeCoordinator {
+ current_tick: u32,
+ current_tick_tx: watch::Sender<u32>,
+ current_tick_rx: watch::Receiver<u32>,
+ events: BTreeSet<Event>,
+}
+
+impl FakeCoordinator {
+ fn new() -> Self {
+ let (current_tick_tx, current_tick_rx) = watch::channel(0);
+ Self {
+ current_tick: 0,
+ current_tick_tx,
+ current_tick_rx,
+ events: BTreeSet::new(),
+ }
+ }
+
+ pub(crate) async fn get() -> MutexGuard<'static, FakeCoordinator> {
+ COORDINATOR.lock().await
+ }
+
+ pub(crate) fn reset_ticks(&mut self) {
+ self.current_tick = 0;
+ self.current_tick_tx.send(self.current_tick).unwrap();
+ }
+
+ pub(crate) async fn increment_ticks(&mut self, num_ticks: u32) {
+ self.current_tick += num_ticks;
+ self.current_tick_tx.send(self.current_tick).unwrap();
+
+        // Give the workers time to run. This is not a reliable way of
+        // ensuring they process the tick.
+        //
+        // TODO(konkers): Add a mechanism (testing only?) to wait for the
+        // executor to process status messages from the Fake backends for a
+        // given tick.
+        tokio::task::yield_now().await;
+ }
+
+ pub(crate) fn clone_events(&self) -> BTreeSet<Event> {
+ self.events.clone()
+ }
+}
+
+async fn log_event(event: Event) {
+ let mut coordinator = FakeCoordinator::get().await;
+ coordinator.events.insert(event);
+}
+
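+/// Runs a fake target: records a start event, reports `InProgress` on each
+/// tick, and reports `Complete` (recording an end event) once
+/// `metadata.duration_ticks` ticks have elapsed.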
+pub(crate) async fn run(
+ target: &Target,
+ metadata: &Fake,
+ status_tx: &mpsc::UnboundedSender<ExecutionStatusMsg>,
+) -> Result<()> {
+ let mut tick_rx = {
+ let coordinator = FakeCoordinator::get().await;
+ coordinator.current_tick_rx.clone()
+ };
+
+ let start_tick = *tick_rx.borrow();
+
+ log_event(Event::start(start_tick, &target.full_name())).await;
+
+ while tick_rx.changed().await.is_ok() {
+ let current_tick = *tick_rx.borrow();
+ let elapsed_ticks = current_tick - start_tick;
+ if elapsed_ticks >= metadata.duration_ticks {
+ log_event(Event::end(current_tick, &target.full_name())).await;
+ status_tx
+ .send(ExecutionStatusMsg {
+ name: target.full_name(),
+ status: ExecutionStatus::Complete,
+ })
+ .map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))?;
+ break;
+ }
+
+ status_tx
+ .send(ExecutionStatusMsg {
+ name: target.full_name(),
+ status: ExecutionStatus::InProgress {
+ current: u64::from(elapsed_ticks),
+ total: u64::from(metadata.duration_ticks),
+ unit: "ticks",
+ },
+ })
+ .map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))?;
+ }
+
+ Ok(())
+}
diff --git a/qg/src/lib.rs b/qg/src/lib.rs
index 2aeafe7..4ac1154 100644
--- a/qg/src/lib.rs
+++ b/qg/src/lib.rs
@@ -25,8 +25,12 @@
mod digest;
mod download;
+mod executor;
mod util;
+#[cfg(test)]
+pub(crate) mod fake;
+
#[doc(inline)]
pub use target::Target;
diff --git a/qg/src/registry.rs b/qg/src/registry.rs
index eefe5cb..50dd38c 100644
--- a/qg/src/registry.rs
+++ b/qg/src/registry.rs
@@ -18,9 +18,11 @@
sync::Arc,
};
+use fixedbitset::FixedBitSet;
use petgraph::{
dot::{self, Dot},
graph::{Neighbors, NodeIndex},
+ visit::Dfs,
Directed, Direction, Graph,
};
@@ -69,6 +71,22 @@
/// An identifier for a node in the dependency graph.
pub type NodeId = NodeIndex<u32>;
+/// An iterator over a target's transitive dependencies.
+pub struct Dependencies<'a> {
+ registry: &'a Registry,
+ dfs: Dfs<NodeIndex, FixedBitSet>,
+}
+
+impl<'a> Iterator for Dependencies<'a> {
+ type Item = &'a Dependency;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.dfs
+ .next(&self.registry.dependency_graph)
+ .and_then(|node_id| self.registry.dependency_graph.node_weight(node_id))
+ }
+}
+
/// A database of packages known to `qg`.
#[derive(Debug)]
pub struct Registry {
@@ -293,6 +311,21 @@
Dependency::Unresolved(_) => Err(Error::GenericErrorPlaceholder),
}
}
+
+ /// Returns an iterator for traversing a target's transitive dependencies.
+ ///
+ /// # Errors
+ /// Returns an error if `target_name` does not name a valid target.
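+    ///
+    /// The iterator yields the named target itself as well as everything it
+    /// transitively depends on. A minimal sketch of consuming it (the
+    /// target name is illustrative):
+    ///
+    /// ```ignore
+    /// for dep in registry.transitive_dependencies("pkg:target")? {
+    ///     match dep {
+    ///         Dependency::Resolved(target) => { /* ready to use */ }
+    ///         Dependency::Unresolved(name) => { /* only the name is known */ }
+    ///     }
+    /// }
+    /// ```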
+ #[allow(unused)]
+ pub fn transitive_dependencies(&self, target_name: &str) -> Result<Dependencies> {
+ let node_id = self.get_node_id(target_name)?;
+ let dfs = Dfs::new(&self.dependency_graph, node_id);
+
+ Ok(Dependencies {
+ registry: self,
+ dfs,
+ })
+ }
}
impl Default for Registry {