Skip to content
Snippets Groups Projects
Verified Commit 38f89cb4 authored by Jacob Lorentzon's avatar Jacob Lorentzon
Browse files

Improved barrier implementation.

parent cf34e251
No related branches found
No related tags found
No related merge requests found
...@@ -27,11 +27,10 @@ impl Barrier { ...@@ -27,11 +27,10 @@ impl Barrier {
// waiting. // waiting.
let original_count = self.original_count.get(); let original_count = self.original_count.get();
let mut new = self.waited_count.fetch_add(1, Ordering::Acquire) + 1;
loop { loop {
let new = self.waited_count.fetch_add(1, Ordering::Acquire) + 1; let result = match Ord::cmp(&new, &original_count) {
match Ord::cmp(&new, &original_count) {
cmp::Ordering::Less => { cmp::Ordering::Less => {
// new < original_count, i.e. we were one of the threads that incremented the counter, // new < original_count, i.e. we were one of the threads that incremented the counter,
// but need to continue waiting for the last waiter to notify the others. // but need to continue waiting for the last waiter to notify the others.
...@@ -44,43 +43,39 @@ impl Barrier { ...@@ -44,43 +43,39 @@ impl Barrier {
let _ = crate::sync::futex_wait(&self.waited_count, count, None); let _ = crate::sync::futex_wait(&self.waited_count, count, None);
} }
// When the required number of threads have called pthread_barrier_wait so waited_count WaitResult::Waited
// >= original_count (should never be able to exceed that value), we can safely reset
// the counter to zero.
if self.notified_count.fetch_add(1, Ordering::Relaxed) + 1 >= original_count {
self.waited_count.store(0, Ordering::Relaxed);
}
return WaitResult::Waited;
} }
cmp::Ordering::Equal => { cmp::Ordering::Equal => {
// new == original_count, i.e. we were the one thread doing the last increment, and we // new == original_count, i.e. we were the one thread doing the last increment, and we
// will be responsible for waking up all other waiters. // will be responsible for waking up all other waiters.
crate::sync::futex_wake(&self.waited_count, i32::MAX); crate::sync::futex_wake(&self.waited_count, original_count as i32 - 1);
if self.notified_count.fetch_add(1, Ordering::Relaxed) + 1 >= original_count {
self.waited_count.store(0, Ordering::Relaxed);
}
return WaitResult::NotifiedAll; WaitResult::NotifiedAll
} }
// FIXME: Starvation?
cmp::Ordering::Greater => { cmp::Ordering::Greater => {
let mut cached = new; crate::sync::futex_wait(&self.waited_count, new, None);
while cached >= original_count {
// new > original_count, i.e. we are waiting on a barrier that is already finished, but
// which has not yet awoken all its waiters and re-initialized the self. The
// simplest way to handle this is to wait for waited_count to return to zero, and
// start over.
crate::sync::futex_wait(&self.waited_count, cached, None); new = self.waited_count.load(Ordering::Acquire);
cached = self.waited_count.load(Ordering::Acquire); continue;
} }
};
// When the required number of threads have called pthread_barrier_wait so waited_count
// >= original_count (should never be able to exceed that value), we can safely reset
// the counter to zero.
// TODO: Orderings
if self.notified_count.fetch_add(1, Ordering::AcqRel) + 1 >= original_count {
self.notified_count.store(0, Ordering::Release);
let next = self.waited_count.fetch_sub(original_count, Ordering::Release) - original_count;
if next >= original_count {
let _ = crate::sync::futex_wake(&self.waited_count, original_count as i32);
} }
} }
break result;
} }
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment