Verified Commit b455e2e3 authored by Jacob Lorentzon

Misc improvements, move barrier to safe module.

parent 31620310
use crate::header::errno::*;
-use core::sync::atomic::{AtomicU32 as AtomicUint, AtomicI32 as AtomicInt, Ordering};
+use core::num::NonZeroU32;
+use crate::sync::barrier::*;
use super::*;
-pub(crate) struct RlctBarrier {
-pub count: AtomicUint,
-pub original_count: c_uint,
-pub epoch: AtomicInt,
-}
+pub(crate) type RlctBarrier = Barrier;
#[derive(Clone, Copy)]
pub(crate) struct RlctBarrierAttr {
-pub pshared: c_int,
+pshared: c_int,
}
impl Default for RlctBarrierAttr {
fn default() -> Self {
// pshared = PTHREAD_PROCESS_PRIVATE is the default according to POSIX.
Self { pshared: PTHREAD_PROCESS_PRIVATE }
}
}
// Not async-signal-safe.
#[no_mangle]
pub unsafe extern "C" fn pthread_barrier_destroy(barrier: *mut pthread_barrier_t) -> c_int {
-// Behavior is undefined if any thread is currently waiting.
+// Behavior is undefined if any thread is currently waiting when this is called.
// No-op, currently.
core::ptr::drop_in_place(barrier.cast::<RlctBarrier>());
0
}
// Not async-signal-safe.
#[no_mangle]
pub unsafe extern "C" fn pthread_barrier_init(barrier: *mut pthread_barrier_t, attr: *const pthread_barrierattr_t, count: c_uint) -> c_int {
-let attr = attr.cast::<RlctBarrierAttr>().as_ref();
+let attr = attr.cast::<RlctBarrierAttr>().as_ref().copied().unwrap_or_default();
-if count == 0 {
+let Some(count) = NonZeroU32::new(count) else {
return EINVAL;
-}
+};
-barrier.cast::<RlctBarrier>().write(RlctBarrier {
-count: AtomicUint::new(0),
-original_count: count,
-epoch: AtomicInt::new(0),
-});
+barrier.cast::<RlctBarrier>().write(RlctBarrier::new(count));
0
}
fn unlikely(condition: bool) -> bool { condition }
// Not async-signal-safe.
#[no_mangle]
pub unsafe extern "C" fn pthread_barrier_wait(barrier: *mut pthread_barrier_t) -> c_int {
let barrier = &*barrier.cast::<RlctBarrier>();
-// TODO: Orderings
-let mut cached = barrier.count.load(Ordering::SeqCst);
-loop {
-let new = if cached == barrier.original_count - 1 { 0 } else { cached + 1 };
-match barrier.count.compare_exchange_weak(cached, new, Ordering::SeqCst, Ordering::SeqCst) {
-Ok(_) => if new == 0 {
-// We reached COUNT waits, and will thus be the thread notifying every other
-// waiter.
-todo!();
-return PTHREAD_BARRIER_SERIAL_THREAD;
-} else {
-// We increased the wait count, but this was not sufficient. We will thus have to
-// wait for the epoch to tick up.
-todo!();
-return 0;
-}
-Err(value) => {
-cached = value;
-core::hint::spin_loop();
-}
-}
+match barrier.wait() {
+WaitResult::NotifiedAll => PTHREAD_BARRIER_SERIAL_THREAD,
+WaitResult::Waited => 0,
+}
}
// Not async-signal-safe.
#[no_mangle]
pub unsafe extern "C" fn pthread_barrierattr_init(attr: *mut pthread_barrierattr_t) -> c_int {
// PTHREAD_PROCESS_PRIVATE is default according to POSIX.
-core::ptr::write(attr.cast::<RlctBarrierAttr>(), RlctBarrierAttr { pshared: PTHREAD_PROCESS_PRIVATE });
+core::ptr::write(attr.cast::<RlctBarrierAttr>(), RlctBarrierAttr::default());
0
}
// Not async-signal-safe.
#[no_mangle]
pub unsafe extern "C" fn pthread_barrierattr_setpshared(attr: *mut pthread_barrierattr_t, pshared: c_int) -> c_int {
(*attr.cast::<RlctBarrierAttr>()).pshared = pshared;
0
}
// Not async-signal-safe.
#[no_mangle]
pub unsafe extern "C" fn pthread_barrierattr_getpshared(attr: *const pthread_barrierattr_t, pshared: *mut c_int) -> c_int {
core::ptr::write(pshared, (*attr.cast::<RlctBarrierAttr>()).pshared);
0
}
@@ -7,6 +7,13 @@ use crate::platform::{self, Pal, Sys, types::*};
use crate::header::{sched::*, time::timespec};
use crate::pthread;
fn e(result: Result<(), pthread::Errno>) -> i32 {
match result {
Ok(()) => 0,
Err(pthread::Errno(error)) => error,
}
}
#[derive(Clone, Copy)]
pub(crate) struct RlctAttr {
pub detachstate: c_uchar,
@@ -25,7 +32,7 @@ pub const PTHREAD_CANCEL_ASYNCHRONOUS: c_int = 0;
pub const PTHREAD_CANCEL_ENABLE: c_int = 1;
pub const PTHREAD_CANCEL_DEFERRED: c_int = 2;
pub const PTHREAD_CANCEL_DISABLE: c_int = 3;
-pub const PTHREAD_CANCELED: *mut c_void = core::ptr::null_mut();
+pub const PTHREAD_CANCELED: *mut c_void = (!0_usize) as *mut c_void;
pub const PTHREAD_CREATE_DETACHED: c_int = 0;
pub const PTHREAD_CREATE_JOINABLE: c_int = 1;
@@ -101,19 +108,34 @@ pub unsafe extern "C" fn pthread_exit(retval: *mut c_void) -> ! {
pthread::exit_current_thread(pthread::Retval(retval))
}
-// #[no_mangle]
-pub extern "C" fn pthread_getconcurrency() -> c_int {
-todo!()
+#[no_mangle]
+pub unsafe extern "C" fn pthread_getconcurrency() -> c_int {
+// Redox and Linux threads are 1:1, not M:N.
+1
}
-// #[no_mangle]
-pub extern "C" fn pthread_getcpuclockid(thread: pthread_t, clock: *mut clockid_t) -> c_int {
-todo!()
+#[no_mangle]
+pub unsafe extern "C" fn pthread_getcpuclockid(thread: pthread_t, clock_out: *mut clockid_t) -> c_int {
+match pthread::get_cpu_clkid(&*thread.cast()) {
+Ok(clock) => {
+clock_out.write(clock);
+0
+}
+Err(pthread::Errno(error)) => error,
+}
}
-// #[no_mangle]
-pub extern "C" fn pthread_getschedparam(thread: pthread_t, policy: *mut clockid_t, param: *mut sched_param) -> c_int {
-todo!()
+#[no_mangle]
+pub unsafe extern "C" fn pthread_getschedparam(thread: pthread_t, policy_out: *mut c_int, param_out: *mut sched_param) -> c_int {
+match pthread::get_sched_param(&*thread.cast()) {
+Ok((policy, param)) => {
+policy_out.write(policy);
+param_out.write(param);
+0
+}
+Err(pthread::Errno(error)) => error,
+}
}
pub mod tls;
@@ -143,22 +165,40 @@ pub use self::rwlock::*;
pub unsafe extern "C" fn pthread_self() -> pthread_t {
pthread::current_thread().unwrap_unchecked() as *const _ as *mut _
}
pub extern "C" fn pthread_setcancelstate(state: c_int, oldstate: *mut c_int) -> c_int {
todo!();
#[no_mangle]
pub unsafe extern "C" fn pthread_setcancelstate(state: c_int, oldstate: *mut c_int) -> c_int {
match pthread::set_cancel_state(state) {
Ok(old) => {
oldstate.write(old);
0
}
Err(pthread::Errno(error)) => error,
}
}
pub extern "C" fn pthread_setcanceltype(ty: c_int, oldty: *mut c_int) -> c_int {
todo!();
#[no_mangle]
pub unsafe extern "C" fn pthread_setcanceltype(ty: c_int, oldty: *mut c_int) -> c_int {
match pthread::set_cancel_type(ty) {
Ok(old) => {
oldty.write(old);
0
}
Err(pthread::Errno(error)) => error,
}
}
#[no_mangle]
pub extern "C" fn pthread_setconcurrency(concurrency: c_int) -> c_int {
-todo!();
+// Redox and Linux threads are 1:1, not M:N.
+0
}
pub extern "C" fn pthread_setschedparam(thread: pthread_t, policy: c_int, param: *const sched_param) -> c_int {
todo!();
#[no_mangle]
pub unsafe extern "C" fn pthread_setschedparam(thread: pthread_t, policy: c_int, param: *const sched_param) -> c_int {
e(pthread::set_sched_param(&*thread.cast(), policy, &*param))
}
pub extern "C" fn pthread_setschedprio(thread: pthread_t, prio: c_int) -> c_int {
todo!();
#[no_mangle]
pub unsafe extern "C" fn pthread_setschedprio(thread: pthread_t, prio: c_int) -> c_int {
e(pthread::set_sched_priority(&*thread.cast(), prio))
}
pub mod spin;
......
use super::*;
-use crate::header::errno::EBUSY;
+use crate::header::errno::*;
use core::sync::atomic::AtomicI32 as AtomicInt;
-// PTHREAD_MUTEX_INITIALIZER
+// PTHREAD_MUTEX_INITIALIZER is defined in bits_pthread/cbindgen.toml
#[repr(u8)]
enum State {
Unlocked,
Locked,
Waiting,
}
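// The sketch below is not part of this diff: it illustrates the classic three-state futex mutex
// that the State enum above and the UNLOCKED/LOCKED/WAITING comment on RlctMutex refer to. The
// constant values, helper names, and the use of crate::sync::{futex_wait, futex_wake} are
// illustrative assumptions; the real algorithm lives in crate::sync::mutex.
mod futex_mutex_sketch {
    use core::sync::atomic::{AtomicI32, Ordering};

    const UNLOCKED: i32 = 0;
    const LOCKED: i32 = 1; // locked, no waiters known
    const WAITING: i32 = 2; // locked, and at least one thread may be sleeping on the futex

    pub fn lock(word: &AtomicI32) {
        // Fast path: uncontended acquire.
        if word.compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed).is_ok() {
            return;
        }
        // Slow path: mark the lock as contended, then sleep until the word may have changed.
        while word.swap(WAITING, Ordering::Acquire) != UNLOCKED {
            crate::sync::futex_wait(word, WAITING, None);
        }
    }

    pub fn unlock(word: &AtomicI32) {
        // Only pay for the wake syscall if another thread may be sleeping on the word.
        if word.swap(UNLOCKED, Ordering::Release) == WAITING {
            crate::sync::futex_wake(word, 1);
        }
    }
}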
// #[no_mangle]
pub extern "C" fn pthread_mutex_consistent(mutex: *mut pthread_mutex_t) -> c_int {
todo!();
pub unsafe extern "C" fn pthread_mutex_consistent(mutex: *mut pthread_mutex_t) -> c_int {
let mutex = &*mutex.cast::<RlctMutex>();
todo!()
}
#[no_mangle]
pub unsafe extern "C" fn pthread_mutex_destroy(mutex: *mut pthread_mutex_t) -> c_int {
-let _mutex: &pthread_mutex_t = &*mutex;
+let _mutex = &mut *mutex.cast::<RlctMutex>();
0
}
@@ -23,11 +32,20 @@ pub extern "C" fn pthread_mutex_getprioceiling(mutex: *const pthread_mutex_t, pr
#[no_mangle]
pub unsafe extern "C" fn pthread_mutex_init(mutex: *mut pthread_mutex_t, attr: *const pthread_mutexattr_t) -> c_int {
-let attr = attr.cast::<RlctMutexAttr>().as_ref();
+let attr = attr.cast::<RlctMutexAttr>().as_ref().copied().unwrap_or_default();
// TODO: attr
mutex.cast::<RlctMutex>().write(RlctMutex {
inner: crate::sync::mutex::UNLOCKED.into(),
/*robust: attr.robust != 0,
ty: match attr.ty {
PTHREAD_MUTEX_DEFAULT => Ty::Def,
PTHREAD_MUTEX_ERRORCHECK => Ty::Errck,
PTHREAD_MUTEX_RECURSIVE => Ty::Recursive,
PTHREAD_MUTEX_NORMAL => Ty::Normal,
_ => return EINVAL,
}*/
});
0
}
@@ -68,7 +86,10 @@ pub unsafe extern "C" fn pthread_mutex_unlock(mutex: *mut pthread_mutex_t) -> c_
}
#[no_mangle]
pub extern "C" fn pthread_mutexattr_destroy(_attr: *mut pthread_mutexattr_t) -> c_int {
pub unsafe extern "C" fn pthread_mutexattr_destroy(attr: *mut pthread_mutexattr_t) -> c_int {
// No-op
core::ptr::drop_in_place(attr);
0
}
@@ -102,14 +123,7 @@ pub unsafe extern "C" fn pthread_mutexattr_gettype(attr: *const pthread_mutexatt
}
#[no_mangle]
pub unsafe extern "C" fn pthread_mutexattr_init(attr: *mut pthread_mutexattr_t) -> c_int {
-attr.cast::<RlctMutexAttr>().write(RlctMutexAttr {
-robust: PTHREAD_MUTEX_STALLED,
-pshared: PTHREAD_PROCESS_PRIVATE,
-protocol: PTHREAD_PRIO_NONE,
-// TODO
-prioceiling: 0,
-ty: PTHREAD_MUTEX_DEFAULT,
-});
+attr.cast::<RlctMutexAttr>().write(RlctMutexAttr::default());
0
}
@@ -144,10 +158,23 @@ pub unsafe extern "C" fn pthread_mutexattr_settype(attr: *mut pthread_mutexattr_
#[repr(C)]
pub(crate) struct RlctMutex {
-pub inner: AtomicInt,
+// Actual locking word. Allows the states UNLOCKED, LOCKED, and WAITING, a substate of LOCKED.
+inner: AtomicInt,
/*robust: bool,
ty: Ty,*/
// TODO: Robust mutexes
}
enum Ty {
Normal,
Def,
Errck,
Recursive,
}
#[repr(C)]
#[derive(Clone, Copy)]
pub(crate) struct RlctMutexAttr {
pub prioceiling: c_int,
pub protocol: c_int,
@@ -155,4 +182,15 @@ pub(crate) struct RlctMutexAttr {
pub robust: c_int,
pub ty: c_int,
}
impl Default for RlctMutexAttr {
fn default() -> Self {
Self {
robust: PTHREAD_MUTEX_STALLED,
pshared: PTHREAD_PROCESS_PRIVATE,
protocol: PTHREAD_PRIO_NONE,
// TODO
prioceiling: 0,
ty: PTHREAD_MUTEX_DEFAULT,
}
}
}
@@ -24,7 +24,8 @@ const MAIN_PTHREAD_ID: usize = 1;
pub unsafe fn init() {
let obj = Box::into_raw(Box::new(Pthread {
waitval: Waitval::new(),
-wants_cancel: AtomicBool::new(false),
+has_enabled_cancelation: AtomicBool::new(false),
+has_queued_cancelation: AtomicBool::new(false),
flags: PthreadFlags::empty().bits().into(),
// TODO
@@ -51,7 +52,8 @@ bitflags::bitflags! {
pub struct Pthread {
waitval: Waitval<Retval>,
-wants_cancel: AtomicBool,
+has_queued_cancelation: AtomicBool,
+has_enabled_cancelation: AtomicBool,
flags: AtomicUsize,
stack_base: *mut c_void,
@@ -123,7 +125,8 @@ pub(crate) unsafe fn create(attrs: Option<&header::RlctAttr>, start_routine: ext
let pthread = Pthread {
waitval: Waitval::new(),
flags: flags.bits().into(),
-wants_cancel: AtomicBool::new(false),
+has_enabled_cancelation: AtomicBool::new(false),
+has_queued_cancelation: AtomicBool::new(false),
stack_base,
stack_size,
os_tid: UnsafeCell::new(OsTid::default()),
@@ -211,7 +214,7 @@ unsafe extern "C" fn new_thread_shim(
pub unsafe fn join(thread: &Pthread) -> Result<Retval, Errno> {
// We don't have to return EDEADLK, but unlike e.g. pthread_t lifetime checking, it's a
// relatively easy check.
-if core::ptr::eq(thread, current_thread().unwrap_unchecked()) {
+if core::ptr::eq(thread, current_thread().expect("current thread not present")) {
return Err(Errno(EDEADLK));
}
@@ -243,7 +246,9 @@ pub fn current_thread() -> Option<&'static Pthread> {
}
pub unsafe fn testcancel() {
-if current_thread().unwrap_unchecked().wants_cancel.load(Ordering::Acquire) {
+let this_thread = current_thread().expect("current thread not present");
+if this_thread.has_queued_cancelation.load(Ordering::Acquire) && this_thread.has_enabled_cancelation.load(Ordering::Acquire) {
cancel_current_thread();
}
}
@@ -289,12 +294,65 @@ unsafe fn cancel_current_thread() {
}
pub unsafe fn cancel(thread: &Pthread) -> Result<(), Errno> {
-thread.wants_cancel.store(true, Ordering::Release);
+// TODO: What order should these atomic bools be accessed in?
+thread.has_queued_cancelation.store(true, Ordering::Release);
+if thread.has_enabled_cancelation.load(Ordering::Acquire) {
+Sys::rlct_kill(thread.os_tid.get().read(), SIGRT_RLCT_CANCEL)?;
+}
-Sys::rlct_kill(thread.os_tid.get().read(), SIGRT_RLCT_CANCEL)?;
Ok(())
}
pub fn set_sched_param(_thread: &Pthread, _policy: c_int, _param: &sched_param) -> Result<(), Errno> {
// TODO
Ok(())
}
pub fn set_sched_priority(_thread: &Pthread, _prio: c_int) -> Result<(), Errno> {
// TODO
Ok(())
}
pub fn set_cancel_state(state: c_int) -> Result<c_int, Errno> {
let this_thread = current_thread().expect("current thread not present");
let was_cancelable = match state {
header::PTHREAD_CANCEL_ENABLE => {
let old = this_thread.has_enabled_cancelation.swap(true, Ordering::Release);
if this_thread.has_queued_cancelation.load(Ordering::Acquire) {
unsafe { cancel_current_thread(); }
}
old
},
header::PTHREAD_CANCEL_DISABLE => this_thread.has_enabled_cancelation.swap(false, Ordering::Release),
_ => return Err(Errno(EINVAL)),
};
Ok(match was_cancelable {
true => header::PTHREAD_CANCEL_ENABLE,
false => header::PTHREAD_CANCEL_DISABLE,
})
}
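// A minimal sketch, not part of this diff, of the pattern that returning the previous state
// enables: temporarily disabling cancelation around a region that must not be interrupted, then
// restoring the caller's original setting. The helper name is hypothetical; the C-ABI
// pthread_setcancelstate wrapper earlier in this commit exposes the same behavior to applications.
fn with_cancelation_disabled_sketch<T>(f: impl FnOnce() -> T) -> Result<T, Errno> {
    let previous = set_cancel_state(header::PTHREAD_CANCEL_DISABLE)?;
    let value = f();
    // If a cancelation was queued meanwhile, restoring ENABLE acts on it immediately
    // (set_cancel_state calls cancel_current_thread in that case).
    set_cancel_state(previous)?;
    Ok(value)
}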
pub fn set_cancel_type(ty: c_int) -> Result<c_int, Errno> {
let this_thread = current_thread().expect("current thread not present");
// TODO
match ty {
header::PTHREAD_CANCEL_DEFERRED => (),
header::PTHREAD_CANCEL_ASYNCHRONOUS => (),
_ => return Err(Errno(EINVAL)),
}
Ok(header::PTHREAD_CANCEL_DEFERRED)
}
pub fn get_cpu_clkid(thread: &Pthread) -> Result<clockid_t, Errno> {
// TODO
Err(Errno(ENOENT))
}
pub fn get_sched_param(thread: &Pthread) -> Result<(clockid_t, sched_param), Errno> {
todo!()
}
// TODO: Hash map?
// TODO: RwLock to improve perf?
......
use core::cmp;
use core::num::NonZeroU32;
use core::sync::atomic::{AtomicU32 as AtomicUint, Ordering};
pub struct Barrier {
waited_count: AtomicUint,
notified_count: AtomicUint,
original_count: NonZeroU32,
}
pub enum WaitResult {
Waited,
NotifiedAll,
}
impl Barrier {
pub fn new(count: NonZeroU32) -> Self {
Self {
waited_count: AtomicUint::new(0),
notified_count: AtomicUint::new(0),
original_count: count,
}
}
pub fn wait(&self) -> WaitResult {
// The barrier wait operation can be divided into two parts: (1) incrementing the wait count,
// where the first N-1 waiters block and the last one becomes responsible for notifying the
// rest, and (2) waking all threads that have been waiting.
let original_count = self.original_count.get();
loop {
let new = self.waited_count.fetch_add(1, Ordering::Acquire) + 1;
match Ord::cmp(&new, &original_count) {
cmp::Ordering::Less => {
// new < original_count, i.e. we were one of the threads that incremented the counter,
// but need to continue waiting for the last waiter to notify the others.
loop {
let count = self.waited_count.load(Ordering::Acquire);
if count >= original_count { break }
let _ = crate::sync::futex_wait(&self.waited_count, count, None);
}
// Once the required number of threads have called pthread_barrier_wait, waited_count
// >= original_count (it should never exceed that value), and we can safely reset
// the counter to zero.
if self.notified_count.fetch_add(1, Ordering::Relaxed) + 1 >= original_count {
self.waited_count.store(0, Ordering::Relaxed);
}
return WaitResult::Waited;
}
cmp::Ordering::Equal => {
// new == original_count, i.e. we were the one thread doing the last increment, and we
// will be responsible for waking up all other waiters.
crate::sync::futex_wake(&self.waited_count, i32::MAX);
if self.notified_count.fetch_add(1, Ordering::Relaxed) + 1 >= original_count {
self.waited_count.store(0, Ordering::Relaxed);
}
return WaitResult::NotifiedAll;
}
// FIXME: Starvation?
cmp::Ordering::Greater => {
let mut cached = new;
while cached >= original_count {
// new > original_count, i.e. we are waiting on a barrier that has already finished, but
// which has not yet awoken all its waiters and reinitialized itself. The
// simplest way to handle this is to wait for waited_count to return to zero, and
// start over.
crate::sync::futex_wait(&self.waited_count, cached, None);
cached = self.waited_count.load(Ordering::Acquire);
}
}
}
}
}
}
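// Illustrative sketch, not part of this diff, of the contract the pthread wrapper relies on:
// out of every original_count calls to wait(), all return together and exactly one returns
// WaitResult::NotifiedAll (mapped to PTHREAD_BARRIER_SERIAL_THREAD). The std::thread/Arc
// scaffolding is an assumption for a hosted test; relibc itself is built as the libc.
fn barrier_contract_sketch() {
    use std::sync::Arc;
    let barrier = Arc::new(Barrier::new(core::num::NonZeroU32::new(4).unwrap()));
    let handles = [(); 4].map(|_| {
        let barrier = Arc::clone(&barrier);
        std::thread::spawn(move || matches!(barrier.wait(), WaitResult::NotifiedAll))
    });
    let serial = handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .filter(|&was_serial| was_serial)
        .count();
    // Exactly one of the four waiters is the "serial" thread.
    assert_eq!(serial, 1);
}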
pub mod barrier;
pub mod mutex;
pub mod once;
pub mod semaphore;
@@ -15,7 +16,7 @@ use crate::{
};
use core::{
ops::Deref,
-sync::atomic::{self, AtomicI32 as AtomicInt},
+sync::atomic::{self, AtomicI32, AtomicU32, AtomicI32 as AtomicInt},
};
const FUTEX_WAIT: c_int = 0;
@@ -28,19 +29,52 @@ pub enum AttemptStatus {
Other,
}
-pub unsafe fn futex_wake_ptr(ptr: *mut i32, n: i32) -> usize {
pub trait FutexTy {
fn conv(self) -> i32;
}
pub trait FutexAtomicTy {
type Ty: FutexTy;
fn as_mut_ptr(&self) -> *mut Self::Ty;
}
impl FutexTy for u32 {
fn conv(self) -> i32 {
self as i32
}
}
impl FutexTy for i32 {
fn conv(self) -> i32 {
self
}
}
impl FutexAtomicTy for AtomicU32 {
type Ty = u32;
fn as_mut_ptr(&self) -> *mut u32 {
AtomicU32::as_mut_ptr(self)
}
}
impl FutexAtomicTy for AtomicI32 {
type Ty = i32;
fn as_mut_ptr(&self) -> *mut i32 {
AtomicI32::as_mut_ptr(self)
}
}
pub unsafe fn futex_wake_ptr(ptr: *mut impl FutexTy, n: i32) -> usize {
// TODO: unwrap_unchecked?
-Sys::futex(ptr, FUTEX_WAKE, n, 0).unwrap() as usize
+Sys::futex(ptr.cast(), FUTEX_WAKE, n, 0).unwrap() as usize
}
-pub unsafe fn futex_wait_ptr(ptr: *mut i32, value: i32, timeout_opt: Option<&timespec>) -> bool {
+pub unsafe fn futex_wait_ptr<T: FutexTy>(ptr: *mut T, value: T, timeout_opt: Option<&timespec>) -> bool {
// TODO: unwrap_unchecked?
-Sys::futex(ptr, FUTEX_WAIT, value, timeout_opt.map_or(0, |t| t as *const _ as usize)) == Ok(0)
+Sys::futex(ptr.cast(), FUTEX_WAIT, value.conv(), timeout_opt.map_or(0, |t| t as *const _ as usize)) == Ok(0)
}
-pub fn futex_wake(atomic: &AtomicInt, n: i32) -> usize {
-unsafe { futex_wake_ptr(atomic.as_ptr(), n) }
+pub fn futex_wake(atomic: &impl FutexAtomicTy, n: i32) -> usize {
+unsafe { futex_wake_ptr(atomic.as_mut_ptr(), n) }
}
-pub fn futex_wait(atomic: &AtomicInt, value: i32, timeout_opt: Option<&timespec>) -> bool {
-unsafe { futex_wait_ptr(atomic.as_ptr(), value, timeout_opt) }
+pub fn futex_wait<T: FutexAtomicTy>(atomic: &T, value: T::Ty, timeout_opt: Option<&timespec>) -> bool {
+unsafe { futex_wait_ptr(atomic.as_mut_ptr(), value, timeout_opt) }
}
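// A small sketch, not part of this diff, of what the FutexAtomicTy generalization buys: the same
// safe wrappers now work with AtomicU32 words (as used by the new Barrier) as well as the
// existing AtomicI32 ones. The event-flag type below is purely illustrative.
pub struct EventFlagSketch {
    // 0 = not yet signaled, 1 = signaled
    state: AtomicU32,
}

impl EventFlagSketch {
    pub const fn new() -> Self {
        Self { state: AtomicU32::new(0) }
    }
    pub fn signal(&self) {
        self.state.store(1, atomic::Ordering::Release);
        // Wake every thread currently blocked on the word.
        futex_wake(&self.state, i32::MAX);
    }
    pub fn wait(&self) {
        // Re-check after every wakeup: futex_wait can return spuriously or after a racing wake.
        while self.state.load(atomic::Ordering::Acquire) == 0 {
            futex_wait(&self.state, 0, None);
        }
    }
}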
pub fn wait_until_generic<F1, F2>(word: &AtomicInt, attempt: F1, mark_long: F2, long: c_int)
where
......