Add target Executor

Change-Id: If8e511224515e6b7b87fc69322938d4a6a289611
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/qg/+/126257
Pigweed-Auto-Submit: Erik Gilling <konkers@google.com>
Reviewed-by: Alexei Frolov <frolv@google.com>
Commit-Queue: Auto-Submit <auto-submit@pigweed.google.com.iam.gserviceaccount.com>
diff --git a/qg/Cargo.toml b/qg/Cargo.toml
index fb4c8cc..def40d8 100644
--- a/qg/Cargo.toml
+++ b/qg/Cargo.toml
@@ -8,6 +8,8 @@
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+async-channel = "1.8.0"
+fixedbitset = "0.4.2"
 futures = "0.3.25"
 num-traits = "0.2.15"
 once_cell = "1.16.0"
diff --git a/qg/src/executor.rs b/qg/src/executor.rs
new file mode 100644
index 0000000..0603483
--- /dev/null
+++ b/qg/src/executor.rs
@@ -0,0 +1,372 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+use std::{
+    collections::{BTreeMap, HashSet},
+    path::PathBuf,
+    sync::Arc,
+};
+
+use futures::future::join_all;
+use tokio::{sync::mpsc, task::JoinHandle};
+
+use crate::{registry::Dependency, Error, Project, Result, Target};
+
+#[cfg(test)]
+use crate::target::{Fake, Metadata};
+
+#[derive(Debug, Eq, PartialEq)]
+pub enum ExecutionStatus {
+    #[allow(unused)]
+    InProgress {
+        current: u64,
+        total: u64,
+        unit: &'static str,
+    },
+    #[allow(unused)]
+    Complete,
+    Failed(String),
+}
+
+#[derive(Debug, Eq, PartialEq)]
+pub struct ExecutionStatusMsg {
+    pub name: String,
+    pub status: ExecutionStatus,
+}
+
+#[derive(Debug)]
+pub struct ExecutionContext {
+    target: Arc<Target>,
+    #[allow(unused)]
+    output_dir: PathBuf,
+    #[allow(unused)]
+    work_dir: PathBuf,
+}
+
+pub(crate) struct Executor<'a> {
+    // Below, `BTreeMap`s are used instead of `HashMap`s to:
+    // 1. allow the executor to remove a single element easily.
+    // 2. give a deterministic execution order.
+
+    // The set of targets that have no unfinished dependencies, have been
+    // dispatched to the workers and are waiting status.
+    pending_targets: BTreeMap<String, Arc<Target>>,
+
+    // The set of targets waiting on dependencies to finish.
+    waiting_targets: BTreeMap<String, Arc<Target>>,
+
+    // The set of targets that have completed.  HashSet is used here because
+    // ordering does not matter and constant time lookups are favored.
+    completed_targets: HashSet<String>,
+
+    dispatch_tx: async_channel::Sender<ExecutionContext>,
+
+    status_rx: mpsc::UnboundedReceiver<ExecutionStatusMsg>,
+
+    project: &'a Project,
+
+    workers: Vec<JoinHandle<()>>,
+}
+
+impl<'a> Executor<'a> {
+    #[allow(unused)]
+    pub fn new(project: &'a Project, num_workers: usize) -> Executor<'a> {
+        let (dispatch_tx, dispatch_rx) = async_channel::unbounded();
+        let (status_tx, status_rx) = mpsc::unbounded_channel();
+
+        let workers = (0..num_workers)
+            .map(|_| Worker::spawn(status_tx.clone(), dispatch_rx.clone()))
+            .collect();
+
+        Executor {
+            pending_targets: BTreeMap::new(),
+            waiting_targets: BTreeMap::new(),
+            completed_targets: HashSet::new(),
+            dispatch_tx,
+            status_rx,
+            project,
+            workers,
+        }
+    }
+
+    #[allow(unused)]
+    pub async fn close(mut self) {
+        // Close the dispatch channel to signal to the workers that the
+        // executor is shutting down.
+        self.dispatch_tx.close();
+
+        // Wait for the workers to terminate.
+        join_all(self.workers).await;
+    }
+
+    /// Returns true if the target needs to run.
+    fn is_target_dirty(&self, target: &Target) -> bool {
+        !self.completed_targets.contains(&target.full_name())
+    }
+
+    /// Returns true if any of the targets dependencies are not completed.
+    /// takes `completed_targets` instead of `&self` to allow it to be used
+    /// in places that have mutable references on `self`.
+    fn is_target_blocked(target: &Target, completed_targets: &HashSet<String>) -> bool {
+        !target
+            .dependencies()
+            .all(|dep| completed_targets.contains(dep))
+    }
+
+    /// Schedules a target and all it's transitive dependencies to be executed.
+    async fn schedule_target(&mut self, target_name: &str) -> Result<()> {
+        let registry = self.project.registry();
+
+        for target in registry.transitive_dependencies(target_name)? {
+            let target = match target {
+                Dependency::Resolved(target) => target,
+                Dependency::Unresolved(name) => {
+                    return Err(Error::StringErrorPlaceholder(format!(
+                        "Unresolved transitive dependency of {}: {}",
+                        target_name, name
+                    )));
+                }
+            };
+
+            // Only consider dirty targets.
+            if self.is_target_dirty(target) {
+                if Self::is_target_blocked(target, &self.completed_targets) {
+                    // Blocked targets get put in the waiting set.
+                    self.waiting_targets
+                        .insert(target.full_name(), target.clone());
+                } else {
+                    // Unblocked targets dispatched immediately.
+                    self.dispatch_target(target.clone()).await?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Finds all waiting targets who's dependencies are complete
+    /// and dispatches them
+    async fn dispatch_unblocked_targets(&mut self) -> Result<()> {
+        // Cache targets to dispatch since we can't call async functions in
+        // .retain().  This is a perfect use case for `BTreeMap::drain_filter`
+        //  if/when it is stabilized.
+        let mut targets_to_dispatch = Vec::new();
+
+        self.waiting_targets.retain(|_name, target| {
+            if Self::is_target_blocked(target, &self.completed_targets) {
+                // If blocked, keep it in the `waiting_targets` set.
+                true
+            } else {
+                // If not blocked, dispatch the target and remove it from
+                // `waiting_targets`.
+                targets_to_dispatch.push(target.clone());
+                false
+            }
+        });
+
+        // Dispatch newly unblocked targets.
+        for target in targets_to_dispatch {
+            self.dispatch_target(target).await?;
+        }
+
+        Ok(())
+    }
+
+    async fn handle_completed_target(&mut self, target_name: &str) -> Result<()> {
+        // Remove from pending_targets and mark complete.
+        self.pending_targets.remove(target_name);
+        self.completed_targets.insert(target_name.to_string());
+
+        // Dispatch newly unblocked targets.
+        self.dispatch_unblocked_targets().await?;
+
+        Ok(())
+    }
+
+    async fn run_to_completion(&mut self) -> Result<()> {
+        loop {
+            let msg = self.status_rx.recv().await.ok_or_else(|| {
+                Error::StringErrorPlaceholder("Error receiving status messages from workers".into())
+            })?;
+
+            match msg.status {
+                ExecutionStatus::Complete => self.handle_completed_target(&msg.name).await?,
+
+                // TODO(konkers): Instead of returning errors immediately, errors should
+                // be aggregated and returned once all running tasks have completed.
+                ExecutionStatus::Failed(error_str) => {
+                    return Err(Error::StringErrorPlaceholder(error_str));
+                }
+
+                // Ignore InProgress events for now.
+                ExecutionStatus::InProgress {
+                    current: _current,
+                    total: _total,
+                    unit: _unit,
+                } => (),
+            }
+
+            // If we have no pending targets at this point, we can't make progress and should exit.
+            if self.pending_targets.is_empty() {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+
+    #[allow(unused)]
+    pub async fn execute_target(&mut self, target_name: &str) -> Result<()> {
+        self.schedule_target(target_name).await?;
+        self.run_to_completion().await?;
+
+        Ok(())
+    }
+
+    async fn dispatch_target(&mut self, target: Arc<Target>) -> Result<()> {
+        self.pending_targets
+            .insert(target.full_name(), target.clone());
+        self.dispatch_tx
+            .send(ExecutionContext {
+                target,
+                // TODO(konkers): Add canonical location for `output_dir` and `work_dir`.
+                output_dir: "".into(),
+                work_dir: "".into(),
+            })
+            .await
+            .map_err(|e| Error::StringErrorPlaceholder(format!("Error dispatching target: {e}")))
+    }
+}
+
+struct Worker {
+    status_tx: mpsc::UnboundedSender<ExecutionStatusMsg>,
+    dispatch_rx: async_channel::Receiver<ExecutionContext>,
+}
+
+impl Worker {
+    fn spawn(
+        status_tx: mpsc::UnboundedSender<ExecutionStatusMsg>,
+        dispatch_rx: async_channel::Receiver<ExecutionContext>,
+    ) -> JoinHandle<()> {
+        let worker = Worker {
+            status_tx,
+            dispatch_rx,
+        };
+        tokio::spawn(worker.run())
+    }
+
+    fn send_status(&self, target: &Target, status: ExecutionStatus) -> Result<()> {
+        self.status_tx
+            .send(ExecutionStatusMsg {
+                name: target.full_name(),
+                status,
+            })
+            .map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))
+    }
+
+    #[cfg(test)]
+    async fn handle_fake_target(&self, target: &Target, metadata: &Fake) -> Result<()> {
+        use crate::fake;
+
+        fake::run(target, metadata, &self.status_tx).await
+    }
+
+    async fn run(self) {
+        loop {
+            let Ok(context) = self.dispatch_rx.recv().await else {
+                // Closing of the dispatch channel indicates that the executor is shutting down.
+                break;
+            };
+
+            let res = match context.target.metadata() {
+                #[cfg(test)]
+                Metadata::Fake(fake) => self.handle_fake_target(&context.target, fake).await,
+                _ => self.send_status(
+                    &context.target,
+                    ExecutionStatus::Failed("Unsupported target type".into()),
+                ),
+            };
+
+            if let Err(e) = res {
+                // TODO(konkers): Log error.
+                println!("worker terminating: {e}");
+                break;
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use crate::fake::{Event, FakeCoordinator};
+
+    use super::*;
+
+    #[tokio::test]
+    async fn fake_execution() {
+        FakeCoordinator::get().await.reset_ticks();
+        let join_handle = tokio::spawn(async move {
+            let project = Project::load("./src/test_projects/dependency_test").unwrap();
+            let mut executor = Executor::new(&project, 2);
+            executor.execute_target("dep-test:a").await.unwrap();
+        });
+
+        // Give the executor time to start.
+        tokio::task::yield_now().await;
+
+        // No ticks have been processed so b and d have been started
+        // and no other progress has been made.
+        assert_eq!(
+            FakeCoordinator::get().await.clone_events(),
+            vec![Event::start(0, "dep-test:b"), Event::start(0, "dep-test:d"),]
+                .into_iter()
+                .collect()
+        );
+
+        // Tick the fake targets 100 times which is sufficient for them to
+        // complete.
+        for _ in 0..100 {
+            FakeCoordinator::get().await.increment_ticks(1).await;
+        }
+
+        // Wait for the executor to finish.
+        join_handle.await.unwrap();
+
+        assert_eq!(
+            FakeCoordinator::get().await.clone_events(),
+            vec![
+                // b, d, and e are all dispatched but we only have two workers
+                // so only b and d are started.
+                Event::start(0, "dep-test:b"),
+                Event::start(0, "dep-test:d"),
+                // Two ticks later b completes and a tick after that e is
+                // started now that an executor is free.
+                Event::end(2, "dep-test:b"),
+                Event::start(3, "dep-test:e"),
+                // Later d and e complete unblocking c which starts a tick
+                // later.
+                Event::end(4, "dep-test:d"),
+                Event::end(8, "dep-test:e"),
+                Event::start(9, "dep-test:c"),
+                // Once c completes, which unblocks a.
+                Event::end(12, "dep-test:c"),
+                Event::start(13, "dep-test:a"),
+                Event::end(14, "dep-test:a"),
+            ]
+            .into_iter()
+            .collect()
+        );
+    }
+}
diff --git a/qg/src/fake.rs b/qg/src/fake.rs
new file mode 100644
index 0000000..02bfeb4
--- /dev/null
+++ b/qg/src/fake.rs
@@ -0,0 +1,144 @@
+// Copyright 2023 The Pigweed Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+use std::collections::BTreeSet;
+
+use once_cell::sync::Lazy;
+use tokio::sync::{
+    mpsc, watch, {Mutex, MutexGuard},
+};
+
+use crate::{
+    executor::{ExecutionStatus, ExecutionStatusMsg},
+    target::Fake,
+    Error, Result, Target,
+};
+
+static COORDINATOR: Lazy<Mutex<FakeCoordinator>> = Lazy::new(|| Mutex::new(FakeCoordinator::new()));
+
+#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
+pub(crate) struct Event {
+    pub tick: u32,
+    pub target_name: String,
+    pub event_type: String,
+}
+
+impl Event {
+    pub fn start(tick: u32, target_name: &str) -> Event {
+        Event {
+            tick,
+            target_name: target_name.to_string(),
+            event_type: "start".to_string(),
+        }
+    }
+    pub fn end(tick: u32, target_name: &str) -> Event {
+        Event {
+            tick,
+            target_name: target_name.to_string(),
+            event_type: "end".to_string(),
+        }
+    }
+}
+
+pub(crate) struct FakeCoordinator {
+    current_tick: u32,
+    current_tick_tx: watch::Sender<u32>,
+    current_tick_rx: watch::Receiver<u32>,
+    events: BTreeSet<Event>,
+}
+
+impl FakeCoordinator {
+    fn new() -> Self {
+        let (current_tick_tx, current_tick_rx) = watch::channel(0);
+        Self {
+            current_tick: 0,
+            current_tick_tx,
+            current_tick_rx,
+            events: BTreeSet::new(),
+        }
+    }
+
+    pub(crate) async fn get() -> MutexGuard<'static, FakeCoordinator> {
+        COORDINATOR.lock().await
+    }
+
+    pub(crate) fn reset_ticks(&mut self) {
+        self.current_tick = 0;
+        self.current_tick_tx.send(self.current_tick).unwrap();
+    }
+
+    pub(crate) async fn increment_ticks(&mut self, num_ticks: u32) {
+        self.current_tick += num_ticks;
+        self.current_tick_tx.send(self.current_tick).unwrap();
+
+        // Give workers time to run.  Not a great way of ensuring they process the
+        // tick.
+        //
+        // TODO(konkers): Add a mechanism (testing only?) to wait for the executor
+        // to process status messages from the Fake backends for a given tick.
+        tokio::task::yield_now().await;
+    }
+
+    pub(crate) fn clone_events(&self) -> BTreeSet<Event> {
+        self.events.clone()
+    }
+}
+
+async fn log_event(event: Event) {
+    let mut coordinator = FakeCoordinator::get().await;
+    coordinator.events.insert(event);
+}
+
+pub(crate) async fn run(
+    target: &Target,
+    metadata: &Fake,
+    status_tx: &mpsc::UnboundedSender<ExecutionStatusMsg>,
+) -> Result<()> {
+    let mut tick_rx = {
+        let coordinator = FakeCoordinator::get().await;
+        coordinator.current_tick_rx.clone()
+    };
+
+    let start_tick = *tick_rx.borrow();
+
+    log_event(Event::start(start_tick, &target.full_name())).await;
+
+    while tick_rx.changed().await.is_ok() {
+        let current_tick = *tick_rx.borrow();
+        let elapsed_ticks = current_tick - start_tick;
+        if elapsed_ticks >= metadata.duration_ticks {
+            log_event(Event::end(current_tick, &target.full_name())).await;
+            status_tx
+                .send(ExecutionStatusMsg {
+                    name: target.full_name(),
+                    status: ExecutionStatus::Complete,
+                })
+                .map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))?;
+            break;
+        }
+
+        status_tx
+            .send(ExecutionStatusMsg {
+                name: target.full_name(),
+                status: ExecutionStatus::InProgress {
+                    current: u64::from(elapsed_ticks),
+                    total: u64::from(metadata.duration_ticks),
+                    unit: "ticks",
+                },
+            })
+            .map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))?;
+    }
+
+    Ok(())
+}
diff --git a/qg/src/lib.rs b/qg/src/lib.rs
index 2aeafe7..4ac1154 100644
--- a/qg/src/lib.rs
+++ b/qg/src/lib.rs
@@ -25,8 +25,12 @@
 
 mod digest;
 mod download;
+mod executor;
 mod util;
 
+#[cfg(test)]
+pub(crate) mod fake;
+
 #[doc(inline)]
 pub use target::Target;
 
diff --git a/qg/src/registry.rs b/qg/src/registry.rs
index eefe5cb..50dd38c 100644
--- a/qg/src/registry.rs
+++ b/qg/src/registry.rs
@@ -18,9 +18,11 @@
     sync::Arc,
 };
 
+use fixedbitset::FixedBitSet;
 use petgraph::{
     dot::{self, Dot},
     graph::{Neighbors, NodeIndex},
+    visit::Dfs,
     Directed, Direction, Graph,
 };
 
@@ -69,6 +71,22 @@
 /// An identifier for a node in the dependency graph.
 pub type NodeId = NodeIndex<u32>;
 
+/// An iterator of target dependencies.
+pub struct Dependencies<'a> {
+    registry: &'a Registry,
+    dfs: Dfs<NodeIndex, FixedBitSet>,
+}
+
+impl<'a> Iterator for Dependencies<'a> {
+    type Item = &'a Dependency;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.dfs
+            .next(&self.registry.dependency_graph)
+            .and_then(|node_id| self.registry.dependency_graph.node_weight(node_id))
+    }
+}
+
 /// A database of packages known to `qg`.
 #[derive(Debug)]
 pub struct Registry {
@@ -293,6 +311,21 @@
             Dependency::Unresolved(_) => Err(Error::GenericErrorPlaceholder),
         }
     }
+
+    /// Returns an iterator for traversing a target's transitive dependencies.
+    ///
+    /// # Errors
+    /// Returns an error if `target_name` does not name a valid target.
+    #[allow(unused)]
+    pub fn transitive_dependencies(&self, target_name: &str) -> Result<Dependencies> {
+        let node_id = self.get_node_id(target_name)?;
+        let dfs = Dfs::new(&self.dependency_graph, node_id);
+
+        Ok(Dependencies {
+            registry: self,
+            dfs,
+        })
+    }
 }
 
 impl Default for Registry {