From 38f89cb4bb0b5fb5390e859ade1b98484bebbb24 Mon Sep 17 00:00:00 2001 From: 4lDO2 <4lDO2@protonmail.com> Date: Thu, 13 Apr 2023 15:14:38 +0200 Subject: [PATCH] Improved barrier implementation. --- src/sync/barrier.rs | 51 ++++++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs index b17786e24..31095e9ca 100644 --- a/src/sync/barrier.rs +++ b/src/sync/barrier.rs @@ -27,11 +27,10 @@ impl Barrier { // waiting. let original_count = self.original_count.get(); + let mut new = self.waited_count.fetch_add(1, Ordering::Acquire) + 1; loop { - let new = self.waited_count.fetch_add(1, Ordering::Acquire) + 1; - - match Ord::cmp(&new, &original_count) { + let result = match Ord::cmp(&new, &original_count) { cmp::Ordering::Less => { // new < original_count, i.e. we were one of the threads that incremented the counter, // but need to continue waiting for the last waiter to notify the others. @@ -44,43 +43,39 @@ impl Barrier { let _ = crate::sync::futex_wait(&self.waited_count, count, None); } - // When the required number of threads have called pthread_barrier_wait so waited_count - // >= original_count (should never be able to exceed that value), we can safely reset - // the counter to zero. - - if self.notified_count.fetch_add(1, Ordering::Relaxed) + 1 >= original_count { - self.waited_count.store(0, Ordering::Relaxed); - } - - return WaitResult::Waited; + WaitResult::Waited } cmp::Ordering::Equal => { // new == original_count, i.e. we were the one thread doing the last increment, and we // will be responsible for waking up all other waiters. - crate::sync::futex_wake(&self.waited_count, i32::MAX); - - if self.notified_count.fetch_add(1, Ordering::Relaxed) + 1 >= original_count { - self.waited_count.store(0, Ordering::Relaxed); - } + crate::sync::futex_wake(&self.waited_count, original_count as i32 - 1); - return WaitResult::NotifiedAll; + WaitResult::NotifiedAll } - // FIXME: Starvation? cmp::Ordering::Greater => { - let mut cached = new; - while cached >= original_count { - // new > original_count, i.e. we are waiting on a barrier that is already finished, but - // which has not yet awoken all its waiters and re-initialized the self. The - // simplest way to handle this is to wait for waited_count to return to zero, and - // start over. + crate::sync::futex_wait(&self.waited_count, new, None); - crate::sync::futex_wait(&self.waited_count, cached, None); + new = self.waited_count.load(Ordering::Acquire); - cached = self.waited_count.load(Ordering::Acquire); - } + continue; + } + }; + + // When the required number of threads have called pthread_barrier_wait so waited_count + // >= original_count (should never be able to exceed that value), we can safely reset + // the counter to zero. + + // TODO: Orderings + if self.notified_count.fetch_add(1, Ordering::AcqRel) + 1 >= original_count { + self.notified_count.store(0, Ordering::Release); + let next = self.waited_count.fetch_sub(original_count, Ordering::Release) - original_count; + + if next >= original_count { + let _ = crate::sync::futex_wake(&self.waited_count, original_count as i32); } } + break result; } } } -- GitLab