diff --git a/src/header/bits_pthread/mod.rs b/src/header/bits_pthread/mod.rs
index e3dc67bf2e85c542227a9999bd230d29ff348865..9574110834e21d15c1ead1546b723218022d426e 100644
--- a/src/header/bits_pthread/mod.rs
+++ b/src/header/bits_pthread/mod.rs
@@ -40,7 +40,7 @@ pub union pthread_barrierattr_t {
 }
 #[repr(C)]
 pub union pthread_mutex_t {
-    __relibc_internal_size: [c_uchar; 4],
+    __relibc_internal_size: [c_uchar; 12],
     __relibc_internal_align: c_int,
 }
 #[repr(C)]
diff --git a/src/header/pthread/cond.rs b/src/header/pthread/cond.rs
index 2f664583d3366caa60990fd5f53f32500e0bec2e..e91edac356628431111de25f9cf78a8c395890a6 100644
--- a/src/header/pthread/cond.rs
+++ b/src/header/pthread/cond.rs
@@ -4,10 +4,6 @@ use super::*;
 
 // PTHREAD_COND_INITIALIZER is defined manually in bits_pthread/cbindgen.toml
 
-fn e(r: Result<(), pthread::Errno>) -> c_int {
-    r.map_or_else(|pthread::Errno(errno)| errno, |()| 0)
-}
-
 #[no_mangle]
 pub unsafe extern "C" fn pthread_cond_broadcast(cond: *mut pthread_cond_t) -> c_int {
     e((&*cond.cast::<RlctCond>()).broadcast())
@@ -33,13 +29,13 @@ pub unsafe extern "C" fn pthread_cond_signal(cond: *mut pthread_cond_t) -> c_int
 }
 
 #[no_mangle]
-pub unsafe extern "C" fn pthread_cond_timedwait(cond: *mut pthread_cond_t, mutex_ptr: *mut pthread_mutex_t, timeout: *const timespec) -> c_int {
-    e((&*cond.cast::<RlctCond>()).timedwait(mutex_ptr, Some(&*timeout)))
+pub unsafe extern "C" fn pthread_cond_timedwait(cond: *mut pthread_cond_t, mutex: *mut pthread_mutex_t, timeout: *const timespec) -> c_int {
+    e((&*cond.cast::<RlctCond>()).timedwait(&*mutex.cast::<RlctMutex>(), &*timeout))
 }
 
 #[no_mangle]
 pub unsafe extern "C" fn pthread_cond_wait(cond: *mut pthread_cond_t, mutex: *mut pthread_mutex_t) -> c_int {
-    e((&*cond.cast::<RlctCond>()).wait(mutex))
+    e((&*cond.cast::<RlctCond>()).wait(&*mutex.cast::<RlctMutex>()))
 }
 
 #[no_mangle]
diff --git a/src/header/pthread/mod.rs b/src/header/pthread/mod.rs
index f2e1e810f29a0a1367f57353351ab491d6256943..466df19d2a132f7e796dfa1526e39b7e6c0cd64d 100644
--- a/src/header/pthread/mod.rs
+++ b/src/header/pthread/mod.rs
@@ -7,7 +7,7 @@ use crate::platform::{self, Pal, Sys, types::*};
 use crate::header::{sched::*, time::timespec};
 use crate::pthread;
 
-fn e(result: Result<(), pthread::Errno>) -> i32 {
+pub fn e(result: Result<(), pthread::Errno>) -> i32 {
     match result {
         Ok(()) => 0,
         Err(pthread::Errno(error)) => error,
diff --git a/src/header/pthread/mutex.rs b/src/header/pthread/mutex.rs
index a6570108dc215edc3aec192fef7b29f9a0585fc7..a8b9a13c1ac1173989d940bbda73c0f52f9700c1 100644
--- a/src/header/pthread/mutex.rs
+++ b/src/header/pthread/mutex.rs
@@ -1,89 +1,74 @@
 use super::*;
 use crate::header::errno::*;
+use crate::pthread::Errno;
 
 use core::sync::atomic::AtomicI32 as AtomicInt;
 
 // PTHREAD_MUTEX_INITIALIZER is defined in bits_pthread/cbindgen.toml
 
-#[repr(u8)]
-enum State {
-    Unlocked,
-    Locked,
-    Waiting,
-}
-
-// #[no_mangle]
+#[no_mangle]
 pub unsafe extern "C" fn pthread_mutex_consistent(mutex: *mut pthread_mutex_t) -> c_int {
-    let mutex = &*mutex.cast::<RlctMutex>();
-
-    todo!()
+    e((&*mutex.cast::<RlctMutex>()).make_consistent())
}
 
 #[no_mangle]
 pub unsafe extern "C" fn pthread_mutex_destroy(mutex: *mut pthread_mutex_t) -> c_int {
-    let _mutex = &mut *mutex.cast::<RlctMutex>();
+    // Effectively a no-op: RlctMutex has no drop glue to run.
+    core::ptr::drop_in_place(mutex.cast::<RlctMutex>());
     0
 }
 
-// #[no_mangle]
-pub extern "C" fn pthread_mutex_getprioceiling(mutex: *const pthread_mutex_t, prioceiling: *mut c_int) -> c_int {
-    todo!();
+#[no_mangle]
+pub unsafe extern "C" fn pthread_mutex_getprioceiling(mutex: *const pthread_mutex_t, prioceiling: *mut c_int) -> c_int {
+    match (&*mutex.cast::<RlctMutex>()).prioceiling() {
+        Ok(value) => {
+            prioceiling.write(value);
+            0
+        }
+        Err(Errno(errno)) => errno,
+    }
 }
 
 #[no_mangle]
 pub unsafe extern "C" fn pthread_mutex_init(mutex: *mut pthread_mutex_t, attr: *const pthread_mutexattr_t) -> c_int {
     let attr = attr.cast::<RlctMutexAttr>().as_ref().copied().unwrap_or_default();
 
-    // TODO: attr
-    mutex.cast::<RlctMutex>().write(RlctMutex {
-        inner: crate::sync::mutex::UNLOCKED.into(),
-        /*robust: attr.robust != 0,
-        ty: match attr.ty {
-            PTHREAD_MUTEX_DEFAULT => Ty::Def,
-            PTHREAD_MUTEX_ERRORCHECK => Ty::Errck,
-            PTHREAD_MUTEX_RECURSIVE => Ty::Recursive,
-            PTHREAD_MUTEX_NORMAL => Ty::Normal,
-
-            _ => return EINVAL,
-        }*/
-    });
-    0
+    match RlctMutex::new(&attr) {
+        Ok(new) => {
+            mutex.cast::<RlctMutex>().write(new);
+
+            0
+        }
+        Err(Errno(errno)) => errno,
+    }
 }
 
 #[no_mangle]
 pub unsafe extern "C" fn pthread_mutex_lock(mutex: *mut pthread_mutex_t) -> c_int {
-    let mutex = &*mutex.cast::<RlctMutex>();
-
-    crate::sync::mutex::manual_lock_generic(&(&*mutex).inner);
-
-    0
+    e((&*mutex.cast::<RlctMutex>()).lock())
 }
 
-// #[no_mangle]
-pub extern "C" fn pthread_mutex_setprioceiling(mutex: *mut pthread_mutex_t, prioceiling: c_int, old_prioceiling: *mut c_int) -> c_int {
-    todo!();
+#[no_mangle]
+pub unsafe extern "C" fn pthread_mutex_setprioceiling(mutex: *mut pthread_mutex_t, prioceiling: c_int, old_prioceiling: *mut c_int) -> c_int {
+    match (&*mutex.cast::<RlctMutex>()).replace_prioceiling(prioceiling) {
+        Ok(old) => {
+            old_prioceiling.write(old);
+            0
+        }
+        Err(Errno(errno)) => errno,
+    }
 }
 
 #[no_mangle]
-pub unsafe extern "C" fn pthread_mutex_timedlock(mutex: *mut pthread_mutex_t, _timespec: *const timespec) -> c_int {
-    // TODO
-    pthread_mutex_lock(mutex)
+pub unsafe extern "C" fn pthread_mutex_timedlock(mutex: *mut pthread_mutex_t, timespec: *const timespec) -> c_int {
+    e((&*mutex.cast::<RlctMutex>()).lock_with_timeout(&*timespec))
 }
 
 #[no_mangle]
 pub unsafe extern "C" fn pthread_mutex_trylock(mutex: *mut pthread_mutex_t) -> c_int {
-    let mutex = &*mutex.cast::<RlctMutex>();
-
-    if crate::sync::mutex::manual_try_lock_generic(&(&*mutex).inner) {
-        0
-    } else {
-        EBUSY
-    }
+    e((&*mutex.cast::<RlctMutex>()).try_lock())
 }
 
 #[no_mangle]
 pub unsafe extern "C" fn pthread_mutex_unlock(mutex: *mut pthread_mutex_t) -> c_int {
-    let mutex = &*mutex.cast::<RlctMutex>();
-
-    crate::sync::mutex::manual_unlock_generic(&(&*mutex).inner);
-    0
+    e((&*mutex.cast::<RlctMutex>()).unlock())
 }
 
 #[no_mangle]
@@ -156,22 +141,7 @@ pub unsafe extern "C" fn pthread_mutexattr_settype(attr: *mut pthread_mutexattr_
     0
 }
 
-#[repr(C)]
-pub(crate) struct RlctMutex {
-    // Actual locking word. Allows the states UNLOCKED, LOCKED, and WAITING, a substate of LOCKED.
-    inner: AtomicInt,
-
-    /*robust: bool,
-    ty: Ty,*/
-
-    // TODO: Robust mutexes, errorcheck, recursive mutexes
-}
-enum Ty {
-    Normal,
-    Def,
-    Errck,
-    Recursive,
-}
+pub use crate::sync::pthread_mutex::RlctMutex;
 
 #[repr(C)]
 #[derive(Clone, Copy)]
diff --git a/src/pthread/mod.rs b/src/pthread/mod.rs
index fe357a3f45c3c44aba48483d5ece634bb478556d..1d09c62ecf672814947869d587dfb51c519ccd0e 100644
--- a/src/pthread/mod.rs
+++ b/src/pthread/mod.rs
@@ -2,7 +2,7 @@
 use core::cell::{Cell, UnsafeCell};
 use core::ptr::NonNull;
-use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
 
 use alloc::boxed::Box;
 use alloc::collections::BTreeMap;
@@ -28,6 +28,8 @@ pub unsafe fn init() {
         has_queued_cancelation: AtomicBool::new(false),
         flags: PthreadFlags::empty().bits().into(),
 
+        //index: FIRST_THREAD_IDX,
+
         // TODO
         stack_base: core::ptr::null_mut(),
         stack_size: 0,
@@ -37,6 +39,10 @@ pub unsafe fn init() {
     PTHREAD_SELF.set(obj);
 }
 
+//static NEXT_INDEX: AtomicU32 = AtomicU32::new(FIRST_THREAD_IDX + 1);
+//const FIRST_THREAD_IDX: usize = 1;
+
 pub unsafe fn terminate_from_main_thread() {
     for (tid, pthread) in OS_TID_TO_PTHREAD.lock().iter() {
         // TODO: Cancel?
@@ -56,6 +62,10 @@ pub struct Pthread {
     has_enabled_cancelation: AtomicBool,
     flags: AtomicUsize,
 
+    // Small index (compared to pointer size) used for e.g. recursive mutexes. Zero is reserved,
+    // so it starts from one. The 31st bit is reserved.
+    //index: u32,
+
     stack_base: *mut c_void,
     stack_size: usize,
 
@@ -130,6 +140,7 @@ pub(crate) unsafe fn create(attrs: Option<&header::RlctAttr>, start_routine: ext
         stack_base,
         stack_size,
         os_tid: UnsafeCell::new(OsTid::default()),
+        //index: NEXT_INDEX.fetch_add(1, Ordering::Relaxed),
     };
 
     let ptr = Box::into_raw(Box::new(pthread));
@@ -365,3 +376,7 @@ unsafe impl<T> Sync for ForceSendSync<T> {}
 
 #[thread_local]
 static PTHREAD_SELF: Cell<*mut Pthread> = Cell::new(core::ptr::null_mut());
+
+/*pub(crate) fn current_thread_index() -> u32 {
+    current_thread().expect("current thread not present").index
+}*/
diff --git a/src/sync/cond.rs b/src/sync/cond.rs
index 76aa74f91bc4016019c0b3c611a9241b1d408caf..750dc140834d580089eef2d649d7c97121b9d1d5 100644
--- a/src/sync/cond.rs
+++ b/src/sync/cond.rs
@@ -30,28 +30,30 @@ impl Cond {
     pub fn signal(&self) -> Result<(), Errno> {
         self.wake(1)
     }
-    // TODO: Safe version using RlctMutexGuard?
-    pub unsafe fn timedwait(&self, mutex_ptr: *mut pthread_mutex_t, timeout: Option<&timespec>) -> Result<(), Errno> {
+    pub fn timedwait(&self, mutex: &RlctMutex, timeout: &timespec) -> Result<(), Errno> {
+        self.wait_inner(mutex, Some(timeout))
+    }
+    fn wait_inner(&self, mutex: &RlctMutex, timeout: Option<&timespec>) -> Result<(), Errno> {
         // TODO: Error checking for certain types (i.e. robust and errorcheck) of mutexes, e.g. if the
         // mutex is not locked.
         let current = self.cur.load(Ordering::Relaxed);
         self.prev.store(current, Ordering::Relaxed); // TODO: ordering?
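+        // Note on the snapshot above (assuming Cond::wake advances `cur`, which the
+        // futex comparison below relies on): `cur` is read while the mutex is still
+        // held, so a signal or broadcast that lands between the unlock below and the
+        // futex_wait will have changed `cur`, and futex_wait(&self.cur, current, ..)
+        // returns immediately rather than sleeping through the wakeup. For example:
+        //
+        //   waiter                              other thread
+        //   current = cur.load(Relaxed)
+        //   mutex.unlock()
+        //                                       (locks mutex, updates state)
+        //                                       cond.signal()   [advances cur]
+        //   futex_wait(&cur, current, ..)       returns at once: cur != current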
-        pthread_mutex_unlock(mutex_ptr);
+        mutex.unlock()?;
 
         match timeout {
             Some(timeout) => {
                 crate::sync::futex_wait(&self.cur, current, timespec::subtract(*timeout, crate::sync::rttime()).as_ref());
-                pthread_mutex_timedlock(mutex_ptr, timespec::subtract(*timeout, crate::sync::rttime()).as_ref().map_or(core::ptr::null(), |r| r as *const timespec));
+                mutex.lock_with_timeout(timeout)?;
             }
             None => {
                 crate::sync::futex_wait(&self.cur, current, None);
-                pthread_mutex_lock(mutex_ptr);
+                mutex.lock()?;
             }
         }
 
         Ok(())
     }
-    pub unsafe fn wait(&self, mutex_ptr: *mut pthread_mutex_t) -> Result<(), Errno> {
-        self.timedwait(mutex_ptr, None)
+    pub fn wait(&self, mutex: &RlctMutex) -> Result<(), Errno> {
+        self.wait_inner(mutex, None)
     }
 }
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 0e71f5ee63d68848cf844ff20eb476fbbe1c808f..5abb86e2ba7720cdb558c07e5ce6e4776d1d2cdc 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1,7 +1,10 @@
 pub mod barrier;
 pub mod cond;
+// TODO: Merge with pthread_mutex
 pub mod mutex;
+
 pub mod once;
+pub mod pthread_mutex;
 pub mod semaphore;
 pub mod waitval;
 
diff --git a/src/sync/pthread_mutex.rs b/src/sync/pthread_mutex.rs
new file mode 100644
index 0000000000000000000000000000000000000000..4b648bc6ce920bfa424ea532db9a9282171a6d3a
--- /dev/null
+++ b/src/sync/pthread_mutex.rs
@@ -0,0 +1,226 @@
+use core::cell::Cell;
+use core::sync::atomic::{AtomicU32 as AtomicUint, Ordering};
+
+use crate::header::pthread::*;
+use crate::pthread::*;
+use crate::header::time::timespec;
+use crate::header::errno::*;
+use crate::header::sys_wait::*;
+
+use crate::platform::types::*;
+use crate::platform::{Pal, Sys};
+
+pub struct RlctMutex {
+    // Actual locking word. Bit 31 is the waiting (contention) bit; the remaining bits
+    // hold the owner's TID, or 0 when unlocked.
+    inner: AtomicUint,
+    recursive_count: AtomicUint,
+
+    ty: Ty,
+    robust: bool,
+}
+
+const STATE_UNLOCKED: u32 = 0;
+const WAITING_BIT: u32 = 1 << 31;
+const INDEX_MASK: u32 = !WAITING_BIT;
+
+// TODO: Lower limit is probably better.
+const RECURSIVE_COUNT_MAX_INCLUSIVE: u32 = u32::MAX;
+// TODO: How many spins should we do before it becomes more time-economical to enter kernel mode
+// via futexes?
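+// (With SPIN_COUNT set to 0 below, the spin loop in lock_inner is skipped entirely:
+// the first failed CAS on a contended mutex falls straight through to futex_wait.
+// Raising it to e.g. 100 would retry the CAS that many times, calling
+// core::hint::spin_loop() between attempts, before parking the thread.)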
+const SPIN_COUNT: usize = 0;
+
+impl RlctMutex {
+    pub(crate) fn new(attr: &RlctMutexAttr) -> Result<Self, Errno> {
+        let RlctMutexAttr { prioceiling: _, protocol: _, pshared: _, robust, ty } = *attr;
+
+        Ok(Self {
+            inner: AtomicUint::new(STATE_UNLOCKED),
+            recursive_count: AtomicUint::new(0),
+            robust: match robust {
+                PTHREAD_MUTEX_STALLED => false,
+                PTHREAD_MUTEX_ROBUST => true,
+
+                _ => return Err(Errno(EINVAL)),
+            },
+            ty: match ty {
+                PTHREAD_MUTEX_DEFAULT => Ty::Def,
+                PTHREAD_MUTEX_ERRORCHECK => Ty::Errck,
+                PTHREAD_MUTEX_RECURSIVE => Ty::Recursive,
+                PTHREAD_MUTEX_NORMAL => Ty::Normal,
+
+                _ => return Err(Errno(EINVAL)),
+            }
+        })
+    }
+    pub fn prioceiling(&self) -> Result<c_int, Errno> {
+        println!("TODO: Implement pthread_getprioceiling");
+        Ok(0)
+    }
+    pub fn replace_prioceiling(&self, _: c_int) -> Result<c_int, Errno> {
+        println!("TODO: Implement pthread_setprioceiling");
+        Ok(0)
+    }
+    pub fn make_consistent(&self) -> Result<(), Errno> {
+        println!("TODO: Implement robust mutexes");
+        Ok(())
+    }
+    fn lock_inner(&self, deadline: Option<&timespec>) -> Result<(), Errno> {
+        let this_thread = os_tid_invalid_after_fork();
+
+        let mut spins_left = SPIN_COUNT;
+
+        loop {
+            let result = self.inner.compare_exchange_weak(STATE_UNLOCKED, this_thread, Ordering::Acquire, Ordering::Relaxed);
+
+            match result {
+                // CAS succeeded
+                Ok(_) => {
+                    if self.ty == Ty::Recursive {
+                        self.increment_recursive_count()?;
+                    }
+                    return Ok(());
+                },
+                // CAS failed, but the mutex was recursive and we already own the lock.
+                Err(thread) if thread & INDEX_MASK == this_thread && self.ty == Ty::Recursive => {
+                    self.increment_recursive_count()?;
+                    return Ok(());
+                }
+                // CAS failed, but the mutex was error-checking and we already own the lock.
+                Err(thread) if thread & INDEX_MASK == this_thread && self.ty == Ty::Errck => {
+                    return Err(Errno(EAGAIN));
+                }
+                // CAS spuriously failed, simply retry the CAS. TODO: Use core::hint::spin_loop()?
+                Err(thread) if thread & INDEX_MASK == 0 => continue,
+                // CAS failed because some other thread owned the lock. We must now wait.
+                Err(thread) => {
+                    if spins_left > 0 {
+                        spins_left -= 1;
+                        core::hint::spin_loop();
+                        continue;
+                    }
+
+                    spins_left = SPIN_COUNT;
+
+                    let inner = self.inner.fetch_or(WAITING_BIT, Ordering::Relaxed);
+
+                    if inner == STATE_UNLOCKED {
+                        continue;
+                    }
+
+                    // If the mutex is not robust, simply futex_wait until unblocked.
+                    // TODO: `deadline` is not honored here yet; a contended
+                    // lock_with_timeout currently waits without a timeout.
+                    crate::sync::futex_wait(&self.inner, inner | WAITING_BIT, None);
+                }
+            }
+        }
+    }
+    pub fn lock(&self) -> Result<(), Errno> {
+        self.lock_inner(None)
+    }
+    pub fn lock_with_timeout(&self, deadline: &timespec) -> Result<(), Errno> {
+        self.lock_inner(Some(deadline))
+    }
+    fn increment_recursive_count(&self) -> Result<(), Errno> {
+        // We don't have to worry about asynchronous signals here, since pthread_mutex_trylock
+        // is not async-signal-safe.
+        //
+        // TODO: Maybe just use Cell? Send/Sync doesn't matter much anyway, and will be
+        // protected by the lock itself anyway.
+
+        let prev_recursive_count = self.recursive_count.load(Ordering::Relaxed);
+
+        if prev_recursive_count == RECURSIVE_COUNT_MAX_INCLUSIVE {
+            return Err(Errno(EAGAIN));
+        }
+
+        self.recursive_count.store(prev_recursive_count + 1, Ordering::Relaxed);
+
+        Ok(())
+    }
+    pub fn try_lock(&self) -> Result<(), Errno> {
+        let this_thread = os_tid_invalid_after_fork();
+
+        // TODO: If recursive, omitting CAS may be faster if it is already owned by this thread.
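+        // A single strong compare_exchange covers the uncontended case: it only
+        // succeeds if the word was STATE_UNLOCKED, making us the owner. On failure,
+        // `result` carries the observed word; masking with INDEX_MASK strips
+        // WAITING_BIT so the owner's TID can be compared against ours, which
+        // distinguishes a same-thread relock (recursive/errorcheck handling below)
+        // from contention by another thread.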
+        let result = self.inner.compare_exchange(STATE_UNLOCKED, this_thread, Ordering::Acquire, Ordering::Relaxed);
+
+        if self.ty == Ty::Recursive {
+            match result {
+                Err(index) if index & INDEX_MASK != this_thread => return Err(Errno(EBUSY)),
+                _ => (),
+            }
+
+            self.increment_recursive_count()?;
+
+            return Ok(());
+        }
+
+        match result {
+            Ok(_) => Ok(()),
+            Err(index) if index & INDEX_MASK == this_thread && self.ty == Ty::Errck => Err(Errno(EDEADLK)),
+            Err(_) => Err(Errno(EBUSY)),
+        }
+    }
+    // Safe because we are not protecting any data.
+    pub fn unlock(&self) -> Result<(), Errno> {
+        if self.robust || matches!(self.ty, Ty::Recursive | Ty::Errck) {
+            if self.inner.load(Ordering::Relaxed) & INDEX_MASK != os_tid_invalid_after_fork() {
+                return Err(Errno(EPERM));
+            }
+
+            // TODO: Is this fence correct?
+            core::sync::atomic::fence(Ordering::Acquire);
+        }
+
+        if self.ty == Ty::Recursive {
+            let next = self.recursive_count.load(Ordering::Relaxed) - 1;
+            self.recursive_count.store(next, Ordering::Relaxed);
+
+            if next > 0 {
+                return Ok(());
+            }
+        }
+
+        let was_waiting = self.inner.swap(STATE_UNLOCKED, Ordering::Release) & WAITING_BIT != 0;
+
+        if was_waiting {
+            let _ = crate::sync::futex_wake(&self.inner, 1);
+        }
+
+        Ok(())
+    }
+}
+
+#[repr(u8)]
+#[derive(PartialEq)]
+enum Ty {
+    // The only difference between PTHREAD_MUTEX_NORMAL and PTHREAD_MUTEX_DEFAULT appears to be
+    // that "normal" mutexes deadlock if locked multiple times on the same thread, whereas
+    // "default" mutexes are UB in that case. So we can treat them as being the same type.
+    Normal,
+    Def,
+
+    Errck,
+    Recursive,
+}
+
+// Children after fork can only call async-signal-safe functions until they exec.
+#[thread_local]
+static CACHED_OS_TID_INVALID_AFTER_FORK: Cell<u32> = Cell::new(0);
+
+// Assumes TIDs are unique between processes, which I only know is true for Redox.
+fn os_tid_invalid_after_fork() -> u32 {
+    // TODO: Coordinate better if using shared == PTHREAD_PROCESS_SHARED, with up to 2^32 separate
+    // threads within possibly distinct processes, using the mutex. OS thread IDs on Redox are
+    // pointer-sized, but relibc and POSIX use int everywhere.
+
+    let value = CACHED_OS_TID_INVALID_AFTER_FORK.get();
+
+    if value == 0 {
+        let tid = Sys::gettid();
+
+        assert_ne!(tid, -1, "failed to obtain current thread ID");
+
+        CACHED_OS_TID_INVALID_AFTER_FORK.set(tid as u32);
+
+        tid as u32
+    } else {
+        value
+    }
+}
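+
+// Usage sketch (illustrative only; assumes RlctMutexAttr::default() selects
+// PTHREAD_MUTEX_DEFAULT and PTHREAD_MUTEX_STALLED, matching how the
+// pthread_mutex_* wrappers in src/header/pthread/mutex.rs drive this type):
+//
+//   let mutex = RlctMutex::new(&RlctMutexAttr::default())?;
+//   mutex.lock()?;                       // lock word: 0 -> own TID
+//   assert!(mutex.try_lock().is_err());  // same thread, Ty::Def => EBUSY
+//   mutex.unlock()?;                     // lock word: TID -> 0, futex_wake(.., 1)
+//                                        // if WAITING_BIT was set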