blob: 02bfeb4990a7d430f9c7fd809475652399d5de33 [file] [log] [blame]
// Copyright 2023 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
use std::collections::BTreeSet;
use once_cell::sync::Lazy;
use tokio::sync::{
mpsc, watch, {Mutex, MutexGuard},
};
use crate::{
executor::{ExecutionStatus, ExecutionStatusMsg},
target::Fake,
Error, Result, Target,
};
/// Single shared [`FakeCoordinator`] instance, constructed lazily on first
/// access and guarded by an async (tokio) mutex. Obtain it through
/// [`FakeCoordinator::get`].
static COORDINATOR: Lazy<Mutex<FakeCoordinator>> = Lazy::new(|| Mutex::new(FakeCoordinator::new()));
/// A record of something that happened to a fake target at a given tick.
///
/// The derived ordering compares `tick` first, then `target_name`, then
/// `event_type`, so the field order below is significant.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub(crate) struct Event {
    /// Tick at which the event was recorded.
    pub tick: u32,
    /// Name of the target the event refers to.
    pub target_name: String,
    /// Kind of event; the constructors below produce `"start"` and `"end"`.
    pub event_type: String,
}

impl Event {
    /// Shared constructor: copies the borrowed strings into an owned `Event`.
    fn of_type(tick: u32, target_name: &str, event_type: &str) -> Self {
        Self {
            tick,
            target_name: String::from(target_name),
            event_type: String::from(event_type),
        }
    }

    /// Creates the event marking that `target_name` started at `tick`.
    pub fn start(tick: u32, target_name: &str) -> Event {
        Self::of_type(tick, target_name, "start")
    }

    /// Creates the event marking that `target_name` finished at `tick`.
    pub fn end(tick: u32, target_name: &str) -> Event {
        Self::of_type(tick, target_name, "end")
    }
}
/// Shared state for the fake backend: a manually advanced tick counter that
/// is broadcast over a watch channel, plus the set of events logged so far.
pub(crate) struct FakeCoordinator {
/// Most recent tick value; changed by `reset_ticks` and `increment_ticks`.
current_tick: u32,
/// Sending half of the watch channel on which `current_tick` is published.
current_tick_tx: watch::Sender<u32>,
/// Receiving half created together with the sender; `run` clones it to
/// observe tick changes.
current_tick_rx: watch::Receiver<u32>,
/// Events recorded through `log_event`, kept in `Event`'s derived order.
events: BTreeSet<Event>,
}
impl FakeCoordinator {
    /// Builds a coordinator at tick zero with an empty event set.
    fn new() -> Self {
        let (tx, rx) = watch::channel(0);
        Self {
            current_tick: 0,
            current_tick_tx: tx,
            current_tick_rx: rx,
            events: BTreeSet::new(),
        }
    }

    /// Locks the shared coordinator and returns the guard.
    ///
    /// The coordinator stays locked for as long as the returned guard lives.
    pub(crate) async fn get() -> MutexGuard<'static, FakeCoordinator> {
        COORDINATOR.lock().await
    }

    /// Publishes the stored tick value on the watch channel.
    ///
    /// Panics if the send fails.
    fn publish_tick(&self) {
        self.current_tick_tx.send(self.current_tick).unwrap();
    }

    /// Sets the tick counter back to zero and publishes the new value.
    pub(crate) fn reset_ticks(&mut self) {
        self.current_tick = 0;
        self.publish_tick();
    }

    /// Advances the tick counter by `num_ticks`, publishes the new value, and
    /// then yields once to the scheduler.
    pub(crate) async fn increment_ticks(&mut self, num_ticks: u32) {
        self.current_tick += num_ticks;
        self.publish_tick();

        // Yielding gives workers a chance to run, but it does not guarantee
        // that they have processed the tick.
        //
        // TODO(konkers): Add a mechanism (testing only?) to wait for the executor
        // to process status messages from the Fake backends for a given tick.
        tokio::task::yield_now().await;
    }

    /// Returns an independent copy of every event recorded so far.
    pub(crate) fn clone_events(&self) -> BTreeSet<Event> {
        self.events.iter().cloned().collect()
    }
}
/// Records `event` in the shared coordinator's event set.
///
/// The coordinator lock is held only for the duration of the insert. Because
/// the events live in a set, inserting an event equal to one already present
/// leaves the set unchanged.
async fn log_event(event: Event) {
    FakeCoordinator::get().await.events.insert(event);
}
pub(crate) async fn run(
target: &Target,
metadata: &Fake,
status_tx: &mpsc::UnboundedSender<ExecutionStatusMsg>,
) -> Result<()> {
let mut tick_rx = {
let coordinator = FakeCoordinator::get().await;
coordinator.current_tick_rx.clone()
};
let start_tick = *tick_rx.borrow();
log_event(Event::start(start_tick, &target.full_name())).await;
while tick_rx.changed().await.is_ok() {
let current_tick = *tick_rx.borrow();
let elapsed_ticks = current_tick - start_tick;
if elapsed_ticks >= metadata.duration_ticks {
log_event(Event::end(current_tick, &target.full_name())).await;
status_tx
.send(ExecutionStatusMsg {
name: target.full_name(),
status: ExecutionStatus::Complete,
})
.map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))?;
break;
}
status_tx
.send(ExecutionStatusMsg {
name: target.full_name(),
status: ExecutionStatus::InProgress {
current: u64::from(elapsed_ticks),
total: u64::from(metadata.duration_ticks),
unit: "ticks",
},
})
.map_err(|e| Error::StringErrorPlaceholder(format!("error sending status: {e}")))?;
}
Ok(())
}