diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs
index 31095e9ca5368efe52ec0c56b96d9a67f9e09e13..c07c9d3ca3d44946d94bf1079ece97d2a1b1ef9a 100644
--- a/src/sync/barrier.rs
+++ b/src/sync/barrier.rs
@@ -5,6 +5,8 @@ use core::sync::atomic::{AtomicU32 as AtomicUint, Ordering};
 pub struct Barrier {
     waited_count: AtomicUint,
     notified_count: AtomicUint,
+    /// Completed-cycle counter; threads arriving at an already full barrier futex-wait on it.
+    cycles_count: AtomicUint,
     original_count: NonZeroU32,
 }
 
@@ -18,6 +20,7 @@ impl Barrier {
         Self {
             waited_count: AtomicUint::new(0),
             notified_count: AtomicUint::new(0),
+            cycles_count: AtomicUint::new(0),
             original_count: count,
         }
     }
@@ -25,15 +28,16 @@ impl Barrier {
         // The barrier wait operation can be divided into two parts: (1) incrementing the wait count where
         // N-1 waiters wait and one notifies the rest, and (2) notifying all threads that have been
         // waiting.
-
         let original_count = self.original_count.get();
         let mut new = self.waited_count.fetch_add(1, Ordering::Acquire) + 1;
+        let mut original_cycle_count = self.cycles_count.load(Ordering::Acquire);
 
         loop {
             let result = match Ord::cmp(&new, &original_count) {
                 cmp::Ordering::Less => {
-                    // new < original_count, i.e. we were one of the threads that incremented the counter,
-                    // but need to continue waiting for the last waiter to notify the others.
+                    // new < original_count, i.e. we were one of the threads that incremented the
+                    // counter, and will return without SERIAL_THREAD later, but need to continue
+                    // waiting for the last waiter to notify the others.
                     loop {
                         let count = self.waited_count.load(Ordering::Acquire);
 
@@ -54,28 +58,40 @@ impl Barrier {
                     WaitResult::NotifiedAll
                 }
                 cmp::Ordering::Greater => {
-                    crate::sync::futex_wait(&self.waited_count, new, None);
+                    let mut next_cycle_count;
+
+                    loop {
+                        next_cycle_count = self.cycles_count.load(Ordering::Acquire);
 
-                    new = self.waited_count.load(Ordering::Acquire);
+                        if next_cycle_count != original_cycle_count {
+                            break;
+                        }
+                        crate::sync::futex_wait(&self.cycles_count, next_cycle_count, None);
+                    }
+                    let difference = next_cycle_count.wrapping_sub(original_cycle_count);
+                    // Rebase the snapshot so that another pass through this branch waits for a
+                    // new cycle instead of subtracting the already accounted ones again.
+                    original_cycle_count = next_cycle_count;
+
+                    // Every completed cycle lowers waited_count by original_count, so our own
+                    // position has to move down by that amount per cycle.
+                    new = new.saturating_sub(difference.saturating_mul(original_count));
                     continue;
                 }
             };
 
-            // When the required number of threads have called pthread_barrier_wait so waited_count
-            // >= original_count (should never be able to exceed that value), we can safely reset
-            // the counter to zero.
+            if self.notified_count.fetch_add(1, Ordering::AcqRel) + 1 == original_count {
+                self.notified_count.store(0, Ordering::Relaxed);
+                // Cycle count can be incremented nonatomically here, as this branch can only be
+                // reached once until waited_count is decremented again.
+                self.cycles_count.store(self.cycles_count.load(Ordering::Acquire).wrapping_add(1), Ordering::Release);
 
-            // TODO: Orderings
-            if self.notified_count.fetch_add(1, Ordering::AcqRel) + 1 >= original_count {
-                self.notified_count.store(0, Ordering::Release);
-                let next = self.waited_count.fetch_sub(original_count, Ordering::Release) - original_count;
+                let _ = self.waited_count.fetch_sub(original_count, Ordering::Relaxed);
 
-                if next >= original_count {
-                    let _ = crate::sync::futex_wake(&self.waited_count, original_count as i32);
-                }
+                let _ = crate::sync::futex_wake(&self.cycles_count, i32::MAX);
             }
 
-            break result;
+            return result;
         }
     }
 }