diff --git a/src/header/bits_pthread/mod.rs b/src/header/bits_pthread/mod.rs
index 285f3a00e7a6180779a51b8ac7b15630d8d25547..3a82389f0249f5daab78cb8e9ba5fc01be1b6992 100644
--- a/src/header/bits_pthread/mod.rs
+++ b/src/header/bits_pthread/mod.rs
@@ -30,7 +30,7 @@ pub union pthread_rwlock_t {
 }
 #[repr(C)]
 pub union pthread_barrier_t {
-    __relibc_internal_size: [c_uchar; 16],
+    __relibc_internal_size: [c_uchar; 24],
     __relibc_internal_align: c_int,
 }
 #[repr(C)]
diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs
index c07c9d3ca3d44946d94bf1079ece97d2a1b1ef9a..9d5aaf9fe00774dd880c5d7b5dc44a4e0c9ce1c9 100644
--- a/src/sync/barrier.rs
+++ b/src/sync/barrier.rs
@@ -3,10 +3,16 @@ use core::num::NonZeroU32;
 use core::sync::atomic::{AtomicU32 as AtomicUint, Ordering};

 pub struct Barrier {
-    waited_count: AtomicUint,
-    notified_count: AtomicUint,
-    cycles_count: AtomicUint,
     original_count: NonZeroU32,
+    // 4
+    lock: crate::sync::Mutex<Inner>,
+    // 16
+    cvar: crate::header::pthread::RlctCond,
+    // 24
+}
+struct Inner {
+    count: u32,
+    gen_id: u32,
 }

 pub enum WaitResult {
@@ -17,75 +23,31 @@ pub enum WaitResult {
 impl Barrier {
     pub fn new(count: NonZeroU32) -> Self {
         Self {
-            waited_count: AtomicUint::new(0),
-            notified_count: AtomicUint::new(0),
-            cycles_count: AtomicUint::new(0),
             original_count: count,
+            lock: crate::sync::Mutex::new(Inner { count: 0, gen_id: 0 }),
+            cvar: crate::header::pthread::RlctCond::new(),
         }
     }
     pub fn wait(&self) -> WaitResult {
-        // The barrier wait operation can be divided into two parts: (1) incrementing the wait count where
-        // N-1 waiters wait and one notifies the rest, and (2) notifying all threads that have been
-        // waiting.
-        let original_count = self.original_count.get();
-        let mut new = self.waited_count.fetch_add(1, Ordering::Acquire) + 1;
-        let original_cycle_count = self.cycles_count.load(Ordering::Acquire);
-
-        loop {
-            let result = match Ord::cmp(&new, &original_count) {
-                cmp::Ordering::Less => {
-                    // new < original_count, i.e. we were one of the threads that incremented the
-                    // counter, and will return without SERIAL_THREAD later, but need to continue
-                    // waiting for the last waiter to notify the others.
-
-                    loop {
-                        let count = self.waited_count.load(Ordering::Acquire);
-
-                        if count >= original_count { break }
-
-                        let _ = crate::sync::futex_wait(&self.waited_count, count, None);
-                    }
-
-                    WaitResult::Waited
-                }
-                cmp::Ordering::Equal => {
-                    // new == original_count, i.e. we were the one thread doing the last increment, and we
-                    // will be responsible for waking up all other waiters.
+        let mut guard = self.lock.lock();
+        let Inner { count, gen_id } = *guard;
+        let last = self.original_count.get() - 1;

-                    crate::sync::futex_wake(&self.waited_count, original_count as i32 - 1);
+        if count == last {
+            guard.gen_id = guard.gen_id.wrapping_add(1);
+            guard.count = 0;

-                    WaitResult::NotifiedAll
-                }
-                cmp::Ordering::Greater => {
-                    let mut next_cycle_count;
+            self.cvar.broadcast();

-                    loop {
-                        next_cycle_count = self.cycles_count.load(Ordering::Acquire);
+            WaitResult::NotifiedAll
+        } else {
+            guard.count += 1;

-                        if next_cycle_count != original_cycle_count {
-                            break;
-                        }
-
-                        crate::sync::futex_wait(&self.cycles_count, next_cycle_count, None);
-                    }
-                    let difference = next_cycle_count.wrapping_sub(original_cycle_count);
-
-                    new = new.saturating_sub(difference * original_cycle_count);
-                    continue;
-                }
-            };
-
-            if self.notified_count.fetch_add(1, Ordering::AcqRel) + 1 == original_count {
-                self.notified_count.store(0, Ordering::Relaxed);
-                // Cycle count can be incremented nonatomically here, as this branch can only be
-                // reached once until waited_count is decremented again.
-                self.cycles_count.store(self.cycles_count.load(Ordering::Acquire).wrapping_add(1), Ordering::Release);
-
-                let _ = self.waited_count.fetch_sub(original_count, Ordering::Relaxed);
-
-                let _ = crate::sync::futex_wake(&self.cycles_count, i32::MAX);
+            while guard.count != last && guard.gen_id == gen_id {
+                guard = self.cvar.wait_inner_typedmutex(guard);
             }
-            return result;
+
+            WaitResult::Waited
         }
     }
 }
diff --git a/src/sync/barrier.rs.unused b/src/sync/barrier.rs.unused
new file mode 100644
index 0000000000000000000000000000000000000000..c07c9d3ca3d44946d94bf1079ece97d2a1b1ef9a
--- /dev/null
+++ b/src/sync/barrier.rs.unused
@@ -0,0 +1,91 @@
+use core::cmp;
+use core::num::NonZeroU32;
+use core::sync::atomic::{AtomicU32 as AtomicUint, Ordering};
+
+pub struct Barrier {
+    waited_count: AtomicUint,
+    notified_count: AtomicUint,
+    cycles_count: AtomicUint,
+    original_count: NonZeroU32,
+}
+
+pub enum WaitResult {
+    Waited,
+    NotifiedAll,
+}
+
+impl Barrier {
+    pub fn new(count: NonZeroU32) -> Self {
+        Self {
+            waited_count: AtomicUint::new(0),
+            notified_count: AtomicUint::new(0),
+            cycles_count: AtomicUint::new(0),
+            original_count: count,
+        }
+    }
+    pub fn wait(&self) -> WaitResult {
+        // The barrier wait operation can be divided into two parts: (1) incrementing the wait count where
+        // N-1 waiters wait and one notifies the rest, and (2) notifying all threads that have been
+        // waiting.
+        let original_count = self.original_count.get();
+        let mut new = self.waited_count.fetch_add(1, Ordering::Acquire) + 1;
+        let original_cycle_count = self.cycles_count.load(Ordering::Acquire);
+
+        loop {
+            let result = match Ord::cmp(&new, &original_count) {
+                cmp::Ordering::Less => {
+                    // new < original_count, i.e. we were one of the threads that incremented the
+                    // counter, and will return without SERIAL_THREAD later, but need to continue
+                    // waiting for the last waiter to notify the others.
+
+                    loop {
+                        let count = self.waited_count.load(Ordering::Acquire);
+
+                        if count >= original_count { break }
+
+                        let _ = crate::sync::futex_wait(&self.waited_count, count, None);
+                    }
+
+                    WaitResult::Waited
+                }
+                cmp::Ordering::Equal => {
+                    // new == original_count, i.e. we were the one thread doing the last increment, and we
+                    // will be responsible for waking up all other waiters.
+
+                    crate::sync::futex_wake(&self.waited_count, original_count as i32 - 1);
+
+                    WaitResult::NotifiedAll
+                }
+                cmp::Ordering::Greater => {
+                    let mut next_cycle_count;
+
+                    loop {
+                        next_cycle_count = self.cycles_count.load(Ordering::Acquire);
+
+                        if next_cycle_count != original_cycle_count {
+                            break;
+                        }
+
+                        crate::sync::futex_wait(&self.cycles_count, next_cycle_count, None);
+                    }
+                    let difference = next_cycle_count.wrapping_sub(original_cycle_count);
+
+                    new = new.saturating_sub(difference * original_cycle_count);
+                    continue;
+                }
+            };
+
+            if self.notified_count.fetch_add(1, Ordering::AcqRel) + 1 == original_count {
+                self.notified_count.store(0, Ordering::Relaxed);
+                // Cycle count can be incremented nonatomically here, as this branch can only be
+                // reached once until waited_count is decremented again.
+                self.cycles_count.store(self.cycles_count.load(Ordering::Acquire).wrapping_add(1), Ordering::Release);
+
+                let _ = self.waited_count.fetch_sub(original_count, Ordering::Relaxed);
+
+                let _ = crate::sync::futex_wake(&self.cycles_count, i32::MAX);
+            }
+            return result;
+        }
+    }
+}
diff --git a/src/sync/cond.rs b/src/sync/cond.rs
index 4942653e613e3386504c0da87b4ec79bbde90c00..f3ab57d162a83a9db09ffb92babc3c80fd6ffe40 100644
--- a/src/sync/cond.rs
+++ b/src/sync/cond.rs
@@ -11,6 +11,9 @@ pub struct Cond {
     cur: AtomicUint,
     prev: AtomicUint,
 }
+
+type Result<T, E = crate::pthread::Errno> = core::result::Result<T, E>;
+
 impl Cond {
     pub fn new() -> Self {
         Self{
@@ -37,21 +40,31 @@ impl Cond {
         self.wait_inner(mutex, Some(timeout))
     }
     fn wait_inner(&self, mutex: &RlctMutex, timeout: Option<&timespec>) -> Result<(), Errno> {
+        self.wait_inner_generic(|| mutex.unlock(), || mutex.lock(), |timeout| mutex.lock_with_timeout(timeout), timeout)
+    }
+    pub fn wait_inner_typedmutex<'lock, T>(&self, guard: crate::sync::MutexGuard<'lock, T>) -> crate::sync::MutexGuard<'lock, T> {
+        let mut newguard = None;
+        let lock = guard.mutex;
+        self.wait_inner_generic(|| Ok(drop(guard)), || Ok(newguard = Some(lock.lock())), |_| unreachable!(), None).unwrap();
+        newguard.unwrap()
+    }
+    // TODO: FUTEX_REQUEUE
+    fn wait_inner_generic(&self, unlock: impl FnOnce() -> Result<()>, lock: impl FnOnce() -> Result<()>, lock_with_timeout: impl FnOnce(&timespec) -> Result<()>, timeout: Option<&timespec>) -> Result<(), Errno> {
         // TODO: Error checking for certain types (i.e. robust and errorcheck) of mutexes, e.g. if the
         // mutex is not locked.
         let current = self.cur.load(Ordering::Relaxed);
         self.prev.store(current, Ordering::Relaxed);

-        mutex.unlock();
+        unlock();

         match timeout {
             Some(timeout) => {
                 crate::sync::futex_wait(&self.cur, current, timespec::subtract(*timeout, crate::sync::rttime()).as_ref());
-                mutex.lock_with_timeout(timeout);
+                lock_with_timeout(timeout);
             }
             None => {
                 crate::sync::futex_wait(&self.cur, current, None);
-                mutex.lock();
+                lock();
             }
         }
         Ok(())
diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 57db44dff3fa6a1054cb97388634ac89439eba43..14617919e6793c412e0a1724c4cfd8ecded012f7 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -114,7 +114,7 @@ impl<T> Mutex<T> {
 }

 pub struct MutexGuard<'a, T: 'a> {
-    mutex: &'a Mutex<T>,
+    pub(crate) mutex: &'a Mutex<T>,
     content: &'a mut T,
 }
 impl<'a, T> Deref for MutexGuard<'a, T> {
diff --git a/src/sync/once.rs b/src/sync/once.rs
index 0073f51cc8d7b7a6cb5adbac769e76bd1299c1b9..5e8341645bcd0510cf82382d91ef2ac03dd7aea9 100644
--- a/src/sync/once.rs
+++ b/src/sync/once.rs
@@ -1,53 +1,35 @@
 use super::{AtomicLock, AttemptStatus};
 use crate::platform::types::*;
 use core::{cell::UnsafeCell, mem::MaybeUninit};
-use core::sync::atomic::{AtomicI32 as AtomicInt, Ordering::SeqCst};
+use core::sync::atomic::{AtomicI32 as AtomicInt, Ordering};

 const UNINITIALIZED: c_int = 0;
 const INITIALIZING: c_int = 1;
 const WAITING: c_int = 2;
 const INITIALIZED: c_int = 3;
-pub(crate) fn call_once_generic(word: &AtomicInt, f: impl FnOnce()) {
-    match word.compare_and_swap(UNINITIALIZED, INITIALIZING, SeqCst) {
-        UNINITIALIZED => {
-            // We now have a lock, let's initiate things!
-
-            // Mark the data as initialized
-            if word.swap(INITIALIZED, SeqCst) == WAITING {
-                // At least one thread is waiting on this to finish
-                crate::sync::futex_wake(word, i32::MAX);
-            }
-        }
-        INITIALIZING | WAITING => crate::sync::wait_until_generic(
-            word,
-            |lock| match lock.load(SeqCst) {
-                WAITING => AttemptStatus::Waiting,
-                INITIALIZED => AttemptStatus::Desired,
-                _ => AttemptStatus::Other,
-            },
-            |lock| match lock
-                .compare_exchange_weak(INITIALIZING, WAITING, SeqCst, SeqCst)
-                .unwrap_or_else(|e| e)
-            {
-                WAITING => AttemptStatus::Waiting,
-                INITIALIZED => AttemptStatus::Desired,
-                _ => AttemptStatus::Other,
-            },
-            WAITING,
-        ),
-        INITIALIZED => (),
-        _ => unreachable!("invalid state for Once<T>"),
-    }
-
-}
-
 pub struct Once<T> {
     status: AtomicInt,
     data: UnsafeCell<MaybeUninit<T>>,
 }
+
+// SAFETY:
+//
+// Sending a Once is the same as sending a (wrapped) T.
 unsafe impl<T: Send> Send for Once<T> {}
-
-unsafe impl<T: Send> Sync for Once<T> {}
+
+// SAFETY:
+//
+// For Once to be shared between threads without being unsound, only call_once needs to be safe, at
+// the moment.
+//
+// Send requirement: the thread that gets to run the initializer function, will put a T in the cell
+// which can then be accessed by other threads, thus T needs to be send.
+//
+// Sync requirement: after call_once has been called, it returns the value via &T, which naturally
+// forces T to be Sync.
+unsafe impl<T: Send + Sync> Sync for Once<T> {}
+
 impl<T> Once<T> {
     pub const fn new() -> Self {
         Self {
@@ -55,17 +37,63 @@ impl<T> Once<T> {
             data: UnsafeCell::new(MaybeUninit::uninit()),
         }
     }
-    // FIXME: Isn't &mut UB?
-    pub fn call_once<F>(&self, f: F) -> &mut T
-    where
-        F: FnOnce() -> T,
-    {
-        call_once_generic(&self.status, || {
-            unsafe { &mut *self.data.get() }.write(f());
-        });
+    pub fn call_once(&self, constructor: impl FnOnce() -> T) -> &T {
+        match self.status.compare_exchange(
+            UNINITIALIZED,
+            INITIALIZING,
+            // SAFETY: Success ordering: if the CAS succeeds, we technically need no
+            // synchronization besides the Release store to INITIALIZED, and Acquire here forbids
+            // possible loads in f() to be re-ordered before this CAS. One could argue whether or
+            // not that is reasonable, but the main point is that the success ordering must be at
+            // least as strong as the failure ordering.
+            Ordering::Acquire,
+            // SAFETY: Failure ordering: if the CAS fails, and status was INITIALIZING | WAITING,
+            // then Relaxed is sufficient, as it will have to be Acquire-loaded again later. If
+            // INITIALIZED is encountered however, it will nonatomically read the value in the
+            // Cell, which necessitates Acquire.
+            Ordering::Acquire
+            // TODO: On archs where this matters, use Relaxed and core::sync::atomic::fence?
+        ) {
+            Ok(_must_be_uninit) => {
+                // We now have exclusive access to the cell, let's initiate things!
+                unsafe { self.data.get().cast::<T>().write(constructor()) };
+
+                // Mark the data as initialized
+                if self.status.swap(INITIALIZED, Ordering::Release) == WAITING {
+                    // At least one thread is waiting on this to finish
+                    crate::sync::futex_wake(&self.status, i32::MAX);
+                }
+            }
+            Err(INITIALIZING) | Err(WAITING) => crate::sync::wait_until_generic(
+                &self.status,
+                // SAFETY: An Acquire load is necessary for the nonatomic store by the thread
+                // running the constructor, to become visible.
+                |status| match status.load(Ordering::Acquire) {
+                    WAITING => AttemptStatus::Waiting,
+                    INITIALIZED => AttemptStatus::Desired,
+                    _ => AttemptStatus::Other,
+                },
+                // SAFETY: Double-Acquire is necessary here as well, because if the CAS fails and
+                // it was INITIALIZED, the nonatomic write by the constructor thread, must be
+                // visible.
+                |status| match status
+                    .compare_exchange_weak(INITIALIZING, WAITING, Ordering::Acquire, Ordering::Acquire)
+                    .unwrap_or_else(|e| e)
+                {
+                    WAITING => AttemptStatus::Waiting,
+                    INITIALIZED => AttemptStatus::Desired,
+                    _ => AttemptStatus::Other,
+                },
+                WAITING,
+            ),
+            Err(INITIALIZED) => (),
+
+            // TODO: Only for debug builds?
+            Err(_) => unreachable!("invalid state for Once<T>"),
+        }

         // At this point the data must be initialized!
-        unsafe { &mut *(&mut *self.data.get()).as_mut_ptr() }
+        unsafe { (&*self.data.get()).assume_init_ref() }
     }
 }
 impl<T> Default for Once<T> {
@@ -73,3 +101,17 @@ impl<T> Default for Once<T> {
         Self::new()
     }
 }
+// TODO: Drop doesn't work well in const fn, instead use a wrapper for relibc Rust code that adds
+// Drop, and don't use that wrapper when writing the header file impls.
+/*
+impl<T> Drop for Once<T> {
+    fn drop(&mut self) {
+        unsafe {
+            if *self.status.get_mut() == INITIALIZED {
+                // SAFETY: It must be initialized, because of the above condition.
+                self.data.get_mut().assume_init_drop();
+            }
+        }
+    }
+}
+*/
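
For reference, a minimal sketch of the generation-counter barrier pattern that the new src/sync/barrier.rs is built on, written against std::sync::{Mutex, Condvar} so it compiles and runs on its own. The names SketchBarrier and Inner are placeholders, and the sketch follows the common formulation in which waiters block purely on the generation counter; it illustrates the technique under those assumptions and is not the relibc code itself.

```rust
// Sketch only: std-based analogue of the mutex + condvar + generation barrier.
use std::num::NonZeroU32;
use std::sync::{Condvar, Mutex};

pub enum WaitResult {
    Waited,
    NotifiedAll,
}

struct Inner {
    count: u32,  // threads currently blocked in this generation
    gen_id: u32, // bumped once per completed generation
}

pub struct SketchBarrier {
    original_count: NonZeroU32,
    lock: Mutex<Inner>,
    cvar: Condvar,
}

impl SketchBarrier {
    pub fn new(count: NonZeroU32) -> Self {
        Self {
            original_count: count,
            lock: Mutex::new(Inner { count: 0, gen_id: 0 }),
            cvar: Condvar::new(),
        }
    }

    pub fn wait(&self) -> WaitResult {
        let mut guard = self.lock.lock().unwrap();
        let gen_id = guard.gen_id;

        guard.count += 1;
        if guard.count == self.original_count.get() {
            // Last arrival: start a new generation and release everyone.
            guard.gen_id = guard.gen_id.wrapping_add(1);
            guard.count = 0;
            self.cvar.notify_all();
            WaitResult::NotifiedAll
        } else {
            // Everyone else sleeps until the generation counter changes;
            // re-checking it also guards against spurious wakeups.
            while guard.gen_id == gen_id {
                guard = self.cvar.wait(guard).unwrap();
            }
            WaitResult::Waited
        }
    }
}

fn main() {
    let barrier = std::sync::Arc::new(SketchBarrier::new(NonZeroU32::new(4).unwrap()));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let b = barrier.clone();
            std::thread::spawn(move || matches!(b.wait(), WaitResult::NotifiedAll))
        })
        .collect();
    // Exactly one caller plays the "serial thread" role per generation.
    let serial = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&notified| notified)
        .count();
    assert_eq!(serial, 1);
}
```

The std Condvar stands in for relibc's RlctCond; wrapping_add on gen_id keeps repeated reuse of the barrier well-defined even after the counter overflows.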
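The wait_inner_typedmutex / wait_inner_generic split in cond.rs can be illustrated with a small stand-alone sketch: the wait sequence is written once, parameterized over unlock/relock closures, and the typed-guard caller recovers its new guard through an Option. Everything below (wait_generic, wait_typed, the yield_now stand-in for futex_wait) is hypothetical std-based scaffolding, not relibc API. Note that the patch makes MutexGuard::mutex pub(crate) precisely so the guard's lock can be re-taken; std's MutexGuard does not expose its mutex, so the sketch passes the Mutex explicitly.

```rust
use std::sync::{Mutex, MutexGuard};

// Stand-in for the futex-based wait in cond.rs: give up the lock, block
// (here: just yield), then reacquire before returning to the caller.
fn wait_generic(unlock: impl FnOnce(), relock: impl FnOnce()) {
    unlock();
    // The real code would futex_wait on the condition word here.
    std::thread::yield_now();
    relock();
}

// Typed-guard wrapper: drop the guard inside one closure, stash the
// reacquired guard in an Option inside the other, then hand it back.
fn wait_typed<'l, T>(lock: &'l Mutex<T>, guard: MutexGuard<'l, T>) -> MutexGuard<'l, T> {
    let mut newguard = None;
    wait_generic(
        || drop(guard),
        || newguard = Some(lock.lock().unwrap()),
    );
    newguard.unwrap()
}

fn main() {
    let lock = Mutex::new(0u32);
    let mut guard = lock.lock().unwrap();
    *guard += 1;
    let guard = wait_typed(&lock, guard);
    assert_eq!(*guard, 1);
}
```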
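Finally, a compilable sketch of the state machine behind the new Once::call_once and its Acquire/Release argument, using std atomics. Where relibc parks losing threads on a futex in a WAITING state, this sketch merely yields in a loop, so it is only a model of the ordering logic under that simplification; SketchOnce and its constants are illustrative names.

```rust
// Sketch only: UNINITIALIZED -> INITIALIZING -> INITIALIZED, with the same
// Acquire/Release reasoning as the patched call_once.
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicI32, Ordering};

const UNINITIALIZED: i32 = 0;
const INITIALIZING: i32 = 1;
const INITIALIZED: i32 = 3;

pub struct SketchOnce<T> {
    status: AtomicI32,
    data: UnsafeCell<MaybeUninit<T>>,
}

// Same bounds as the patch: T must be Send (the initializing thread hands the
// value to other threads) and Sync (call_once returns &T to every caller).
unsafe impl<T: Send> Send for SketchOnce<T> {}
unsafe impl<T: Send + Sync> Sync for SketchOnce<T> {}

impl<T> SketchOnce<T> {
    pub const fn new() -> Self {
        Self {
            status: AtomicI32::new(UNINITIALIZED),
            data: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    pub fn call_once(&self, constructor: impl FnOnce() -> T) -> &T {
        // Acquire on both success and failure: a failed CAS that observes
        // INITIALIZED must also see the nonatomic write of the value.
        match self.status.compare_exchange(
            UNINITIALIZED,
            INITIALIZING,
            Ordering::Acquire,
            Ordering::Acquire,
        ) {
            Ok(_) => {
                // We won the race: write the value, then publish it with a
                // Release store so Acquire loads of INITIALIZED see it.
                unsafe { self.data.get().cast::<T>().write(constructor()) };
                self.status.store(INITIALIZED, Ordering::Release);
            }
            Err(INITIALIZED) => {}
            Err(_) => {
                // Losers wait for the winner. relibc futex-waits here; this
                // sketch just yields until INITIALIZED becomes visible.
                while self.status.load(Ordering::Acquire) != INITIALIZED {
                    std::thread::yield_now();
                }
            }
        }

        // The status is INITIALIZED, so the cell holds a valid T.
        unsafe { (*self.data.get()).assume_init_ref() }
    }
}

fn main() {
    static CELL: SketchOnce<u32> = SketchOnce::new();
    let threads: Vec<_> = (0..8)
        .map(|_| std::thread::spawn(|| *CELL.call_once(|| 42)))
        .collect();
    for t in threads {
        assert_eq!(t.join().unwrap(), 42);
    }
}
```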