Skip to content
Snippets Groups Projects
Verified Commit 1d989d5c authored by Jacob Lorentzon
Browse files

Improve barrier.

Maybe it would be better to simply stick to a mutex and condvar, as
libstd does. Optimization is for later anyway.
parent 4353ef33
No related branches found
No related tags found
1 merge request: !380 Replace pthreads-emb with a native implementation
...@@ -5,6 +5,7 @@ use core::sync::atomic::{AtomicU32 as AtomicUint, Ordering}; ...@@ -5,6 +5,7 @@ use core::sync::atomic::{AtomicU32 as AtomicUint, Ordering};
pub struct Barrier { pub struct Barrier {
waited_count: AtomicUint, waited_count: AtomicUint,
notified_count: AtomicUint, notified_count: AtomicUint,
cycles_count: AtomicUint,
original_count: NonZeroU32, original_count: NonZeroU32,
} }
...@@ -18,6 +19,7 @@ impl Barrier { ...@@ -18,6 +19,7 @@ impl Barrier {
Self { Self {
waited_count: AtomicUint::new(0), waited_count: AtomicUint::new(0),
notified_count: AtomicUint::new(0), notified_count: AtomicUint::new(0),
cycles_count: AtomicUint::new(0),
original_count: count, original_count: count,
} }
} }
...@@ -25,15 +27,16 @@ impl Barrier { ...@@ -25,15 +27,16 @@ impl Barrier {
// The barrier wait operation can be divided into two parts: (1) incrementing the wait count where // The barrier wait operation can be divided into two parts: (1) incrementing the wait count where
// N-1 waiters wait and one notifies the rest, and (2) notifying all threads that have been // N-1 waiters wait and one notifies the rest, and (2) notifying all threads that have been
// waiting. // waiting.
let original_count = self.original_count.get(); let original_count = self.original_count.get();
let mut new = self.waited_count.fetch_add(1, Ordering::Acquire) + 1; let mut new = self.waited_count.fetch_add(1, Ordering::Acquire) + 1;
let original_cycle_count = self.cycles_count.load(Ordering::Acquire);
loop { loop {
let result = match Ord::cmp(&new, &original_count) { let result = match Ord::cmp(&new, &original_count) {
cmp::Ordering::Less => { cmp::Ordering::Less => {
// new < original_count, i.e. we were one of the threads that incremented the counter, // new < original_count, i.e. we were one of the threads that incremented the
// but need to continue waiting for the last waiter to notify the others. // counter, and will return without SERIAL_THREAD later, but need to continue
// waiting for the last waiter to notify the others.
loop { loop {
let count = self.waited_count.load(Ordering::Acquire); let count = self.waited_count.load(Ordering::Acquire);
...@@ -54,28 +57,35 @@ impl Barrier { ...@@ -54,28 +57,35 @@ impl Barrier {
WaitResult::NotifiedAll WaitResult::NotifiedAll
} }
cmp::Ordering::Greater => { cmp::Ordering::Greater => {
crate::sync::futex_wait(&self.waited_count, new, None); let mut next_cycle_count;
loop {
next_cycle_count = self.cycles_count.load(Ordering::Acquire);
new = self.waited_count.load(Ordering::Acquire); if next_cycle_count != original_cycle_count {
break;
}
crate::sync::futex_wait(&self.cycles_count, next_cycle_count, None);
}
let difference = next_cycle_count.wrapping_sub(original_cycle_count);
new = new.saturating_sub(difference * original_cycle_count);
continue; continue;
} }
}; };
// When the required number of threads have called pthread_barrier_wait so waited_count if self.notified_count.fetch_add(1, Ordering::AcqRel) + 1 == original_count {
// >= original_count (should never be able to exceed that value), we can safely reset self.notified_count.store(0, Ordering::Relaxed);
// the counter to zero. // Cycle count can be incremented nonatomically here, as this branch can only be
// reached once until waited_count is decremented again.
self.cycles_count.store(self.cycles_count.load(Ordering::Acquire).wrapping_add(1), Ordering::Release);
// TODO: Orderings let _ = self.waited_count.fetch_sub(original_count, Ordering::Relaxed);
if self.notified_count.fetch_add(1, Ordering::AcqRel) + 1 >= original_count {
self.notified_count.store(0, Ordering::Release);
let next = self.waited_count.fetch_sub(original_count, Ordering::Release) - original_count;
if next >= original_count { let _ = crate::sync::futex_wake(&self.cycles_count, i32::MAX);
let _ = crate::sync::futex_wake(&self.waited_count, original_count as i32);
}
} }
break result; return result;
} }
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment