From 2333e45292d63b0cfa49509cc773a70a05bae646 Mon Sep 17 00:00:00 2001 From: jD91mZM2 <me@krake.one> Date: Sat, 6 Jul 2019 21:59:57 +0200 Subject: [PATCH] Implement the Once<T> synchronization structure Not sure if I should add a RwLock for the ptrace state too... --- src/header/stdio/default.rs | 2 +- src/header/stdio/helpers.rs | 4 +- src/header/stdio/mod.rs | 6 +- src/lib.rs | 6 +- src/platform/pte.rs | 2 +- src/platform/redox/ptrace.rs | 19 ++++++ src/sync/mod.rs | 109 +++++++++++++++++++++++++++++++++++ src/{ => sync}/mutex.rs | 77 +++++++++---------------- src/sync/once.rs | 66 +++++++++++++++++++++ 9 files changed, 232 insertions(+), 59 deletions(-) create mode 100644 src/sync/mod.rs rename src/{ => sync}/mutex.rs (58%) create mode 100644 src/sync/once.rs diff --git a/src/header/stdio/default.rs b/src/header/stdio/default.rs index 0dc2be97c..ceae34bd6 100644 --- a/src/header/stdio/default.rs +++ b/src/header/stdio/default.rs @@ -4,8 +4,8 @@ use core::ptr; use fs::File; use io::LineWriter; -use mutex::Mutex; use platform::types::*; +use sync::Mutex; pub struct GlobalFile(UnsafeCell<FILE>); impl GlobalFile { diff --git a/src/header/stdio/helpers.rs b/src/header/stdio/helpers.rs index 878ec3cf8..340f3ea27 100644 --- a/src/header/stdio/helpers.rs +++ b/src/header/stdio/helpers.rs @@ -5,9 +5,9 @@ use header::errno; use header::fcntl::*; use header::string::strchr; use io::LineWriter; -use mutex::Mutex; -use platform; use platform::types::*; +use platform; +use sync::Mutex; use super::constants::*; use super::{Buffer, FILE}; diff --git a/src/header/stdio/mod.rs b/src/header/stdio/mod.rs index 491025ac5..08a66c26f 100644 --- a/src/header/stdio/mod.rs +++ b/src/header/stdio/mod.rs @@ -14,11 +14,11 @@ use header::errno::{self, STR_ERROR}; use header::string::{self, strlen}; use header::{fcntl, stdlib, unistd}; use io::{self, BufRead, LineWriter, Read, Write}; -use mutex::Mutex; -use platform; use platform::types::*; -use platform::{errno, WriteByte}; use platform::{Pal, Sys}; +use platform::{errno, WriteByte}; +use platform; +use sync::Mutex; pub use self::constants::*; mod constants; diff --git a/src/lib.rs b/src/lib.rs index 2ce9a65a9..00a29df74 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,11 +11,13 @@ #![feature(const_vec_new)] #![feature(core_intrinsics)] #![feature(global_asm)] -// FIXME: Stable on nightly, remove once redox fork is updated +// FIXME: Add #![feature(maybe_uninit_extra)] +// FIXME: The following are stable on nightly, remove once redox fork is updated #![feature(alloc)] #![feature(iter_copied)] #![feature(lang_items)] #![feature(linkage)] +#![feature(maybe_uninit)] #![feature(stmt_expr_attributes)] #![feature(str_internals)] #![feature(thread_local)] @@ -57,9 +59,9 @@ pub mod fs; pub mod header; pub mod io; pub mod ld_so; -pub mod mutex; pub mod platform; pub mod start; +pub mod sync; use platform::{Allocator, Pal, Sys}; diff --git a/src/platform/pte.rs b/src/platform/pte.rs index 95cc9469d..2bb61dd55 100644 --- a/src/platform/pte.rs +++ b/src/platform/pte.rs @@ -8,9 +8,9 @@ use core::{intrinsics, ptr}; use header::sys_mman; use header::time::timespec; use ld_so::tcb::{Master, Tcb}; -use mutex::Mutex; use platform::types::{c_int, c_uint, c_void, pid_t, size_t}; use platform::{Pal, Sys}; +use sync::Mutex; pub struct Semaphore { lock: Mutex<()>, diff --git a/src/platform/redox/ptrace.rs b/src/platform/redox/ptrace.rs index 9c3c71803..912903d17 100644 --- a/src/platform/redox/ptrace.rs +++ b/src/platform/redox/ptrace.rs @@ -1,6 +1,25 @@ +//! Note: This module is not going to be clean. We're not going to be +//! able to follow the specs 100%. Linux ptrace is very, very, +//! different to Redox. Many people agree that Linux ptrace is bad, so +//! we are NOT going to bend our API for the sake of +//! compatibility. So, this module will be a hellhole. + use super::super::types::*; use super::super::PalPtrace; use super::{e, Sys}; +use crate::sync::{Mutex, Once}; +use alloc::collections::BTreeMap; + +#[derive(Default)] +struct State { + +} + +static STATE: Once<Mutex<State>> = Once::new(); + +fn state() -> &'static Mutex<State> { + STATE.call_once(|| Mutex::new(State::default())) +} impl PalPtrace for Sys { fn ptrace(request: c_int, pid: pid_t, addr: *mut c_void, data: *mut c_void) -> c_int { diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 000000000..e8dce8214 --- /dev/null +++ b/src/sync/mod.rs @@ -0,0 +1,109 @@ +pub mod mutex; +pub mod once; + +pub use self::mutex::{Mutex, MutexGuard}; +pub use self::once::Once; + +use core::cell::UnsafeCell; +use core::ops::Deref; +use core::sync::atomic::AtomicI32 as AtomicInt; +use core::sync::atomic; +use platform::types::*; +use platform::{Pal, Sys}; + +const FUTEX_WAIT: c_int = 0; +const FUTEX_WAKE: c_int = 1; + +#[derive(Clone, Copy, PartialEq, Eq)] +enum AttemptStatus { + Desired, + Waiting, + Other +} + +/// Convenient wrapper around the "futex" system call for +/// synchronization implementations +struct AtomicLock { + atomic: UnsafeCell<AtomicInt> +} +impl AtomicLock { + pub const fn new(value: c_int) -> Self { + Self { + atomic: UnsafeCell::new(AtomicInt::new(value)) + } + } + pub fn notify_one(&self) { + Sys::futex(unsafe { &mut *self.atomic.get() }.get_mut(), FUTEX_WAKE, 1); + } + pub fn notify_all(&self) { + Sys::futex(unsafe { &mut *self.atomic.get() }.get_mut(), FUTEX_WAKE, c_int::max_value()); + } + pub fn wait_if(&self, value: c_int) { + Sys::futex(unsafe { &mut *self.atomic.get() }.get_mut(), FUTEX_WAIT, value); + } + /// A general way to efficiently wait for what might be a long time, using two closures: + /// + /// - `attempt` = Attempt to modify the atomic value to any + /// desired state. + /// - `mark_long` = Attempt to modify the atomic value to sign + /// that it want's to get notified when waiting is done. + /// + /// Both of these closures are allowed to spuriously give a + /// non-success return value, they are used only as optimization + /// hints. However, what counts as a "desired value" may differ + /// per closure. Therefore, `mark_long` can notify a value as + /// "desired" in order to get `attempt` retried immediately. + /// + /// The `long` parameter is the only one which actually cares + /// about the specific value of your atomics. This is needed + /// because it needs to pass this to the futex system call in + /// order to avoid race conditions where the atomic could be + /// modified to the desired value before the call is complete and + /// we receive the wakeup notification. + pub fn wait_until<F1, F2>(&self, attempt: F1, mark_long: F2, long: c_int) + where + F1: Fn(&AtomicInt) -> AttemptStatus, + F2: Fn(&AtomicInt) -> AttemptStatus + { + // First, try spinning for really short durations + for _ in 0..999 { + atomic::spin_loop_hint(); + if attempt(self) == AttemptStatus::Desired { + return; + } + } + + // One last attempt, to initiate "previous" + let mut previous = attempt(self); + + // Ok, that seems to take quite some time. Let's go into a + // longer, more patient, wait. + loop { + if previous == AttemptStatus::Desired { + return; + } + + if + // If we or somebody else already initiated a long + // wait, OR + previous == AttemptStatus::Waiting || + // Otherwise, unless our attempt to initiate a long + // wait informed us that we might be done waiting + mark_long(self) != AttemptStatus::Desired + { + self.wait_if(long); + } + + previous = attempt(self); + } + } +} +impl Deref for AtomicLock { + type Target = AtomicInt; + + fn deref(&self) -> &Self::Target { + unsafe { + &*self.atomic.get() + } + } +} diff --git a/src/mutex.rs b/src/sync/mutex.rs similarity index 58% rename from src/mutex.rs rename to src/sync/mutex.rs index 34ff1ed44..a88f25157 100644 --- a/src/mutex.rs +++ b/src/sync/mutex.rs @@ -1,16 +1,15 @@ +use super::{AtomicLock, AttemptStatus}; use core::cell::UnsafeCell; use core::ops::{Deref, DerefMut}; -use core::sync::atomic; -use core::sync::atomic::AtomicI32 as AtomicInt; use core::sync::atomic::Ordering::SeqCst; use platform::types::*; -use platform::{Pal, Sys}; -pub const FUTEX_WAIT: c_int = 0; -pub const FUTEX_WAKE: c_int = 1; +const UNLOCKED: c_int = 0; +const LOCKED: c_int = 1; +const WAITING: c_int = 2; pub struct Mutex<T> { - lock: UnsafeCell<AtomicInt>, + lock: AtomicLock, content: UnsafeCell<T>, } unsafe impl<T: Send> Send for Mutex<T> {} @@ -19,7 +18,7 @@ impl<T> Mutex<T> { /// Create a new mutex pub const fn new(content: T) -> Self { Self { - lock: UnsafeCell::new(AtomicInt::new(0)), + lock: AtomicLock::new(UNLOCKED), content: UnsafeCell::new(content), } } @@ -31,67 +30,45 @@ impl<T> Mutex<T> { /// ``` pub unsafe fn locked(content: T) -> Self { Self { - lock: UnsafeCell::new(AtomicInt::new(1)), + lock: AtomicLock::new(LOCKED), content: UnsafeCell::new(content), } } - unsafe fn atomic(&self) -> &mut AtomicInt { - &mut *self.lock.get() - } - /// Tries to lock the mutex, fails if it's already locked. Manual means /// it's up to you to unlock it after mutex. Returns the last atomic value /// on failure. You should probably not worry about this, it's used for /// internal optimizations. pub unsafe fn manual_try_lock(&self) -> Result<&mut T, c_int> { - self.atomic() - .compare_exchange(0, 1, SeqCst, SeqCst) + self.lock.compare_exchange(UNLOCKED, LOCKED, SeqCst, SeqCst) .map(|_| &mut *self.content.get()) } /// Lock the mutex, returning the inner content. After doing this, it's /// your responsibility to unlock it after usage. Mostly useful for FFI: /// Prefer normal .lock() where possible. pub unsafe fn manual_lock(&self) -> &mut T { - let mut last = 0; - - // First, try spinning for really short durations: - for _ in 0..100 { - atomic::spin_loop_hint(); - last = match self.manual_try_lock() { - Ok(content) => return content, - Err(value) => value, - }; - } - - // We're waiting for a longer duration, so let's employ a futex. - loop { - // If the value is 1, set it to 2 to signify that we're waiting for - // it to to send a FUTEX_WAKE on unlock. - // - // - Skip the atomic operation if the last value was 2, since it most likely hasn't changed. - // - Skip the futex wait if the atomic operation says the mutex is unlocked. - if last == 2 - || self - .atomic() - .compare_exchange(1, 2, SeqCst, SeqCst) - .unwrap_or_else(|err| err) - != 0 - { - Sys::futex(self.atomic().get_mut(), FUTEX_WAIT, 2); - } - - last = match self.manual_try_lock() { - Ok(content) => return content, - Err(value) => value, - }; - } + self.lock.wait_until( + |lock| { + lock.compare_exchange_weak(UNLOCKED, LOCKED, SeqCst, SeqCst) + .map(|_| AttemptStatus::Desired) + .unwrap_or_else(|e| match e { + WAITING => AttemptStatus::Waiting, + _ => AttemptStatus::Other + }) + }, + |lock| match lock.compare_exchange_weak(LOCKED, WAITING, SeqCst, SeqCst).unwrap_or_else(|e| e) { + UNLOCKED => AttemptStatus::Desired, + WAITING => AttemptStatus::Waiting, + _ => AttemptStatus::Other + }, + WAITING + ); + &mut *self.content.get() } /// Unlock the mutex, if it's locked. pub unsafe fn manual_unlock(&self) { - if self.atomic().swap(0, SeqCst) == 2 { - // At least one futex is up, so let's notify it - Sys::futex(self.atomic().get_mut(), FUTEX_WAKE, 1); + if self.lock.swap(UNLOCKED, SeqCst) == WAITING { + self.lock.notify_one(); } } diff --git a/src/sync/once.rs b/src/sync/once.rs new file mode 100644 index 000000000..06a7f6a4e --- /dev/null +++ b/src/sync/once.rs @@ -0,0 +1,66 @@ +use super::{AtomicLock, AttemptStatus}; +use core::cell::UnsafeCell; +use core::mem::MaybeUninit; +use core::sync::atomic::Ordering::SeqCst; +use platform::types::*; + +const UNINITIALIZED: c_int = 0; +const INITIALIZING: c_int = 1; +const WAITING: c_int = 2; +const INITIALIZED: c_int = 3; + +pub struct Once<T> { + status: AtomicLock, + data: UnsafeCell<MaybeUninit<T>> +} +unsafe impl<T: Send> Send for Once<T> {} +unsafe impl<T: Send> Sync for Once<T> {} +impl<T> Once<T> { + pub const fn new() -> Self { + Self { + status: AtomicLock::new(UNINITIALIZED), + data: UnsafeCell::new(MaybeUninit::uninit()) + } + } + pub fn call_once<F>(&self, f: F) -> &mut T + where F: FnOnce() -> T + { + match self.status.compare_and_swap(UNINITIALIZED, INITIALIZING, SeqCst) { + UNINITIALIZED => { + // We now have a lock, let's initiate things! + let ret = unsafe { &mut *self.data.get() }.write(f()); + + // Mark the data as initialized + if self.status.swap(INITIALIZED, SeqCst) == WAITING { + // At least one thread is waiting on this to finish + self.status.notify_all(); + } + }, + INITIALIZING | WAITING => self.status.wait_until( + |lock| match lock.load(SeqCst) { + WAITING => AttemptStatus::Waiting, + INITIALIZED => AttemptStatus::Desired, + _ => AttemptStatus::Other + }, + |lock| match lock.compare_exchange_weak(INITIALIZING, WAITING, SeqCst, SeqCst).unwrap_or_else(|e| e) { + WAITING => AttemptStatus::Waiting, + INITIALIZED => AttemptStatus::Desired, + _ => AttemptStatus::Other + }, + WAITING + ), + INITIALIZED => (), + _ => unreachable!("invalid state for Once<T>") + } + + // At this point the data must be initialized! + unsafe { + &mut *(&mut *self.data.get()).as_mut_ptr() + } + } +} +impl<T> Default for Once<T> { + fn default() -> Self { + Self::new() + } +} -- GitLab