| // Copyright 2025 The Pigweed Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| // use this file except in compliance with the License. You may obtain a copy of |
| // the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations under |
| // the License. |
| |
| //! Kernel Scheduler / Context Switching engine |
| //! |
| //! The module provides several interfaces for managing context switches. Each is |
| //! different in its contract: |
| //! - `block()` is used when a thread is taken out of the scheduler algorithm |
| //! and is managed by another source such as a wait queue. These threads |
| //! should be in the [`State::WaitingInterruptible`] or |
| //! [`State::WaitingNonInterruptible`] state. |
| //! - `try_reschedule()` is used when there is a change to the scheduling state |
| //! (such as waking a thread). Depending on the local thread's |
| //! [`PreemptDisableGuard`] state and the scheduler state, this may trigger an |
| //! immediate context switch or it may defer it. This is useful for code which |
| //! may or may not be called in an interrupt context. |
| //! - `try_deferred_reschedule()` is used to trigger a context switch if one was |
| //! deferred by `try_reschedule()`. Notably, this does not guarantee a context |
| //! switch if the [`PreemptDisableGuard`] state is not active but does |
| //! guarantee forward progress. This comes into play for architectures like |
| //! M-profile ARM which handles the context switch in an exception handler |
| //! after an interrupt has returned. |
| use core::cell::UnsafeCell; |
| use core::ptr::NonNull; |
| use core::sync::atomic::Ordering; |
| |
| use foreign_box::ForeignBox; |
| use list::*; |
| use memory_config::MemoryConfig as _; |
| use pw_atomic::{ |
| AtomicAdd, AtomicCompareExchange, AtomicFalse, AtomicLoad, AtomicStore, AtomicSub, AtomicZero, |
| }; |
| use pw_log::info; |
| use pw_status::{Error, Result}; |
| use syscall_defs::ExitStatus; |
| use time::Instant; |
| |
| #[cfg(feature = "user_space")] |
| use crate::object::NullObjectTable; |
| use crate::scheduler::timer::Timer; |
| use crate::sync::event::EventSignaler; |
| use crate::sync::spinlock::SpinLockGuard; |
| use crate::trace::trace_context_switch; |
| use crate::{Arch, Kernel}; |
| |
| mod algorithm; |
| mod locks; |
| mod priority; |
| pub mod priority_bitmask; |
| pub mod thread; |
| pub mod timer; |
| |
| pub use algorithm::RescheduleReason; |
| use algorithm::SchedulerAlgorithm; |
| pub use locks::{SchedLockGuard, WaitQueueLock, WaitQueueLockGuard}; |
| pub use priority::Priority; |
| use thread::*; |
| |
| const LOG_SCHEDULER_EVENTS: bool = false; |
| const WAIT_QUEUE_DEBUG: bool = false; |
| macro_rules! wait_queue_debug { |
| ($($args:expr),*) => {{ |
| log_if::debug_if!(WAIT_QUEUE_DEBUG, $($args),*) |
| }} |
| } |
| |
| // generic on `arch` instead of `kernel` as it appears in the `arch` interface. |
| pub struct ThreadLocalState<A: Arch> { |
| preempt_disable_count: A::AtomicUsize, |
| pub(crate) needs_reschedule: A::AtomicBool, |
| } |
| |
| impl<A: Arch> ThreadLocalState<A> { |
| #[must_use] |
| pub const fn new() -> Self { |
| Self { |
| preempt_disable_count: A::AtomicUsize::ZERO, |
| needs_reschedule: A::AtomicBool::FALSE, |
| } |
| } |
| } |
| |
| pub fn start_thread<K: Kernel>(kernel: K, mut thread: ForeignBox<Thread<K>>) -> ThreadHandle<K> { |
| log_if::info_if!( |
| LOG_SCHEDULER_EVENTS, |
| "Starting thread '{}' ({:#010x})", |
| thread.name as &str, |
| thread.id() as usize |
| ); |
| |
| pw_assert::assert!(thread.state == State::Initial); |
| |
| thread.state = State::Ready; |
| let thread_handle = thread.get_ref(kernel); |
| |
| let mut sched_state = kernel.get_scheduler().lock(kernel); |
| |
| sched_state |
| .algorithm |
| .schedule_thread(thread, RescheduleReason::Started); |
| |
| let _sched_state = sched_state.try_reschedule(kernel, RescheduleReason::Preempted); |
| |
| thread_handle |
| } |
| |
| pub fn initialize<K: Kernel>(kernel: K) { |
| let mut sched_state = kernel.get_scheduler().lock(kernel); |
| |
| // The kernel process needs to be initialized before any kernel threads so |
| // that they can properly be parented underneath it. |
| |
| // SAFETY: Per the contract in `SchedulerState` this is the one place that |
| // `kernel_process` is converted into a foreign box and its refcount is |
| // incremented. |
| unsafe { |
| let kernel_process = sched_state.kernel_process.get(); |
| (*kernel_process).state = ProcessState::Active; |
| (*kernel_process).manually_increment_ref_count(); |
| let kernel_process = ForeignBox::new(NonNull::new_unchecked(kernel_process)); |
| |
| sched_state.add_process_to_list(kernel_process); |
| } |
| } |
| |
| pub fn add_process<K: Kernel>(kernel: K, mut process: ForeignBox<Process<K>>) -> ProcessHandle<K> { |
| let mut sched_state = kernel.get_scheduler().lock(kernel); |
| let process_handle = ProcessHandle::new(NonNull::from(&mut *process), kernel); |
| |
| // The only way for a `ForeignBox<Process<K>>` to exist outside the scheduler, |
| // is for it to be newly created or to have been returned by the scheduler from a join. |
| // In both cases, the process is in the `Inactive` state. |
| pw_assert::assert!(process.state == ProcessState::Inactive); |
| process.state = ProcessState::Active; |
| |
| // SAFETY: We are taking ownership of the ForeignBox and interacting with the |
| // scheduler state which we have locked. |
| sched_state.add_process_to_list(process); |
| process_handle |
| } |
| |
| pub fn bootstrap_scheduler<K: Kernel>( |
| kernel: K, |
| preempt_guard: PreemptDisableGuard<K>, |
| mut thread: ForeignBox<Thread<K>>, |
| ) -> ! { |
| let mut sched_state = kernel.get_scheduler().lock(kernel); |
| |
| // TODO: assert that this is called exactly once at bootup to switch |
| // to this particular thread. |
| pw_assert::assert!(thread.state == State::Initial); |
| thread.state = State::Ready; |
| |
| sched_state |
| .algorithm |
| .schedule_thread(thread, RescheduleReason::Started); |
| |
| info!("Context switching to first thread"); |
| |
| // Special case where we're switching from a non-thread to something real |
| let mut temp_arch_thread_state = K::ThreadState::NEW; |
| sched_state.current_arch_thread_state = &raw mut temp_arch_thread_state; |
| |
| drop(preempt_guard); |
| |
| let _ = block( |
| kernel, |
| sched_state, |
| Thread::<K>::null_id(), |
| State::Terminated, |
| ); |
| pw_assert::panic!("Bootstrap scheduler returned unexpectedly"); |
| } |
| |
| pub struct PreemptDisableGuard<K: Kernel>(K); |
| |
| impl<K: Kernel> PreemptDisableGuard<K> { |
| pub fn new(kernel: K) -> Self { |
| let prev_count = kernel |
| .thread_local_state() |
| .preempt_disable_count |
| .fetch_add(1, core::sync::atomic::Ordering::SeqCst); |
| |
| // atomics have wrapping semantics so overflow is explicitly checked. |
| if prev_count == usize::MAX { |
| pw_assert::debug_panic!("PreemptDisableGuard: preempt_disable_count overflow") |
| } |
| |
| Self(kernel) |
| } |
| } |
| |
| impl<K: Kernel> Drop for PreemptDisableGuard<K> { |
| fn drop(&mut self) { |
| let prev_count = self |
| .0 |
| .thread_local_state() |
| .preempt_disable_count |
| .fetch_sub(1, core::sync::atomic::Ordering::SeqCst); |
| |
| if prev_count == 0 { |
| pw_assert::debug_panic!("PreemptDisableGuard: preempt_disable_count underflow") |
| } |
| } |
| } |
| |
| // Global scheduler state (single processor for now) |
| #[allow(dead_code)] |
| pub struct SchedulerState<K: Kernel> { |
| // The scheduler owns the kernel process from which all kernel threads |
| // are parented. |
| // |
| // SAFETY: `kernel_process` must be converted to a `ForeignBox` exactly |
| // once while the scheduler is initialized. This initialization must also |
| // increment the processes refcount. |
| kernel_process: UnsafeCell<Process<K>>, |
| |
| current_thread: Option<ForeignBox<Thread<K>>>, |
| current_arch_thread_state: *mut K::ThreadState, |
| process_list: ForeignList<Process<K>, ProcessListAdapter<K>>, |
| |
| /// The algorithm used for choosing the next thread to run. |
| algorithm: SchedulerAlgorithm<K>, |
| |
| termination_queue: ForeignList<Thread<K>, ThreadListAdapter<K>>, |
| } |
| |
| unsafe impl<K: Kernel> Sync for SchedulerState<K> {} |
| unsafe impl<K: Kernel> Send for SchedulerState<K> {} |
| |
| impl<K: Kernel> SchedulerState<K> { |
| #[allow(dead_code)] |
| #[allow(clippy::new_without_default)] |
| pub const fn new() -> Self { |
| #[cfg(feature = "user_space")] |
| static KERNEL_OBJECT_TABLE: NullObjectTable = NullObjectTable::new(); |
| Self { |
| kernel_process: UnsafeCell::new(Process::new( |
| "kernel", |
| <K::ThreadState as ThreadState>::MemoryConfig::KERNEL_THREAD_MEMORY_CONFIG, |
| // SAFETY: In the event of multiple scheduler objects, they will |
| // refer to the same, immutable, instance of a zero sized type. |
| #[cfg(feature = "user_space")] |
| unsafe { |
| ForeignBox::new(NonNull::from_ref(&KERNEL_OBJECT_TABLE)) |
| }, |
| )), |
| current_thread: None, |
| current_arch_thread_state: core::ptr::null_mut(), |
| process_list: ForeignList::new(), |
| algorithm: SchedulerAlgorithm::new(), |
| termination_queue: ForeignList::new(), |
| } |
| } |
| |
| /// Returns a pointer to the current threads architecture thread state |
| /// struct. |
| /// |
| /// Only meant to be called from within an architecture implementation. |
| /// |
| /// # Safety |
| /// |
| /// Must be called with the scheduler lock held. Pointer is only valid |
| /// while the current threads remains the current thread. |
| #[allow(dead_code)] |
| #[doc(hidden)] |
| pub unsafe fn get_current_arch_thread_state(&mut self) -> *mut K::ThreadState { |
| self.current_arch_thread_state |
| } |
| |
| fn reschedule_current_thread(&mut self, reason: RescheduleReason) -> (usize, State) { |
| let Some(mut current_thread) = self.current_thread.take() else { |
| return (Thread::<K>::null_id(), State::New); |
| }; |
| |
| let current_thread_id = current_thread.id(); |
| current_thread.state = State::Ready; |
| self.algorithm.schedule_thread(current_thread, reason); |
| (current_thread_id, State::Ready) |
| } |
| |
| fn set_current_thread(&mut self, thread: ForeignBox<Thread<K>>) { |
| self.current_arch_thread_state = thread.arch_thread_state.get(); |
| self.current_thread = Some(thread); |
| } |
| |
| pub fn current_thread_id(&self) -> usize { |
| match &self.current_thread { |
| Some(thread) => thread.id(), |
| None => Thread::<K>::null_id(), |
| } |
| } |
| |
| #[allow(dead_code)] |
| pub fn current_thread_name(&self) -> &'static str { |
| match &self.current_thread { |
| Some(thread) => thread.name, |
| None => "none", |
| } |
| } |
| |
| pub fn take_current_thread(&mut self) -> ForeignBox<Thread<K>> { |
| let Some(thread) = self.current_thread.take() else { |
| pw_assert::panic!("No current thread"); |
| }; |
| thread |
| } |
| |
| #[allow(dead_code)] |
| pub fn current_thread(&self) -> &Thread<K> { |
| let Some(thread) = &self.current_thread else { |
| pw_assert::panic!("No current thread"); |
| }; |
| thread |
| } |
| |
| #[allow(dead_code)] |
| pub fn current_thread_mut(&mut self) -> &mut Thread<K> { |
| let Some(thread) = &mut self.current_thread else { |
| pw_assert::panic!("No current thread"); |
| }; |
| thread |
| } |
| |
| pub fn current_process_handle(&self) -> ProcessHandle<K> { |
| self.current_thread().process() |
| } |
| |
| /// # Safety |
| /// |
| /// This method has the same safety preconditions as |
| /// [`UnsafeList::push_front_unchecked`]. |
| #[allow(dead_code)] |
| #[inline(never)] |
| pub fn add_process_to_list(&mut self, process: ForeignBox<Process<K>>) { |
| self.process_list.push_front(process); |
| } |
| |
| fn get_kernel_process_handle(&self, kernel: K) -> ProcessHandle<K> { |
| let Some(process) = NonNull::new(self.kernel_process.get()) else { |
| pw_assert::panic!("Kernel process is null"); |
| }; |
| ProcessHandle::new(process, kernel) |
| } |
| |
| #[allow(dead_code)] |
| pub fn dump(&self, kernel: K) { |
| info!("List of all threads:"); |
| let _ = self |
| .process_list |
| .for_each(|process| -> core::result::Result<(), ()> { |
| process.dump(kernel); |
| Ok(()) |
| }); |
| } |
| } |
| |
| pub enum JoinResult<K: Kernel> { |
| Joined(ForeignBox<Thread<K>>, ExitStatus), |
| Err { |
| error: Error, |
| thread: ThreadHandle<K>, |
| }, |
| } |
| |
| pub enum TryJoinResult<K: Kernel> { |
| Wait(ThreadHandle<K>), |
| Joined(ForeignBox<Thread<K>>, ExitStatus), |
| Err { |
| error: Error, |
| thread: ThreadHandle<K>, |
| }, |
| } |
| |
| pub enum ProcessJoinResult<K: Kernel> { |
| Joined(ForeignBox<Process<K>>, ExitStatus), |
| Err { |
| error: Error, |
| process: ProcessHandle<K>, |
| }, |
| } |
| |
| pub enum ProcessTryJoinResult<K: Kernel> { |
| Wait(ProcessHandle<K>), |
| Joined(ForeignBox<Process<K>>, ExitStatus), |
| Err { |
| error: Error, |
| process: ProcessHandle<K>, |
| }, |
| } |
| |
| // All thread lifecycle management is encapsulated in this `impl` block which |
| // ensures the scheduler lock is held while the state is manipulated. |
| impl<K: Kernel> SpinLockGuard<'_, K, SchedulerState<K>> { |
| /// Reschedule if preemption is enabled |
| #[must_use] |
| fn try_reschedule(mut self, kernel: K, reason: RescheduleReason) -> Self { |
| if kernel |
| .thread_local_state() |
| .preempt_disable_count |
| .load(Ordering::SeqCst) |
| == 1 |
| { |
| let (current_thread_id, current_thread_state) = self.reschedule_current_thread(reason); |
| let (guard, _switched) = |
| context_switch(kernel, self, current_thread_id, current_thread_state); |
| guard |
| } else { |
| kernel |
| .thread_local_state() |
| .needs_reschedule |
| .store(true, Ordering::SeqCst); |
| self |
| } |
| } |
| |
| fn thread_initialize( |
| &self, |
| _kernel: K, |
| thread: &mut Thread<K>, |
| mut process_handle: ProcessHandle<K>, |
| ) -> Result<()> { |
| thread.state = State::Initial; |
| |
| // SAFETY: *process is only accessed with the scheduler lock held. |
| unsafe { |
| let process = process_handle.process.as_mut(); |
| // Assert that the parent process is active. |
| if process.state != ProcessState::Active { |
| return Err(Error::FailedPrecondition); |
| } |
| // Add thread to processes thread list. |
| process.add_to_thread_list(thread); |
| } |
| |
| thread.set_process(process_handle); |
| Ok(()) |
| } |
| |
| pub fn thread_initialize_kernel<A: ThreadArg>( |
| &mut self, |
| kernel: K, |
| thread: &mut Thread<K>, |
| entry_point: fn(K, A), |
| arg: A, |
| ) { |
| pw_assert::assert!(thread.state == State::New); |
| let process_handle = self.get_kernel_process_handle(kernel); |
| let args = (entry_point as usize, kernel.into_usize(), arg.into_usize()); |
| |
| thread.stack.initialize(); |
| |
| unsafe { |
| (*thread.arch_thread_state.get()).initialize_kernel_state( |
| thread.stack, |
| &raw const process_handle.process.as_ref().memory_config, |
| Thread::<K>::trampoline::<A>, |
| args, |
| ); |
| } |
| |
| // The kernel process is bootstrapped as active, and any attempt to |
| // terminate it will panic. Therefore, this initialization must always succeed. |
| pw_assert::assert!( |
| self.thread_initialize(kernel, thread, process_handle) |
| .is_ok() |
| ); |
| } |
| |
| pub fn thread_reinitialize_kernel<A: ThreadArg>( |
| &mut self, |
| kernel: K, |
| thread: &mut Thread<K>, |
| entry_point: fn(K, A), |
| arg: A, |
| ) { |
| pw_assert::assert!(thread.state == State::Joined); |
| let process_handle = self.get_kernel_process_handle(kernel); |
| let args = (entry_point as usize, kernel.into_usize(), arg.into_usize()); |
| |
| thread.stack.initialize(); |
| |
| // SAFETY: The scheduler guarantees that a process' `memory_config` |
| // remains valid while it has any child threads ensuring that the |
| // `memory_config` is valid for the lifetime of the thread. |
| unsafe { |
| (*thread.arch_thread_state.get()).initialize_kernel_state( |
| thread.stack, |
| &raw const process_handle.process.as_ref().memory_config, |
| Thread::<K>::trampoline::<A>, |
| args, |
| ); |
| } |
| |
| // The kernel process is bootstrapped as active, and any attempt to |
| // terminate it will panic. Therefore, this initialization must always succeed. |
| pw_assert::assert!( |
| self.thread_initialize(kernel, thread, process_handle) |
| .is_ok() |
| ); |
| } |
| |
| #[cfg(feature = "user_space")] |
| /// # Safety |
| /// It is up to the caller to ensure that *process is valid. |
| /// Initialize the mutable parts of the non privileged thread, must be |
| /// called once per thread prior to starting it |
| #[allow(clippy::too_many_arguments)] |
| pub unsafe fn thread_initialize_non_priv( |
| &mut self, |
| kernel: K, |
| thread: &mut Thread<K>, |
| process_handle: ProcessHandle<K>, |
| initial_pc: usize, |
| initial_sp: usize, |
| args: (usize, usize, usize), |
| ) -> Result<()> { |
| pw_assert::assert!(matches!(thread.state, State::New | State::Joined)); |
| |
| thread.stack.initialize(); |
| |
| // SAFETY: The scheduler guarantees that a process' `memory_config` |
| // remains valid while it has any child threads. This is because |
| // each `Thread` holds a `ProcessHandle` to its parent process, ensuring |
| // the `Process` and its `memory_config` are valid for the lifetime |
| // of the thread. |
| #[cfg(feature = "user_space")] |
| unsafe { |
| (*thread.arch_thread_state.get()).initialize_user_state( |
| thread.stack, |
| &raw const process_handle.process.as_ref().memory_config, |
| initial_sp, |
| initial_pc, |
| args, |
| )?; |
| } |
| |
| self.thread_initialize(kernel, thread, process_handle) |
| } |
| |
| fn thread_get_state(&self, thread: &ThreadHandle<K>) -> thread::State { |
| // SAFETY: we have exclusive access to thread by virtue of the scheduler lock being held. |
| unsafe { thread.thread.as_ref().state } |
| } |
| |
| /// Request termination of the referenced thread. |
| /// |
| /// This is an ASYNCHRONOUS operation. The thread is marked as terminating, |
| /// but it may not exit immediately. |
| pub fn thread_terminate( |
| &mut self, |
| thread_handle: &mut ThreadHandle<K>, |
| status: ExitStatus, |
| ) -> Result<()> { |
| // SAFETY: we have exclusive access to thread by virtue of the scheduler lock being held. |
| let thread = unsafe { thread_handle.thread.as_mut() }; |
| self.thread_terminate_internal(thread, status) |
| } |
| |
| pub(crate) fn thread_terminate_internal( |
| &mut self, |
| thread: &mut Thread<K>, |
| status: ExitStatus, |
| ) -> Result<()> { |
| thread.exit_status = Some(status); |
| match &mut thread.owner { |
| ThreadOwner::None => Err(Error::InvalidArgument), |
| ThreadOwner::Scheduler => { |
| // Mark thread as terminating so that it can clean itself up. |
| thread.terminating = true; |
| |
| // Should we signal the scheduling algorithm to give it the |
| // chance to move the thread to the front of the queue? |
| |
| Ok(()) |
| } |
| ThreadOwner::WaitQueue { queue, wait_type } => { |
| // First, mark the thread as being in the terminated state. |
| // SAFETY: Threads access is guarded by the scheduler lock. |
| thread.terminating = true; |
| |
| // Second, wake the thread if it is in an interruptible wait. |
| if *wait_type == WaitType::Interruptible { |
| // SAFETY: All thread and wait queue accesses are guarded by |
| // the scheduler lock and the thread is guaranteed to be in |
| // the queue by the fact that the queue is the ThreadOwner. |
| let ret = unsafe { |
| queue |
| .as_mut() |
| .queue |
| .remove_element(NonNull::from_ref(thread)) |
| }; |
| let Some(thread_box) = ret else { |
| // The a thread's owner is `ThreadOwner::WaitQueue` it will always |
| // be in the `WaitQueue`'s queue. A failure to remove here would |
| // be the result of a bug or corruption. |
| pw_assert::panic!("Could not remove thread from its owning WaitQueue"); |
| }; |
| thread.state = State::Ready; |
| self.algorithm |
| .schedule_thread(thread_box, RescheduleReason::Woken); |
| } |
| Ok(()) |
| } |
| } |
| } |
| |
| fn thread_signal_join(mut self, thread: &mut ThreadHandle<K>) -> Self { |
| if let Some(signaler) = unsafe { thread.thread.as_mut() }.join_event.take() { |
| self = signaler.signal_locked(self); |
| } |
| |
| self |
| } |
| |
| fn thread_try_join( |
| mut self, |
| kernel: K, |
| mut thread_handle: ThreadHandle<K>, |
| signaler: Option<EventSignaler<K>>, |
| ) -> (Self, TryJoinResult<K>) { |
| // SAFETY: Exclusive access to thread is guaranteed by scheduler lock |
| let thread = unsafe { thread_handle.thread.as_mut() }; |
| |
| // If the thread is terminated and `thread_handle` is the singular reference to it, |
| // the thread is terminated |
| if thread.state == State::Terminated && thread.ref_count.load(Ordering::SeqCst) == 1 { |
| // SAFETY: Threads only enter the Terminated state through `exit_thread` which adds them |
| // to the termination_queue. Join is the only method by which they are removed from the |
| // queue and the state is set to the Joined state when that happens. Therefore, if the |
| // thread is in the Terminated state, it *must* be in the termination_queue. |
| thread.state = State::Joined; |
| |
| self = self.thread_remove_from_parent_process(kernel, thread); |
| |
| // Reset thread state. |
| thread.terminating = false; |
| |
| // SAFETY: Thread is guaranteed to be in the termination queue by |
| // the fact that it is in the terminated state. |
| #[allow(unused_mut)] |
| let mut thread = unsafe { |
| self.termination_queue |
| .remove_element(thread_handle.thread) |
| .unwrap_unchecked() |
| }; |
| |
| // Clear JOINABLE signal. |
| #[cfg(feature = "user_space")] |
| if let Some(object) = thread.object.take() { |
| self = object.signal_locked(kernel, self, |s| s - syscall_defs::Signals::JOINABLE); |
| } |
| |
| let Some(status) = thread.exit_status.take() else { |
| pw_assert::panic!("Thread joined with no exit status set"); |
| }; |
| return (self, TryJoinResult::Joined(thread, status)); |
| } |
| |
| // Only one call to join is allowed to wait thread termination |
| if thread.join_event.is_some() { |
| return ( |
| self, |
| TryJoinResult::Err { |
| error: Error::AlreadyExists, |
| thread: thread_handle, |
| }, |
| ); |
| } |
| |
| // Register the signaler and return None indicating the thread is not yet |
| // joinable. |
| if let Some(signaler) = signaler { |
| thread.join_event = Some(signaler); |
| } |
| (self, TryJoinResult::Wait(thread_handle)) |
| } |
| |
| fn thread_cancel_try_join(&mut self, thread_handle: &mut ThreadHandle<K>) { |
| // SAFETY: Exclusive access to thread is guaranteed by scheduler lock |
| let thread = unsafe { thread_handle.thread.as_mut() }; |
| pw_assert::assert!(thread.join_event.is_some()); |
| thread.join_event = None; |
| } |
| |
| /// Performs the low-level operations to exit the current thread. |
| /// |
| /// This method handles the actual state transitions and cleanup for thread exit. |
| /// It expects the `exit_status` to have been set by the caller if necessary. |
| /// |
| /// # Differences from other exit/terminate functions: |
| /// - **`exit_thread`**: A high-level wrapper that takes an explicit status, |
| /// sets it on the thread, and then calls this method. It does not return. |
| /// - **`terminate`**: Asynchronous requests to terminate a thread/process. |
| /// They set the target's state to terminating but rely on `thread_exit` |
| /// being called later to perform the actual exit. |
| pub(crate) fn thread_exit(mut self, kernel: K) { |
| let mut current_thread = self.take_current_thread(); |
| let current_thread_id = current_thread.id(); |
| |
| pw_assert::assert!(matches!(current_thread.owner, ThreadOwner::Scheduler)); |
| |
| log_if::info_if!( |
| LOG_SCHEDULER_EVENTS, |
| "Exiting thread '{}' ({:#010x})", |
| current_thread.name as &str, |
| current_thread.id() as usize |
| ); |
| |
| // Since the current thread has already been removed from the scheduler, |
| // a PreemptDisableGuard is taken to prevent the event wait from doing a |
| // reschedule. |
| let guard = PreemptDisableGuard::new(kernel); |
| if let Some(signaler) = current_thread.as_mut().join_event.take() { |
| self = signaler.signal_locked(self); |
| } |
| drop(guard); |
| current_thread.state = State::Terminated; |
| current_thread.terminating = true; |
| |
| #[cfg(feature = "user_space")] |
| { |
| let process_handle = current_thread.process(); |
| let thread_object = current_thread.object.as_ref().cloned(); |
| |
| // It is important that we add the thread to the termination queue |
| // before possibly auto-joining the thread as join expects the thread |
| // to be in termination queue at that time. |
| self.termination_queue.push_back(current_thread); |
| |
| if let Some(thread_object) = thread_object { |
| // Ensure we set them as JOINABLE again if it transitions to Terminated |
| self = thread_object |
| .signal_locked(kernel, self, |s| s | syscall_defs::Signals::JOINABLE); |
| |
| if process_handle.get_state_locked(&self) == ProcessState::Terminating { |
| (self, _) = thread_object.join_locked(kernel, self); |
| } |
| } |
| self = process_handle.drop_locked(kernel, self); |
| } |
| #[cfg(not(feature = "user_space"))] |
| { |
| self.termination_queue.push_back(current_thread); |
| } |
| |
| let _ = context_switch(kernel, self, current_thread_id, State::Terminated); |
| |
| // On some systems like M-profile ARM context switch is deferred if called |
| // from an interrupt so this function may return. |
| } |
| |
| fn thread_is_terminating(&self, thread: &ThreadHandle<K>) -> bool { |
| // SAFETY: Exclusive access to thread is guaranteed by scheduler lock |
| unsafe { thread.thread.as_ref().terminating } |
| } |
| |
| fn thread_remove_from_parent_process(mut self, kernel: K, thread: &mut Thread<K>) -> Self { |
| let mut process_handle = thread.take_process(); |
| |
| // SAFETY: Thread is guaranteed to be in the process because the process |
| // was just taken from the thread. |
| self = unsafe { self.process_remove_from_thread_list(kernel, &mut process_handle, thread) }; |
| |
| // ProcessHandle's drop takes the scheduler lock. Since that is already |
| // held, `drop_locked()` needs to be used instead to avoid recursively |
| // locking the scheduler lock. |
| process_handle.drop_locked(kernel, self) |
| } |
| } |
| |
| impl<K: Kernel> SpinLockGuard<'_, K, SchedulerState<K>> { |
| /// Request termination of the referenced process and all its threads. |
| /// |
| /// This is an ASYNCHRONOUS operation. The process and its threads are marked |
| /// as terminating, but they may not exit immediately. |
| pub fn process_terminate( |
| &mut self, |
| _kernel: K, |
| process_handle: &ProcessHandle<K>, |
| status: ExitStatus, |
| ) { |
| let mut ptr = process_handle.process; |
| pw_assert::assert!( |
| ptr.as_ptr() != self.kernel_process.get(), |
| "Tried to terminate the kernel process" |
| ); |
| // SAFETY: `process_handle` guarantees that the pointed-to process is alive. |
| // Holding the scheduler lock (via `&mut self`) guarantees exclusive mutable |
| // access. The returned mutable reference `proc` is local to this function |
| // and does not outlive the lock. |
| let proc = unsafe { ptr.as_mut() }; |
| |
| if proc.state != ProcessState::Active { |
| return; |
| } |
| |
| // Set state to terminating |
| proc.state = ProcessState::Terminating; |
| proc.exit_status = Some(status); |
| |
| // Terminate all threads |
| unsafe { |
| proc.thread_list.filter(|thread| { |
| log_if::info_if!( |
| LOG_SCHEDULER_EVENTS, |
| "Terminating thread '{}'", |
| thread.name as &str |
| ); |
| let _ = self.thread_terminate_internal(thread, ExitStatus::ProcessTerminated); |
| true // Keep in list |
| }); |
| } |
| } |
| |
| #[must_use] |
| pub fn process_signal_join(mut self, kernel: K, process: &mut ProcessHandle<K>) -> Self { |
| // SAFETY: inner process is protected by the scheduler lock and the |
| // mutable reference does not live beyond the unsafe taking the signaler |
| let signaler = unsafe { process.process.as_mut().join_event.take() }; |
| if let Some(signaler) = signaler { |
| self = signaler.signal_locked(self); |
| } |
| |
| // Raise JOINALBE signal. |
| #[cfg(feature = "user_space")] |
| if let Some(object) = unsafe { process.process.as_ref().object.as_ref() } { |
| self = object.signal_locked(kernel, self, |s| s | syscall_defs::Signals::JOINABLE); |
| } |
| #[cfg(not(feature = "user_space"))] |
| { |
| // `kernel` is only used above when `user_space` is enabled. |
| // Reference it here to suppress warning. |
| let _ = kernel; |
| } |
| self |
| } |
| |
| pub fn process_try_join( |
| mut self, |
| kernel: K, |
| mut process_handle: ProcessHandle<K>, |
| signaler: Option<EventSignaler<K>>, |
| ) -> (Self, ProcessTryJoinResult<K>) { |
| let process = unsafe { process_handle.process.as_ref() }; |
| |
| // If the process is terminated and `process_handle` is the singular reference to it, |
| // the process is terminated |
| if process.state == ProcessState::Terminated |
| && process.ref_count.load(Ordering::SeqCst) == 1 |
| { |
| // SAFETY: The process is guaranteed to be in the process list |
| // because it was passed in by ProcessHandle and the scheduler is |
| // the only creator of those. |
| #[allow(unused_mut)] |
| let mut process_box = unsafe { |
| self.process_list |
| .remove_element(process_handle.process) |
| .unwrap_unchecked() |
| }; |
| process_box.state = ProcessState::Inactive; |
| |
| // Clear JOINABLE signal. |
| #[cfg(feature = "user_space")] |
| if let Some(object) = process_box.object.take() { |
| self = object.signal_locked(kernel, self, |s| s - syscall_defs::Signals::JOINABLE); |
| } |
| #[cfg(not(feature = "user_space"))] |
| { |
| // `kernel` is only used above when `user_space` is enabled. |
| // Reference it here to suppress warning. |
| let _ = kernel; |
| } |
| let Some(status) = process_box.exit_status.take() else { |
| pw_assert::panic!("Process joined with no exit status set"); |
| }; |
| return (self, ProcessTryJoinResult::Joined(process_box, status)); |
| } |
| |
| // Only one call to join is allowed to wait for process termination. If |
| // another caller is waiting, return an error. |
| if process.join_event.is_some() { |
| return ( |
| self, |
| ProcessTryJoinResult::Err { |
| error: Error::AlreadyExists, |
| process: process_handle, |
| }, |
| ); |
| } |
| |
| // Register the signaler and return None indicating the process is not yet |
| // joinable. |
| // |
| // SAFETY: Process mutability guarded by scheduler lock. |
| if let Some(signaler) = signaler { |
| unsafe { process_handle.process.as_mut().join_event = Some(signaler) }; |
| } |
| (self, ProcessTryJoinResult::Wait(process_handle)) |
| } |
| |
| pub fn process_cancel_try_join(&mut self, process_handle: &mut ProcessHandle<K>) { |
| let process = unsafe { process_handle.process.as_mut() }; |
| pw_assert::assert!(process.join_event.is_some()); |
| process.join_event = None; |
| } |
| |
| /// # Safety |
| /// |
| /// Caller must ensure that the thread is already in the processes thread list. |
| unsafe fn process_remove_from_thread_list( |
| mut self, |
| kernel: K, |
| process_handle: &mut ProcessHandle<K>, |
| thread: &mut Thread<K>, |
| ) -> Self { |
| // SAFETY: Scheduler lock is held so it is safe to manipulate the process |
| // list and the caller guarantees that the thread is in said list so it |
| // is safe to remove it. |
| |
| // SAFETY: The scheduler lock ensure mutual excludsion and the mutable |
| // reference does not live out side this function. |
| let process = unsafe { process_handle.process.as_mut() }; |
| |
| // SAFETY: Caller guarantees that the thread is in the process' thread list |
| // and the list is protected by the scheduler lock. |
| let empty = unsafe { |
| process |
| .thread_list |
| .unlink_element_unchecked(NonNull::from(thread)); |
| |
| process.thread_list.is_empty() |
| }; |
| |
| if process.state == ProcessState::Terminating && empty { |
| process.state = ProcessState::Terminated; |
| self = self.process_signal_join(kernel, process_handle); |
| } |
| self |
| } |
| |
| pub fn process_add_thread( |
| &self, |
| process_handle: &mut ProcessHandle<K>, |
| thread_handle: &mut ThreadHandle<K>, |
| ) -> Result<()> { |
| // SAFETY: Scheduler lock is held ensuring exclusive access to process_handle |
| // and thread_handle as well as fulfilling the precondition of `add_to_thread_list`. |
| unsafe { |
| let process = process_handle.process.as_mut(); |
| if process.state != ProcessState::Active { |
| return Err(Error::FailedPrecondition); |
| } |
| process.add_to_thread_list(thread_handle.thread.as_mut()); |
| } |
| Ok(()) |
| } |
| } |
| |
| #[must_use] |
| fn block<K: Kernel>( |
| kernel: K, |
| sched_state: SpinLockGuard<K, SchedulerState<K>>, |
| current_thread_id: usize, |
| current_thread_state: State, |
| ) -> SpinLockGuard<K, SchedulerState<K>> { |
| let (sched_state, switched) = |
| context_switch(kernel, sched_state, current_thread_id, current_thread_state); |
| pw_assert::assert!(switched, "Blocking requires a context switch"); |
| sched_state |
| } |
| |
| #[must_use] |
| fn context_switch<K: Kernel>( |
| kernel: K, |
| mut sched_state: SpinLockGuard<K, SchedulerState<K>>, |
| current_thread_id: usize, |
| current_thread_state: State, |
| ) -> (SpinLockGuard<K, SchedulerState<K>>, bool) { |
| // Caller to reschedule is responsible for removing current thread and |
| // put it in the correct run/wait queue. |
| pw_assert::assert!(sched_state.current_thread.is_none()); |
| |
| // Validate that the only mechanism disabling preemption is the scheduler |
| // lock which is passed in to this function. |
| pw_assert::assert!( |
| kernel |
| .thread_local_state() |
| .preempt_disable_count |
| .load(Ordering::SeqCst) |
| <= 1, |
| "Preemption count greater than 1" |
| ); |
| |
| // Pop a new thread off the head of the run queue. |
| // At the moment cannot handle an empty queue, so will panic in that case. |
| // TODO: Implement either an idle thread or a special idle routine for that case. |
| let Some(mut new_thread) = sched_state.algorithm.get_next_thread() else { |
| pw_assert::panic!("Run queue empty: no runnable threads (idle thread missing or blocked?)"); |
| }; |
| |
| pw_assert::assert!( |
| new_thread.state == State::Ready, |
| "<{}>({:#010x}) not ready", |
| new_thread.name as &str, |
| new_thread.id() as usize, |
| ); |
| new_thread.state = State::Running; |
| |
| if current_thread_id == new_thread.id() { |
| sched_state.current_thread = Some(new_thread); |
| return (sched_state, false); |
| } |
| |
| trace_context_switch( |
| kernel, |
| current_thread_id, |
| new_thread.id(), |
| current_thread_state, |
| ); |
| |
| let old_thread_state = sched_state.current_arch_thread_state; |
| let new_thread_state = new_thread.arch_thread_state.get(); |
| sched_state.set_current_thread(new_thread); |
| unsafe { kernel.context_switch(sched_state, old_thread_state, new_thread_state) } |
| } |
| |
| /// If try_reschedule() was called while preemption was disabled, try to |
| /// reschedule again after the preempt guard is dropped. |
| pub fn try_deferred_reschedule<K: Kernel>(kernel: K, reason: RescheduleReason) { |
| if Ok(true) |
| == kernel |
| .thread_local_state() |
| .needs_reschedule |
| .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst) |
| { |
| let _ = kernel |
| .get_scheduler() |
| .lock(kernel) |
| .try_reschedule(kernel, reason); |
| } |
| } |
| |
| #[allow(dead_code)] |
| pub fn yield_timeslice<K: Kernel>(kernel: K) { |
| let sched_state = kernel.get_scheduler().lock(kernel); |
| log_if::info_if!( |
| LOG_SCHEDULER_EVENTS, |
| "Yielding thread '{}' ({:#010x})", |
| sched_state.current_thread_name() as &str, |
| sched_state.current_thread_id() as usize |
| ); |
| let _ = sched_state.try_reschedule(kernel, RescheduleReason::Ticked); |
| } |
| |
| // Tick that is called from a timer handler. The scheduler will evaluate if the current thread |
| // should be preempted or not |
| #[allow(dead_code)] |
| pub fn tick<K: Kernel>( |
| kernel: K, |
| now: Instant<K::Clock>, |
| mut guard: crate::interrupt_controller::InterruptGuard<K>, |
| ) -> crate::interrupt_controller::InterruptGuard<K> { |
| log_if::info_if!( |
| LOG_SCHEDULER_EVENTS, |
| "Scheduler tick at {}", |
| now.ticks() as u64 |
| ); |
| pw_assert::assert!( |
| kernel |
| .thread_local_state() |
| .preempt_disable_count |
| .load(Ordering::SeqCst) |
| >= 1, |
| "scheduler::tick() called with preemption enabled" |
| ); |
| |
| // In lieu of a proper timer interface, the scheduler needs to be robust |
| // to timer ticks arriving before it is initialized. |
| if kernel.get_scheduler().lock(kernel).current_thread.is_none() { |
| return guard; |
| } |
| |
| timer::process_queue(kernel, now); |
| |
| guard.set_reschedule_reason(RescheduleReason::Ticked); |
| let _ = kernel |
| .get_scheduler() |
| .lock(kernel) |
| .try_reschedule(kernel, RescheduleReason::Ticked); |
| |
| guard |
| } |
| |
| /// Exit the currently running thread. |
| /// |
| /// The thread will enter `State::Terminated`, wait for all outstanding |
| /// references to be dropped, then wait to be joined. |
| #[allow(dead_code)] |
| /// Break the global scheduler lock. |
| /// |
| /// This method is used to forcibly release the scheduler lock without dropping the |
| /// guard. This is unsafe because it breaks the lock invariants and can |
| /// lead to data corruption if not used carefully. |
| /// |
| /// # Safety |
| /// This method should only be called on thread start. |
| /// The caller must ensure that breaking the lock will not cause data corruption. |
| pub unsafe fn break_scheduler_lock<K: Kernel>(kernel: K) { |
| unsafe { |
| kernel.get_scheduler().break_lock(); |
| } |
| } |
| |
| pub fn exit_thread<K: Kernel>(kernel: K, status: ExitStatus) -> ! { |
| let mut sched = kernel.get_scheduler().lock(kernel); |
| let current_thread = sched.current_thread_mut(); |
| current_thread.exit_status = Some(status); |
| sched.thread_exit(kernel); |
| |
| // This thread should never get scheduled again at this point. This assert |
| // firing indicates a scheduler or context switch bug. |
| pw_assert::panic!("thread_exit returned unexpectedly"); |
| } |
| |
| /// Handle a terminal exception in the current thread. |
| /// |
| /// This should only be called from an architecture specific exception handler. |
| /// An InterruptGuard is passed in and out in order to allow it to take |
| /// architecturally specific action for thread termination. |
| /// |
| /// If the exception occurred in user mode, the process will be terminated. |
| /// If the exception occurred in kernel mode, the kernel will panic. |
| pub fn handle_terminal_exception<K: Kernel>( |
| kernel: K, |
| guard: crate::interrupt_controller::InterruptGuard<K>, |
| ) -> crate::interrupt_controller::InterruptGuard<K> { |
| if !guard.from_userspace() { |
| pw_assert::panic!("Terminal exception in kernel mode"); |
| } |
| |
| let mut sched = kernel.get_scheduler().lock(kernel); |
| let process_handle = sched.current_thread().process(); |
| sched.process_terminate(kernel, &process_handle, ExitStatus::UnhandledException(0)); |
| |
| guard |
| } |
| |
| pub fn sleep_until<K: Kernel>(kernel: K, deadline: Instant<K::Clock>) -> Result<()> { |
| let wait_queue = WaitQueueLock::new(kernel, ()); |
| |
| let (_, ret) = wait_queue |
| .lock() |
| .wait_until(WaitType::Interruptible, deadline); |
| match ret { |
| // DeadlineExceeded is the expected result so return OK in that case. |
| Err(Error::DeadlineExceeded) => Ok(()), |
| |
| // In all other cases, pass on the return value. |
| _ => ret, |
| } |
| } |
| |
| pub struct WaitQueue<K: Kernel> { |
| queue: ForeignList<Thread<K>, ThreadListAdapter<K>>, |
| } |
| |
| unsafe impl<K: Kernel> Sync for WaitQueue<K> {} |
| unsafe impl<K: Kernel> Send for WaitQueue<K> {} |
| |
| impl<K: Kernel> WaitQueue<K> { |
| #[allow(dead_code, clippy::new_without_default)] |
| #[must_use] |
| pub const fn new() -> Self { |
| Self { |
| queue: ForeignList::new(), |
| } |
| } |
| } |
| |
| #[derive(Clone, Copy, Eq, PartialEq)] |
| #[repr(u8)] |
| pub enum WaitType { |
| // These variant values must mirror the values in thread `State` so conversions |
| // from WaitType to State can be optimized out. |
| Interruptible = 6, |
| NonInterruptible = 7, |
| } |
| |
| const _: () = assert!(WaitType::Interruptible as u8 == State::WaitingInterruptible as u8); |
| const _: () = assert!(WaitType::NonInterruptible as u8 == State::WaitingNonInterruptible as u8); |
| |
| impl From<WaitType> for State { |
| fn from(value: WaitType) -> Self { |
| // Note: Since the values align (see above) this is optimized out. |
| match value { |
| WaitType::Interruptible => State::WaitingInterruptible, |
| WaitType::NonInterruptible => State::WaitingNonInterruptible, |
| } |
| } |
| } |
| |
| #[derive(Clone, Copy, Eq, PartialEq)] |
| pub enum WakeResult { |
| Woken, |
| QueueEmpty, |
| } |
| |
| impl<K: Kernel> SchedLockGuard<'_, K, WaitQueue<K>> { |
| fn add_to_queue_and_reschedule( |
| mut self, |
| mut thread: ForeignBox<Thread<K>>, |
| wait_type: WaitType, |
| ) -> Self { |
| let current_thread_id = thread.id(); |
| let current_thread_name = thread.name; |
| let new_state = wait_type.into(); |
| thread.state = new_state; |
| thread.owner = ThreadOwner::WaitQueue { |
| queue: NonNull::from_ref(&self), |
| wait_type, |
| }; |
| self.queue.push_back(thread); |
| wait_queue_debug!( |
| "WaitQueue: thread '{}' ({:#010x}) rescheduling", |
| current_thread_name as &str, |
| current_thread_id as usize |
| ); |
| self.block(current_thread_id, new_state) |
| } |
| |
| // Safety: |
| // Caller guarantees that thread is non-null, valid, and process_timeout |
| // has exclusive access to `waiting_thread`. |
| unsafe fn process_timeout(&mut self, waiting_thread: *mut Thread<K>) -> Option<Error> { |
| if !(unsafe { (*waiting_thread).state }.is_waiting()) { |
| // Thread has already been woken. |
| return None; |
| } |
| |
| let Some(mut thread) = (unsafe { |
| self.queue |
| .remove_element(NonNull::new_unchecked(waiting_thread)) |
| }) else { |
| pw_assert::panic!("Thread no longer in wait queue"); |
| }; |
| |
| wait_queue_debug!( |
| "WaitQueue: timeout for thread '{}' ({:#010x})", |
| thread.name as &str, |
| thread.id() as usize |
| ); |
| thread.state = State::Ready; |
| self.sched_mut() |
| .algorithm |
| .schedule_thread(thread, RescheduleReason::Woken); |
| Some(Error::DeadlineExceeded) |
| } |
| |
| #[allow(clippy::must_use_candidate)] |
| pub fn wake_one(mut self) -> (Self, WakeResult) { |
| let Some(mut thread) = self.queue.pop_head() else { |
| return (self, WakeResult::QueueEmpty); |
| }; |
| wait_queue_debug!( |
| "WaitQueue: waking thread '{}' ({:#010x})", |
| thread.name as &str, |
| thread.id() as usize |
| ); |
| thread.state = State::Ready; |
| self.sched_mut() |
| .algorithm |
| .schedule_thread(thread, RescheduleReason::Woken); |
| |
| ( |
| self.try_reschedule(RescheduleReason::Preempted), |
| WakeResult::Woken, |
| ) |
| } |
| |
| #[allow(clippy::return_self_not_must_use, clippy::must_use_candidate)] |
| pub fn wake_all(mut self) -> Self { |
| loop { |
| let result; |
| (self, result) = self.wake_one(); |
| if result == WakeResult::QueueEmpty { |
| return self; |
| } |
| } |
| } |
| |
| #[allow(clippy::return_self_not_must_use, clippy::must_use_candidate)] |
| pub fn wait(mut self, wait_type: WaitType) -> (Self, Result<()>) { |
| // If the current thread is terminating and this is an interruptible wait, |
| // return early with an error. |
| if self.sched().current_thread().terminating && wait_type == WaitType::Interruptible { |
| return (self, Err(Error::Cancelled)); |
| } |
| |
| let thread = self.sched_mut().take_current_thread(); |
| wait_queue_debug!( |
| "WaitQueue: thread '{}' ({:#010x}) waiting", |
| thread.name as &str, |
| thread.id() as usize |
| ); |
| self = self.add_to_queue_and_reschedule(thread, wait_type); |
| wait_queue_debug!( |
| "WaitQueue: thread '{}' ({:#010x}) resumed", |
| self.sched().current_thread_name() as &str, |
| self.sched().current_thread_id() as usize |
| ); |
| |
| // Return `Error::Cancelled` if the wait was interrupted because this |
| // thread was terminated while it was waiting. |
| if self.sched().current_thread().terminating && wait_type == WaitType::Interruptible { |
| (self, Err(Error::Cancelled)) |
| } else { |
| (self, Ok(())) |
| } |
| } |
| |
| pub fn wait_until( |
| mut self, |
| wait_type: WaitType, |
| deadline: Instant<K::Clock>, |
| ) -> (Self, Result<()>) { |
| // If the current thread is terminating and this is an interruptible wait, |
| // return early with an error. |
| if self.sched().current_thread().terminating && wait_type == WaitType::Interruptible { |
| return (self, Err(Error::Cancelled)); |
| } |
| |
| let mut thread = self.sched_mut().take_current_thread(); |
| wait_queue_debug!( |
| "WaitQueue: thread '{}' ({:#010x}) wait_until", |
| thread.name as &str, |
| thread.id() as usize |
| ); |
| |
| // Smuggle references to the thread and wait queue into the callback. |
| // Safety: |
| // * The thread will always be active for the lifetime of the wait as |
| // it can not be joined while in a WaitQueue. |
| // * The wait queue will outlive the callback because it will either |
| // fire while the thread is in the wait queue or will be the timer |
| // will be canceled before this function returns. |
| // * All access to thread_ptr and wait_queue_ptr in the callback are |
| // done while the wait queue lock is held. |
| let thread_ptr = unsafe { thread.as_mut_ptr() }; |
| let smuggled_wait_queue = unsafe { self.smuggle() }; |
| |
| // Safety: |
| // * Only accessed while the wait_queue_lock is held; |
| let result: UnsafeCell<Result<()>> = UnsafeCell::new(Ok(())); |
| let result_ptr = result.get(); |
| |
| // Timeout callback will remove the thread from the wait queue and put |
| // it back on the run queue. |
| let mut callback_closure = move |_kernel, callback: ForeignBox<Timer<K>>, _now| { |
| // Safety: wait queue lock is valid for the lifetime of the callback. |
| let mut wait_queue = unsafe { smuggled_wait_queue.lock() }; |
| |
| // Safety: the wait queue lock protects access to the thread. |
| wait_queue_debug!( |
| "WaitQueue: timeout callback for thread '{}' ({:#010x}) (state: {})", |
| unsafe { (*thread_ptr).name } as &str, |
| (unsafe { (*thread_ptr).id() }) as usize, |
| unsafe { thread::to_string((*thread_ptr).state) } as &str |
| ); |
| |
| // Safety: We know that thread_ptr is valid for the life of `wait_until` |
| // and this callback will either be called or canceled before `wait_until` |
| // returns. |
| if let Some(error) = unsafe { wait_queue.process_timeout(thread_ptr) } { |
| // Safety: Acquisition of the wait queue lock at the beginning of |
| // the callback ensures mutual exclusion with accesses from the |
| // body of `wait_until`. |
| unsafe { result_ptr.write_volatile(Err(error)) }; |
| } |
| |
| let _ = callback.consume(); |
| None // Don't re-arm |
| }; |
| |
| let mut callback = Timer::new(deadline, unsafe { |
| ForeignBox::new_from_ptr(&raw mut callback_closure) |
| }); |
| let callback_ptr = &raw mut callback; |
| timer::schedule_timer(self.kernel, unsafe { |
| ForeignBox::new_from_ptr(callback_ptr) |
| }); |
| |
| // Safety: It is important hold on to the WaitQueue lock that is returned |
| // from reschedule as the pointers needed by the timer canceling code |
| // below rely on it for correctness. |
| self = self.add_to_queue_and_reschedule(thread, wait_type); |
| |
| wait_queue_debug!( |
| "WaitQueue: thread '{}' ({:#010x}) resumed", |
| self.sched().current_thread_name() as &str, |
| self.sched().current_thread_id() as usize |
| ); |
| |
| // Cancel timeout callback if has not already fired. |
| // |
| // Safety: callback_ptr is valid until callback goes out of scope. |
| unsafe { |
| timer::cancel_and_consume_timer(self.kernel, NonNull::new_unchecked(callback_ptr)) |
| }; |
| |
| wait_queue_debug!( |
| "WaitQueue: thread '{}' ({:#010x}) exiting wait_until", |
| self.sched().current_thread_name() as &str, |
| self.sched().current_thread_id() as usize |
| ); |
| |
| // Safety: |
| // |
| // At this point the thread will be in the run queue by virtue of |
| // `reschedule()` return and the timer callback will have fired or be |
| // canceled. This leaves no dangling references to our "smuggled" |
| // pointers. |
| // |
| // It is also now safe to read the result UnsafeCell |
| |
| // Return `Error::Cancelled` if the wait was interrupted because this |
| // thread was terminated while it was waiting. |
| if self.sched().current_thread().terminating && wait_type == WaitType::Interruptible { |
| (self, Err(Error::Cancelled)) |
| } else { |
| (self, unsafe { result.get().read_volatile() }) |
| } |
| } |
| } |