diff --git a/src/sync/semaphore.rs b/src/sync/semaphore.rs index 28bbe4640e669c5fe3add92dccb0c927e2d058f5..a12ceb17269cbb76485c80a7ea4d26fdb97ed5ca 100644 --- a/src/sync/semaphore.rs +++ b/src/sync/semaphore.rs @@ -13,7 +13,6 @@ pub struct Semaphore { lock: AtomicLock, } -//TODO: fix to use futex again impl Semaphore { pub const fn new(value: c_int) -> Self { Self { @@ -23,33 +22,27 @@ impl Semaphore { pub fn post(&self, count: c_int) { self.lock.fetch_add(count, Ordering::SeqCst); - } - - pub fn try_wait(&self) -> Result<(), ()> { - let mut value = self.lock.load(Ordering::SeqCst); - if value > 0 { - match self.lock.compare_exchange( - value, - value - 1, - Ordering::SeqCst, - Ordering::SeqCst - ) { - Ok(_) => Ok(()), - Err(_) => Err(()) - } - } else { - Err(()) - } + self.lock.notify_all(); } pub fn wait(&self, timeout_opt: Option<×pec>) -> Result<(), ()> { - loop { - match self.try_wait() { - Ok(()) => { - return Ok(()); + let value = self.lock.load(Ordering::SeqCst); + if value > 0 { + match self.lock.compare_exchange( + value, + value - 1, + Ordering::SeqCst, + Ordering::SeqCst + ) { + Ok(_) => { + // Acquired + return Ok(()); + } + Err(_) => () } - Err(()) => () + // Try again (as long as value > 0) + continue; } if let Some(timeout) = timeout_opt { let mut time = timespec::default(); @@ -57,10 +50,26 @@ impl Semaphore { if (time.tv_sec > timeout.tv_sec) || (time.tv_sec == timeout.tv_sec && time.tv_nsec >= timeout.tv_nsec) { + //Timeout happened, return error return Err(()) + } else { + // Use futex to wait for the next change, with a relative timeout + let mut relative = timespec { + tv_sec: timeout.tv_sec, + tv_nsec: timeout.tv_nsec, + }; + while relative.tv_nsec < time.tv_nsec { + relative.tv_sec -= 1; + relative.tv_nsec += 1_000_000_000; + } + relative.tv_sec -= time.tv_sec; + relative.tv_nsec -= time.tv_nsec; + self.lock.wait_if(value, Some(&relative)); } + } else { + // Use futex to wait for the next change, without a timeout + self.lock.wait_if(value, None); } - Sys::sched_yield(); } } }