From cf34e2512e8940d910beb5d6b8fc50d6e294bda9 Mon Sep 17 00:00:00 2001 From: 4lDO2 <4lDO2@protonmail.com> Date: Thu, 13 Apr 2023 11:21:13 +0200 Subject: [PATCH] Move condvar impl to a mostly-safe module. --- src/header/pthread/cond.rs | 75 +++++++++++++------------------------ src/header/pthread/mutex.rs | 12 +++--- src/header/time/mod.rs | 23 +++++++++++- src/sync/cond.rs | 57 ++++++++++++++++++++++++++++ src/sync/mod.rs | 14 +++++++ 5 files changed, 125 insertions(+), 56 deletions(-) create mode 100644 src/sync/cond.rs diff --git a/src/header/pthread/cond.rs b/src/header/pthread/cond.rs index 25d2faf5..2f664583 100644 --- a/src/header/pthread/cond.rs +++ b/src/header/pthread/cond.rs @@ -2,75 +2,49 @@ use super::*; -use core::sync::atomic::{AtomicI32 as AtomicInt, Ordering}; +// PTHREAD_COND_INITIALIZER is defined manually in bits_pthread/cbindgen.toml -// PTHREAD_COND_INITIALIZER +fn e(r: Result<(), pthread::Errno>) -> c_int { + r.map_or_else(|pthread::Errno(errno)| errno, |()| 0) +} #[no_mangle] pub unsafe extern "C" fn pthread_cond_broadcast(cond: *mut pthread_cond_t) -> c_int { - wake(cond, i32::MAX) -} - -unsafe fn wake(cond: *mut pthread_cond_t, n: i32) -> c_int { - let cond = &*cond.cast::<RlctCond>(); - - // This is formally correct as long as we don't have more than u32::MAX threads. - let prev = cond.prev.load(Ordering::SeqCst); - cond.cur.store(prev.wrapping_add(1), Ordering::SeqCst); - - crate::sync::futex_wake(&cond.cur, n); - - 0 + e((&*cond.cast::<RlctCond>()).broadcast()) } #[no_mangle] pub unsafe extern "C" fn pthread_cond_destroy(cond: *mut pthread_cond_t) -> c_int { - let _cond = &mut cond.cast::<RlctCond>(); - // No-op + core::ptr::drop_in_place(cond.cast::<RlctCond>()); 0 } #[no_mangle] pub unsafe extern "C" fn pthread_cond_init(cond: *mut pthread_cond_t, _attr: *const pthread_condattr_t) -> c_int { - cond.cast::<RlctCond>().write(RlctCond { - cur: AtomicInt::new(0), - prev: AtomicInt::new(0), - }); + cond.cast::<RlctCond>().write(RlctCond::new()); + 0 } #[no_mangle] pub unsafe extern "C" fn pthread_cond_signal(cond: *mut pthread_cond_t) -> c_int { - wake(cond, 1) + e((&*cond.cast::<RlctCond>()).signal()) } #[no_mangle] pub unsafe extern "C" fn pthread_cond_timedwait(cond: *mut pthread_cond_t, mutex_ptr: *mut pthread_mutex_t, timeout: *const timespec) -> c_int { - // TODO: Error checking for certain types (i.e. robust and errorcheck) of mutexes, e.g. if the - // mutex is not locked. - let cond = &*cond.cast::<RlctCond>(); - let timeout: Option<×pec> = timeout.as_ref(); - - let current = cond.cur.load(Ordering::Relaxed); - cond.prev.store(current, Ordering::SeqCst); // TODO: ordering? - - pthread_mutex_unlock(mutex_ptr); - crate::sync::futex_wait(&cond.cur, current, timeout); - pthread_mutex_lock(mutex_ptr); - - 0 + e((&*cond.cast::<RlctCond>()).timedwait(mutex_ptr, Some(&*timeout))) } #[no_mangle] pub unsafe extern "C" fn pthread_cond_wait(cond: *mut pthread_cond_t, mutex: *mut pthread_mutex_t) -> c_int { - pthread_cond_timedwait(cond, mutex, core::ptr::null()) + e((&*cond.cast::<RlctCond>()).wait(mutex)) } #[no_mangle] pub unsafe extern "C" fn pthread_condattr_destroy(condattr: *mut pthread_condattr_t) -> c_int { - let _condattr = &mut *condattr.cast::<RlctCondAttr>(); - + core::ptr::drop_in_place(condattr.cast::<RlctCondAttr>()); // No-op 0 } @@ -89,12 +63,7 @@ pub unsafe extern "C" fn pthread_condattr_getpshared(condattr: *const pthread_co #[no_mangle] pub unsafe extern "C" fn pthread_condattr_init(condattr: *mut pthread_condattr_t) -> c_int { - condattr.cast::<RlctCondAttr>().write(RlctCondAttr { - // FIXME: system clock - clock: 0, - // Default - pshared: PTHREAD_PROCESS_PRIVATE, - }); + condattr.cast::<RlctCondAttr>().write(RlctCondAttr::default()); 0 } @@ -112,11 +81,19 @@ pub unsafe extern "C" fn pthread_condattr_setpshared(condattr: *mut pthread_cond } pub(crate) struct RlctCondAttr { - pub clock: clockid_t, - pub pshared: c_int, + clock: clockid_t, + pshared: c_int, } -pub(crate) struct RlctCond { - pub cur: AtomicInt, - pub prev: AtomicInt, +pub(crate) type RlctCond = crate::sync::cond::Cond; + +impl Default for RlctCondAttr { + fn default() -> Self { + Self { + // FIXME: system clock + clock: 0, + // Default + pshared: PTHREAD_PROCESS_PRIVATE, + } + } } diff --git a/src/header/pthread/mutex.rs b/src/header/pthread/mutex.rs index 05938fd2..a6570108 100644 --- a/src/header/pthread/mutex.rs +++ b/src/header/pthread/mutex.rs @@ -63,9 +63,10 @@ pub extern "C" fn pthread_mutex_setprioceiling(mutex: *mut pthread_mutex_t, prio todo!(); } -// #[no_mangle] -pub extern "C" fn pthread_mutex_timedlock(mutex: *mut pthread_mutex_t, timespec: *const timespec) -> c_int { - todo!(); +#[no_mangle] +pub unsafe extern "C" fn pthread_mutex_timedlock(mutex: *mut pthread_mutex_t, _timespec: *const timespec) -> c_int { + // TODO + pthread_mutex_lock(mutex) } #[no_mangle] pub unsafe extern "C" fn pthread_mutex_trylock(mutex: *mut pthread_mutex_t) -> c_int { @@ -88,8 +89,7 @@ pub unsafe extern "C" fn pthread_mutex_unlock(mutex: *mut pthread_mutex_t) -> c_ #[no_mangle] pub unsafe extern "C" fn pthread_mutexattr_destroy(attr: *mut pthread_mutexattr_t) -> c_int { // No-op - - core::ptr::drop_in_place(attr); + core::ptr::drop_in_place(attr.cast::<RlctMutexAttr>()); 0 } @@ -164,7 +164,7 @@ pub(crate) struct RlctMutex { /*robust: bool, ty: Ty,*/ - // TODO: Robust mutexes + // TODO: Robust mutexes, errorcheck, recursive mutexes } enum Ty { Normal, diff --git a/src/header/time/mod.rs b/src/header/time/mod.rs index ff0c2a22..e505d3f5 100644 --- a/src/header/time/mod.rs +++ b/src/header/time/mod.rs @@ -13,12 +13,33 @@ pub mod constants; mod strftime; #[repr(C)] -#[derive(Default)] +#[derive(Clone, Copy, Default)] pub struct timespec { pub tv_sec: time_t, pub tv_nsec: c_long, } +impl timespec { + // TODO: Write test + pub fn subtract(later: timespec, earlier: timespec) -> Option<timespec> { + // TODO: Can tv_nsec be negative? + let later_nsec = c_ulong::try_from(later.tv_nsec).ok()?; + let earlier_nsec = c_ulong::try_from(earlier.tv_nsec).ok()?; + + Some(if later_nsec > earlier_nsec { + timespec { + tv_sec: later.tv_sec.checked_sub(earlier.tv_sec)?, + tv_nsec: (later_nsec - earlier_nsec) as i64, + } + } else { + timespec { + tv_sec: later.tv_sec.checked_sub(earlier.tv_sec)?.checked_sub(1)?, + tv_nsec: 1_000_000_000 - (earlier_nsec - later_nsec) as i64, + } + }) + } +} + #[cfg(target_os = "redox")] impl<'a> From<&'a timespec> for syscall::TimeSpec { fn from(tp: ×pec) -> Self { diff --git a/src/sync/cond.rs b/src/sync/cond.rs new file mode 100644 index 00000000..76aa74f9 --- /dev/null +++ b/src/sync/cond.rs @@ -0,0 +1,57 @@ +use crate::header::pthread::*; +use crate::header::bits_pthread::*; +use crate::header::time::timespec; +use crate::pthread::Errno; + +use core::sync::atomic::{AtomicU32 as AtomicUint, Ordering}; + +pub struct Cond { + cur: AtomicUint, + prev: AtomicUint, +} +impl Cond { + pub fn new() -> Self { + Self{ + cur: AtomicUint::new(0), + prev: AtomicUint::new(0), + } + } + fn wake(&self, count: i32) -> Result<(), Errno> { + // This is formally correct as long as we don't have more than u32::MAX threads. + let prev = self.prev.load(Ordering::Relaxed); + self.cur.store(prev.wrapping_add(1), Ordering::Relaxed); + + crate::sync::futex_wake(&self.cur, count); + Ok(()) + } + pub fn broadcast(&self) -> Result<(), Errno> { + self.wake(i32::MAX) + } + pub fn signal(&self) -> Result<(), Errno> { + self.wake(1) + } + // TODO: Safe version using RlctMutexGuard? + pub unsafe fn timedwait(&self, mutex_ptr: *mut pthread_mutex_t, timeout: Option<×pec>) -> Result<(), Errno> { + // TODO: Error checking for certain types (i.e. robust and errorcheck) of mutexes, e.g. if the + // mutex is not locked. + let current = self.cur.load(Ordering::Relaxed); + self.prev.store(current, Ordering::Relaxed); // TODO: ordering? + + pthread_mutex_unlock(mutex_ptr); + + match timeout { + Some(timeout) => { + crate::sync::futex_wait(&self.cur, current, timespec::subtract(*timeout, crate::sync::rttime()).as_ref()); + pthread_mutex_timedlock(mutex_ptr, timespec::subtract(*timeout, crate::sync::rttime()).as_ref().map_or(core::ptr::null(), |r| r as *const timespec)); + } + None => { + crate::sync::futex_wait(&self.cur, current, None); + pthread_mutex_lock(mutex_ptr); + } + } + Ok(()) + } + pub unsafe fn wait(&self, mutex_ptr: *mut pthread_mutex_t) -> Result<(), Errno> { + self.timedwait(mutex_ptr, None) + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 9e3dd437..0e71f5ee 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,4 +1,5 @@ pub mod barrier; +pub mod cond; pub mod mutex; pub mod once; pub mod semaphore; @@ -15,6 +16,7 @@ use crate::{ platform::{types::*, Pal, Sys}, }; use core::{ + mem::MaybeUninit, ops::Deref, sync::atomic::{self, AtomicI32, AtomicU32, AtomicI32 as AtomicInt}, }; @@ -76,6 +78,18 @@ pub fn futex_wake(atomic: &impl FutexAtomicTy, n: i32) -> usize { pub fn futex_wait<T: FutexAtomicTy>(atomic: &T, value: T::Ty, timeout_opt: Option<×pec>) -> bool { unsafe { futex_wait_ptr(atomic.ptr(), value, timeout_opt) } } + +pub fn rttime() -> timespec { + unsafe { + let mut time = MaybeUninit::uninit(); + + // TODO: Handle error + Sys::clock_gettime(crate::header::time::CLOCK_REALTIME, time.as_mut_ptr()); + + time.assume_init() + } +} + pub fn wait_until_generic<F1, F2>(word: &AtomicInt, attempt: F1, mark_long: F2, long: c_int) where F1: Fn(&AtomicInt) -> AttemptStatus, -- GitLab