| // Copyright 2023 The Pigweed Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| // use this file except in compliance with the License. You may obtain a copy of |
| // the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations under |
| // the License. |
| |
| use std::collections::BTreeSet; |
| |
| use once_cell::sync::Lazy; |
| use tokio::sync::{ |
| mpsc, watch, {Mutex, MutexGuard}, |
| }; |
| |
| use crate::{ |
| executor::{ExecutionStatus, ExecutionStatusMsg}, |
| target::Fake, |
| Error, Result, Target, |
| }; |
| |
| static COORDINATOR: Lazy<Mutex<FakeCoordinator>> = Lazy::new(|| Mutex::new(FakeCoordinator::new())); |
| |
| #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)] |
| pub(crate) struct Event { |
| pub tick: u32, |
| pub target_name: String, |
| pub event_type: String, |
| } |
| |
| impl Event { |
| pub fn start(tick: u32, target_name: &str) -> Event { |
| Event { |
| tick, |
| target_name: target_name.to_string(), |
| event_type: "start".to_string(), |
| } |
| } |
| pub fn end(tick: u32, target_name: &str) -> Event { |
| Event { |
| tick, |
| target_name: target_name.to_string(), |
| event_type: "end".to_string(), |
| } |
| } |
| } |
| |
| pub(crate) struct FakeCoordinator { |
| current_tick: u32, |
| current_tick_tx: watch::Sender<u32>, |
| current_tick_rx: watch::Receiver<u32>, |
| events: BTreeSet<Event>, |
| } |
| |
| impl FakeCoordinator { |
| fn new() -> Self { |
| let (current_tick_tx, current_tick_rx) = watch::channel(0); |
| Self { |
| current_tick: 0, |
| current_tick_tx, |
| current_tick_rx, |
| events: BTreeSet::new(), |
| } |
| } |
| |
| pub(crate) async fn get() -> MutexGuard<'static, FakeCoordinator> { |
| COORDINATOR.lock().await |
| } |
| |
| pub(crate) fn reset_ticks(&mut self) { |
| self.current_tick = 0; |
| self.current_tick_tx.send(self.current_tick).unwrap(); |
| } |
| |
| pub(crate) async fn increment_ticks(&mut self, num_ticks: u32) { |
| self.current_tick += num_ticks; |
| self.current_tick_tx.send(self.current_tick).unwrap(); |
| |
| // Give workers time to run. Not a great way of ensuring they process the |
| // tick. |
| // |
| // TODO(konkers): Add a mechanism (testing only?) to wait for the executor |
| // to process status messages from the Fake backends for a given tick. |
| tokio::task::yield_now().await; |
| } |
| |
| pub(crate) fn clone_events(&self) -> BTreeSet<Event> { |
| self.events.clone() |
| } |
| } |
| |
| async fn log_event(event: Event) { |
| let mut coordinator = FakeCoordinator::get().await; |
| coordinator.events.insert(event); |
| } |
| |
| pub(crate) async fn run( |
| target: &Target, |
| metadata: &Fake, |
| status_tx: &mpsc::UnboundedSender<ExecutionStatusMsg>, |
| ) -> Result<()> { |
| let mut tick_rx = { |
| let coordinator = FakeCoordinator::get().await; |
| coordinator.current_tick_rx.clone() |
| }; |
| |
| let start_tick = *tick_rx.borrow(); |
| |
| log_event(Event::start(start_tick, &target.full_name())).await; |
| |
| while tick_rx.changed().await.is_ok() { |
| let current_tick = *tick_rx.borrow(); |
| let elapsed_ticks = current_tick - start_tick; |
| if elapsed_ticks >= metadata.duration_ticks { |
| log_event(Event::end(current_tick, &target.full_name())).await; |
| status_tx |
| .send(ExecutionStatusMsg { |
| name: target.full_name(), |
| status: ExecutionStatus::Complete, |
| }) |
| .map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))?; |
| break; |
| } |
| |
| status_tx |
| .send(ExecutionStatusMsg { |
| name: target.full_name(), |
| status: ExecutionStatus::InProgress { |
| current: u64::from(elapsed_ticks), |
| total: u64::from(metadata.duration_ticks), |
| unit: "ticks", |
| }, |
| }) |
| .map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))?; |
| } |
| |
| Ok(()) |
| } |