use core::cell::UnsafeCell;
use core::convert::TryFrom;
use core::intrinsics::unlikely;
use core::mem::ManuallyDrop;
use core::pin::Pin;
use core::sync::atomic::{self, AtomicBool, AtomicU64, AtomicUsize, Ordering};
use core::{cmp, fmt, mem, ptr, task};

use alloc::boxed::Box;
use alloc::collections::{BinaryHeap, BTreeMap};
use alloc::sync::{Arc, Weak};
use alloc::vec::Vec;

use either::*;
use spin::{Mutex, Once};

use crate::event;
use crate::io_uring::{HandleSubmissionFuture, RingInfo};
use crate::io_uring::scheme::{Handle, IoUringScheme};
use crate::memory::{self, PhysicalAddress};
use crate::paging::{ActivePageTable, VirtualAddress};
use crate::scheme::ContextOrKernel;
use crate::syscall::error::{Error, Result};
use crate::syscall::error::{EBADF, EBADFD, EBUSY, EINVAL};
use crate::syscall::flag::MapFlags;
use crate::syscall::io_uring::v1::{
    CachePadded, CqEntry32, CqEntry64, IoUringCreateFlags, Priority, SqEntry32, SqEntry64,
    CQ_ENTRIES_MMAP_OFFSET, CQ_HEADER_MMAP_OFFSET, SQ_ENTRIES_MMAP_OFFSET, SQ_HEADER_MMAP_OFFSET,
};
use crate::syscall::io_uring::{IoUringCreateInfo, IoUringVersion};

#[derive(Debug)]
pub struct RingHandle {
    pub(crate) rings: Once<Rings>,
    pub(crate) consumer_state: Once<RingHandleConsumerState>,
    pub(crate) state: Mutex<RingHandleState>,
    pub(crate) refcount: AtomicUsize,
    pub(crate) owner_context: Once<ContextOrKernel>,
    pub(crate) attached_context: Once<ContextOrKernel>,
    // FIXME: placeholder parameter; the concrete pool type is not visible in this module.
    pub(crate) pool: Once<()>,
}

impl Rings {
    fn inner_map_phys<T>(info: &RingInfo<T>) -> (PhysicalAddress, PhysicalAddress) {
        (info.ring_physaddr, info.entries_physaddr)
    }
    fn inner_map_virt<T>(info: &RingInfo<T>) -> (VirtualAddress, VirtualAddress) {
        (
            VirtualAddress::new(info.ring.as_ptr() as usize),
            VirtualAddress::new(info.entries.as_ptr() as usize),
        )
    }
    pub fn submission_ring_physaddr(&self) -> PhysicalAddress {
        let (ring_physaddr, _) = self
            .submission_ring
            .as_ref()
            .either(Self::inner_map_phys, Self::inner_map_phys);
        ring_physaddr
    }
    pub fn submission_entries_physaddr(&self) -> PhysicalAddress {
        let (_, entries_physaddr) = self
            .submission_ring
            .as_ref()
            .either(Self::inner_map_phys, Self::inner_map_phys);
        entries_physaddr
    }
    pub fn completion_ring_physaddr(&self) -> PhysicalAddress {
        let (ring_physaddr, _) = self
            .completion_ring
            .as_ref()
            .map_left(|(i, _)| i)
            .map_right(|(i, _)| i)
            .either(Self::inner_map_phys, Self::inner_map_phys);
        ring_physaddr
    }
    pub fn completion_entries_physaddr(&self) -> PhysicalAddress {
        let (_, entries_physaddr) = self
            .completion_ring
            .as_ref()
            .map_left(|(i, _)| i)
            .map_right(|(i, _)| i)
            .either(Self::inner_map_phys, Self::inner_map_phys);
        entries_physaddr
    }
    pub fn submission_ring_virtaddr(&self) -> VirtualAddress {
        let (ring_virtaddr, _) = self
            .submission_ring
            .as_ref()
            .either(Self::inner_map_virt, Self::inner_map_virt);
        ring_virtaddr
    }
    pub fn submission_entries_virtaddr(&self) -> VirtualAddress {
        let (_, entries_virtaddr) = self
            .submission_ring
            .as_ref()
            .either(Self::inner_map_virt, Self::inner_map_virt);
        entries_virtaddr
    }
    pub fn completion_ring_virtaddr(&self) -> VirtualAddress {
        let (ring_virtaddr, _) = self
            .completion_ring
            .as_ref()
            .map_left(|(i, _)| i)
            .map_right(|(i, _)| i)
            .either(Self::inner_map_virt, Self::inner_map_virt);
        ring_virtaddr
    }
    pub fn completion_entries_virtaddr(&self) -> VirtualAddress {
        let (_, entries_virtaddr) = self
            .completion_ring
            .as_ref()
            .map_left(|(i, _)| i)
            .map_right(|(i, _)| i)
            .either(Self::inner_map_virt, Self::inner_map_virt);
        entries_virtaddr
    }
}
impl RingHandle {
    pub fn runtime_state(&self) -> Option<&RingHandleRuntimeState> {
        Some(&self.consumer_state.r#try()?.runtime_state)
    }
    pub fn submission_entry_count(&self) -> Option<usize> {
        Some(self.runtime_state()?.sq_entry_count)
    }
    pub fn completion_entry_count(&self) -> Option<usize> {
        Some(self.runtime_state()?.cq_entry_count)
    }
    pub fn sq_entry_size(&self) -> Option<usize> {
        Some(
            if self
                .consumer_state
                .r#try()?
                .flags
                .contains(IoUringCreateFlags::BITS_32)
            {
                mem::size_of::<SqEntry32>()
            } else {
                mem::size_of::<SqEntry64>()
            },
        )
    }
    pub fn cq_entry_size(&self) -> Option<usize> {
        Some(
            if self
                .consumer_state
                .r#try()?
                .flags
                .contains(IoUringCreateFlags::BITS_32)
            {
                mem::size_of::<CqEntry32>()
            } else {
                mem::size_of::<CqEntry64>()
            },
        )
    }
    pub fn version(&self) -> Option<IoUringVersion> {
        Some(self.consumer_state.r#try()?.version)
    }
    pub fn flags(&self) -> Option<IoUringCreateFlags> {
        Some(self.consumer_state.r#try()?.flags)
    }
    pub fn state(&self) -> RingHandleState {
        *self.state.lock()
    }
    pub fn transition_into_initialized_state(&self, init: &IoUringCreateInfo) -> Result<()> {
        let flags = IoUringCreateFlags::from_bits(init.flags).ok_or(Error::new(EINVAL))?;

        let mut state_lock = self.state.lock();

        if let &RingHandleState::Initialized { .. } = &*state_lock {
            return Err(Error::new(EINVAL));
        }
        *state_lock = RingHandleState::Initialized;

        self.consumer_state.call_once(move || RingHandleConsumerState {
            runtime_state: RingHandleRuntimeState {
                sq_entry_count: init.sq_entry_count,
                cq_entry_count: init.cq_entry_count,
                last_sq_push_epoch: CachePadded(AtomicUsize::new(0)),
                last_sq_pop_epoch: CachePadded(AtomicUsize::new(0)),
                last_cq_push_epoch: CachePadded(AtomicUsize::new(0)),
                last_cq_pop_epoch: CachePadded(AtomicUsize::new(0)),
            },
            version: init.version,
            flags,
            is_owner: true,
            event_queue: Once::new(),
            event_seq: AtomicU64::new(0),
            secondary_rings: Mutex::new(Vec::new()),
        });
        self.refcount.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
    pub fn transition_into_attached_state(&self, attached_context: ContextOrKernel) {
        {
            let mut state_lock = self.state.lock();
            assert_eq!(*state_lock, RingHandleState::Initialized);
            *state_lock = RingHandleState::Attached;
        }
        self.refcount.fetch_add(1, Ordering::Relaxed);

        assert!(self.attached_context.r#try().is_none());
        self.attached_context.call_once(|| attached_context);
    }
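    // Lifecycle note: a handle starts out in `RingHandleState::Start`,
    // `transition_into_initialized_state` moves it to `Initialized` once the creation info has
    // been validated, and `transition_into_attached_state` moves it to `Attached`. Each
    // successful transition bumps `refcount`, and `map_mem_kernel` below only accepts handles
    // that are currently in the `Initialized` state.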
    pub fn map_mem_kernel(
        &self,
        map_offset: usize,
        map_flags: MapFlags,
        map_size: usize,
    ) -> Result<(
        PhysicalAddress,
        VirtualAddress,
        usize,
        usize,
        bool,
        bool,
        usize,
    )> {
        if map_flags.contains(MapFlags::PROT_EXEC) || map_flags.contains(MapFlags::MAP_PRIVATE) {
            return Err(Error::new(EINVAL));
        }

        let state_lock;
        let consumer_state: &RingHandleConsumerState = {
            state_lock = self.state.lock();
            match &*state_lock {
                &RingHandleState::Initialized => {
                    self.consumer_state.r#try().ok_or(Error::new(EBADFD))?
                }
                _ => return Err(Error::new(EINVAL)),
            }
        };
        let (sq_entry_count, cq_entry_count, flags) = (
            consumer_state.runtime_state.sq_entry_count,
            consumer_state.runtime_state.cq_entry_count,
            consumer_state.flags,
        );

        let mut active_table = unsafe { ActivePageTable::new() };

        fn get_ring_addr_pair<T>(info: &RingInfo<T>) -> (PhysicalAddress, *mut u8, bool, usize) {
            (
                info.ring_physaddr,
                info.ring.as_ptr() as *mut u8,
                true,
                mem::size_of::<T>(),
            )
        }
        fn get_entries_addr_pair<T>(info: &RingInfo<T>) -> (PhysicalAddress, *mut u8, bool, usize) {
            (
                info.entries_physaddr,
                info.entries.as_ptr() as *mut u8,
                false,
                mem::size_of::<T>(),
            )
        }
        fn get_or_init<'a>(
            rings_once: &'a Once<Rings>,
            active_table: &mut ActivePageTable,
            sq_entry_count: usize,
            cq_entry_count: usize,
            flags: IoUringCreateFlags,
        ) -> Result<&'a Rings> {
            Ok(match rings_once.r#try() {
                Some(r) => r,
                None => {
                    // TODO: Use a better spinlock crate (maybe my own, `spinning`), which
                    // supports fallible initialization. Currently, if some other thread were to
                    // initialize the rings simultaneously, one of them would be leaked, since
                    // the other thread wouldn't know that we were initializing it.
                    let rings = IoUringScheme::init_rings(
                        active_table,
                        sq_entry_count,
                        cq_entry_count,
                        flags,
                    )?;
                    rings_once.call_once(|| rings)
                }
            })
        }

        let is_sq;

        let (kernel_physaddr, kernel_virtaddr, is_ring_header, entry_size) = match map_offset {
            SQ_HEADER_MMAP_OFFSET => {
                is_sq = true;
                get_or_init(
                    &self.rings,
                    &mut active_table,
                    sq_entry_count,
                    cq_entry_count,
                    flags,
                )?
                .submission_ring
                .as_ref()
                .either(get_ring_addr_pair, get_ring_addr_pair)
            }
            SQ_ENTRIES_MMAP_OFFSET => {
                is_sq = true;
                get_or_init(
                    &self.rings,
                    &mut active_table,
                    sq_entry_count,
                    cq_entry_count,
                    flags,
                )?
                .submission_ring
                .as_ref()
                .either(get_entries_addr_pair, get_entries_addr_pair)
            }
            CQ_HEADER_MMAP_OFFSET => {
                is_sq = false;
                get_or_init(
                    &self.rings,
                    &mut active_table,
                    sq_entry_count,
                    cq_entry_count,
                    flags,
                )?
                .completion_ring
                .as_ref()
                .map_left(|(i, _)| i)
                .map_right(|(i, _)| i)
                .either(get_ring_addr_pair, get_ring_addr_pair)
            }
            CQ_ENTRIES_MMAP_OFFSET => {
                is_sq = false;
                get_or_init(
                    &self.rings,
                    &mut active_table,
                    sq_entry_count,
                    cq_entry_count,
                    flags,
                )?
                .completion_ring
                .as_ref()
                .map_left(|(i, _)| i)
                .map_right(|(i, _)| i)
                .either(get_entries_addr_pair, get_entries_addr_pair)
            }
            _ => return Err(Error::new(EINVAL)),
        };

        if unlikely(is_ring_header && map_size != memory::PAGE_SIZE) {
            return Err(Error::new(EINVAL));
        }
        // TODO: Validate entries as well.

        Ok((
            kernel_physaddr,
            VirtualAddress::new(kernel_virtaddr as usize),
            sq_entry_count,
            cq_entry_count,
            is_ring_header,
            is_sq,
            entry_size,
        ))
    }
}
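// Every ring header carries push and pop epoch counters that are bumped when entries are pushed
// to or popped from the ring. `check_for_update` below snapshots those counters and compares them
// against the values recorded on its previous call, which is how the kernel decides whether a
// ring has seen activity since it last looked.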
impl RingHandleRuntimeState {
    /// Returns four booleans which indicate whether the ring(s) have been updated.
    ///
    /// Order: ((SQ push, SQ pop), (CQ push, CQ pop)).
    pub fn check_for_update(&self, rings: &Rings) -> ((bool, bool), (bool, bool)) {
        fn get_ring_epochs<T>(info: &RingInfo<T>) -> (usize, usize) {
            let ring_ref = unsafe { info.ring.as_ref() };
            let push_epoch = ring_ref.push_epoch.load(Ordering::Relaxed);
            let pop_epoch = ring_ref.pop_epoch.load(Ordering::Relaxed);
            (push_epoch, pop_epoch)
        }

        let (current_sq_push_epoch, current_sq_pop_epoch) = match rings.submission_ring {
            Either::Left(ref ring) => get_ring_epochs(ring),
            Either::Right(ref ring) => get_ring_epochs(ring),
        };
        let (current_cq_push_epoch, current_cq_pop_epoch) = match rings.completion_ring {
            Either::Left((ref ring, _)) => get_ring_epochs(ring),
            Either::Right((ref ring, _)) => get_ring_epochs(ring),
        };

        // Even though it is slower, we use SeqCst here since there is no synchronization
        // between different atomic variables otherwise.
        //
        // Makes sure that every store to the last_[sc]q-epoch counters happens after the loads
        // from the rings.
        //
        // TODO: Are Acquire-Release fences sufficient here?
        atomic::fence(Ordering::SeqCst);

        let prev_sq_push_epoch = self
            .last_sq_push_epoch
            .swap(current_sq_push_epoch, Ordering::Relaxed);
        let prev_sq_pop_epoch = self
            .last_sq_pop_epoch
            .swap(current_sq_pop_epoch, Ordering::Relaxed);
        let prev_cq_push_epoch = self
            .last_cq_push_epoch
            .swap(current_cq_push_epoch, Ordering::Relaxed);
        let prev_cq_pop_epoch = self
            .last_cq_pop_epoch
            .swap(current_cq_pop_epoch, Ordering::Relaxed);

        // TODO: Is this fence also necessary?
        atomic::fence(Ordering::SeqCst);

        (
            (
                prev_sq_push_epoch != current_sq_push_epoch,
                prev_sq_pop_epoch != current_sq_pop_epoch,
            ),
            (
                prev_cq_push_epoch != current_cq_push_epoch,
                prev_cq_pop_epoch != current_cq_pop_epoch,
            ),
        )
    }
}

#[derive(Debug)]
pub struct Rings {
    pub(crate) submission_ring: Either<RingInfo<SqEntry32>, RingInfo<SqEntry64>>,

    // TODO: Since we no longer type-erase the submission entries (which are indeed needed by the
    // futures), the only currently valid combinations of submission and completion types are
    // 32+32 and 64+64. Maybe this restriction should be loosened (even though it's not yet
    // possible at the interface level).
    pub(crate) completion_ring: Either<
        (RingInfo<CqEntry32>, Runqueue<SqEntry32, CqEntry32>),
        (RingInfo<CqEntry64>, Runqueue<SqEntry64, CqEntry64>),
    >,
}

#[cfg(target_pointer_width = "64")]
mod types {
    use core::sync::atomic::AtomicU32;

    pub type PendingTag = u32;
    pub type AtomicPendingTag = AtomicU32;
    pub type FastHandleIdx = u32;
}
#[cfg(target_pointer_width = "32")]
mod types {
    use core::sync::atomic::AtomicU16;

    pub type PendingTag = u16;
    pub type AtomicPendingTag = AtomicU16;
    pub type FastHandleIdx = u16;
}
pub use types::*;

#[derive(Debug)]
pub struct Runqueue<S: 'static, C: 'static> {
    pub(crate) first_vacant_slot: AtomicUsize,
    pub(crate) last_vacant_slot: AtomicUsize,
    pub(crate) tasks: Pin<Box<[UnsafeCell<TaskSlot<S, C>>]>>,
    pub(crate) task_locks: Box<[AtomicUsize]>,
    pub(crate) ready: Mutex<BinaryHeap<TaskRef>>,
    pub(crate) pending: Mutex<BTreeMap<PendingTag, TaskRef>>,
    pub(crate) next_pending_tag: AtomicPendingTag,
    pub(crate) tag_overflow: AtomicBool,
}
unsafe impl<S: 'static, C: 'static> Send for Runqueue<S, C> {}
unsafe impl<S: 'static, C: 'static> Sync for Runqueue<S, C> {}

pub struct TaskGuard<'runqueue, S: 'static, C: 'static> {
    index: usize,
    runqueue: &'runqueue Runqueue<S, C>,
}

impl<'runqueue, S, C> TaskGuard<'runqueue, S, C>
where
    S: 'static,
    C: 'static,
{
    pub fn task_ref(&mut self) -> Option<TaskRef> {
        Some(TaskRef {
            index: self.index,
            priority: self.as_mut().as_task_mut()?.priority(),
        })
    }
    pub fn as_mut<'guard>(&'guard mut self) -> Pin<&'guard mut TaskSlot<S, C>> {
        unsafe {
            let pinned_task_cell = self
                .runqueue
                .tasks
                .as_ref()
                .map_unchecked(|tasks| &tasks[self.index]);
            let task_cell = pinned_task_cell.get_ref();
            let task: &mut TaskSlot<S, C> = &mut *task_cell.get();
            Pin::new_unchecked(task)
        }
    }
}
impl<'runqueue, S: 'static, C: 'static> Drop for TaskGuard<'runqueue, S, C> {
    fn drop(&mut self) {
        unsafe { self.runqueue.unlock_task_raw(self.index) }
    }
}

#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct TaskRef {
    pub index: usize,
    pub priority: Priority,
}

#[derive(Debug)]
pub enum TaskSlot<S: 'static, C: 'static> {
    Occupied(Task<S, C>),
    Vacant { next: usize },
}

impl<S: 'static, C: 'static> TaskSlot<S, C> {
    pub fn as_task_mut(self: Pin<&mut Self>) -> Option<Pin<&mut Task<S, C>>> {
        unsafe {
            match self.get_unchecked_mut() {
                &mut Self::Occupied(ref mut task) => Some(Pin::new_unchecked(task)),
                &mut Self::Vacant { .. } => None,
            }
        }
    }
    pub fn as_vacant_next(self: Pin<&mut Self>) -> Option<&mut usize>
    where
        S: Unpin,
        C: Unpin,
    {
        match self.get_mut() {
            &mut Self::Vacant { ref mut next } => Some(next),
            &mut Self::Occupied(_) => None,
        }
    }
    pub fn get_ref_if_vacant(self: Pin<&mut Self>) -> Option<&mut Self>
    where
        S: Unpin,
        C: Unpin,
    {
        match self.get_mut() {
            // Don't permit occupied entries from being accessed mutably without Pin...
            &mut Self::Occupied(_) => None,
            // ... but do allow vacant entries to be accessed, in order to occupy them.
            this @ &mut Self::Vacant { .. } => Some(this),
        }
    }
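    // Pinning discipline: once a slot is occupied, the `Task` (and the future inside it) must not
    // be moved again, so occupied entries are only handed out as `Pin<&mut Task<S, C>>` via
    // `as_task_mut`, while vacant entries may be accessed directly in order to occupy them.
    // `free` below therefore drops the task in place before overwriting the slot.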
    /// Replaces an occupied task with a new orphan vacant entry, safely dropping the future
    /// inside.
    pub fn free(self: Pin<&mut Self>) {
        unsafe {
            let this = self.get_unchecked_mut();
            let this_ptr = this as *mut Self;
            {
                let pointer = Pin::new_unchecked(this)
                    .as_task_mut()
                    .expect("expected Task::free to free an occupied entry")
                    .get_unchecked_mut() as *mut Task<S, C>;

                // Since the task and the future inside that task are pinned, we need to drop
                // manually with drop_in_place.
                ptr::drop_in_place(pointer);
            }
            // And, now that it's dropped, we cannot in any way use `self` again by reference, so
            // we need to ptr::write the new value.
            ptr::write(this_ptr, Self::Vacant { next: usize::MAX });
        }
    }
}

pub struct Task<S: 'static, C: 'static> {
    // TODO: Array of structures or structure of arrays? In other words, should the priority be
    // separated from the future?
    pub(crate) priority: Priority,
    pub(crate) future: HandleSubmissionFuture<S, C>,
}

impl<S, C> Task<S, C>
where
    S: 'static,
    C: 'static,
{
    pub fn as_future_mut(self: Pin<&mut Self>) -> Pin<&mut HandleSubmissionFuture<S, C>> {
        unsafe { self.map_unchecked_mut(|this| &mut this.future) }
    }
    pub fn as_priority_mut(self: Pin<&mut Self>) -> &mut Priority {
        unsafe { self.map_unchecked_mut(|this| &mut this.priority).get_mut() }
    }
    pub fn priority(self: Pin<&mut Self>) -> Priority {
        *self.as_priority_mut()
    }
}

impl<S, C> fmt::Debug for Task<S, C>
where
    S: 'static,
    C: 'static,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        struct OpaqueStr;

        impl fmt::Debug for OpaqueStr {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                write!(f, "[opaque future of type `HandleSubmissionFuture`]")
            }
        }

        f.debug_struct("Task")
            .field("priority", &self.priority)
            .field("future", &OpaqueStr)
            .finish()
    }
}
impl<S: 'static, C: 'static> PartialEq for Task<S, C> {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}
impl<S: 'static, C: 'static> Eq for Task<S, C> {}
impl<S: 'static, C: 'static> Ord for Task<S, C> {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        Ord::cmp(&self.priority, &other.priority)
    }
}
impl<S: 'static, C: 'static> PartialOrd for Task<S, C> {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

// The "slow" waking procedure, which works for all handle sizes, but requires storing the waker
// state in an Arc.
pub(crate) mod slow_vtable {
    use super::*;

    pub(crate) struct Wrapper<F>(F);

    impl<F> Wrapper<F>
    where
        F: Fn(),
    {
        unsafe fn vtable_clone(pointer: *const ()) -> task::RawWaker {
            let arc_orig = ManuallyDrop::new(Arc::from_raw(pointer as *const F));
            let arc_clone = Arc::clone(&arc_orig);
            mem::forget(arc_orig);
            task::RawWaker::new(Arc::into_raw(arc_clone) as *const (), &Self::VTABLE)
        }
        unsafe fn vtable_wake(pointer: *const ()) {
            let arc = Arc::from_raw(pointer as *const F);
            (arc)();
        }
        unsafe fn vtable_wake_by_ref(pointer: *const ()) {
            let arc_orig = ManuallyDrop::new(Arc::from_raw(pointer as *const F));
            (arc_orig)();
            mem::forget(arc_orig);
        }
        unsafe fn vtable_drop(pointer: *const ()) {
            let arc = ManuallyDrop::new(Arc::from_raw(pointer as *const F));
            drop(ManuallyDrop::into_inner(arc));
        }
        pub(crate) const VTABLE: task::RawWakerVTable = task::RawWakerVTable::new(
            Self::vtable_clone,
            Self::vtable_wake,
            Self::vtable_wake_by_ref,
            Self::vtable_drop,
        );
        pub(crate) fn new(f: F) -> task::Waker {
            unsafe {
                task::Waker::from_raw(task::RawWaker::new(
                    Arc::into_raw(Arc::new(f)) as *const (),
                    &Self::VTABLE,
                ))
            }
        }
    }
}
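// Reference-count discipline for the vtable above: `clone` bumps the `Arc` strong count and
// returns a new raw waker, `wake` consumes one strong reference, `wake_by_ref` borrows without
// consuming, and `drop` releases one strong reference. The `ManuallyDrop`/`mem::forget` pairs are
// what keep the borrowed references alive across the `Arc::from_raw` round-trips.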
// The "fast" waking procedure, which packs the io_uring handle index into one half of the waker's
// data pointer, and the number of the task (the tag) into the other half. (TODO: Maybe introduce
// task groups for waking up multiple futures at once, where some tasks in the group may not be
// able to make progress.)
//
// On a 64-bit system, which is basically the only configuration the Redox kernel supports, this
// results in 32 bits for the handle number (so 4 billion io_urings in total), and 32 bits for the
// tag.
//
// At the moment there is no limit on how many io_urings can be present; therefore, if the count
// were to exceed 2^32, we'll simply use the slow waker instead.
//
// As tags may easily overflow if a process reaches 2^32 pending futures, this may allow spurious
// wakeups if a tag were to overflow before a waker gets dropped. Since this doesn't really
// violate any logical contracts for futures, this is okay.
//
// This also limits the number of possible tags to 2^32, but it wouldn't really make sense to have
// more pending tasks than that.
//
// On a 32-bit system however, these numbers would instead be 16 bits each, or 65536 pending tasks
// and io_uring handles, respectively.
pub(crate) mod fast_vtable {
    use super::*;

    #[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
    compile_error!("invalid pointer width");

    #[cfg(target_pointer_width = "64")]
    pub(crate) fn disassemble_pointer(pointer: usize) -> (u32, u32) {
        let pointer = pointer as u64;
        let lo = (pointer & 0xFFFF_FFFF) as u32;
        let hi = ((pointer >> 32) & 0xFFFF_FFFF) as u32;
        (lo, hi)
    }
    #[cfg(target_pointer_width = "32")]
    pub(crate) fn disassemble_pointer(pointer: usize) -> (u16, u16) {
        let pointer = pointer as u32;
        let lo = (pointer & 0xFFFF) as u16;
        let hi = ((pointer >> 16) & 0xFFFF) as u16;
        (lo, hi)
    }
    #[cfg(target_pointer_width = "64")]
    pub(crate) fn assemble_pointer((lo, hi): (u32, u32)) -> usize {
        let pointer = (u64::from(hi) << 32) | u64::from(lo);
        pointer as usize
    }
    #[cfg(target_pointer_width = "32")]
    pub(crate) fn assemble_pointer((lo, hi): (u16, u16)) -> usize {
        let pointer = (u32::from(hi) << 16) | u32::from(lo);
        pointer as usize
    }

    unsafe fn vtable_clone(pointer: *const ()) -> task::RawWaker {
        task::RawWaker::new(pointer, &VTABLE)
    }
    unsafe fn vtable_wake(pointer: *const ()) {
        vtable_wake_by_ref(pointer)
    }
    unsafe fn vtable_wake_by_ref(pointer: *const ()) {
        let (handle_idx, tag) = disassemble_pointer(pointer as usize);
        let handle_idx = usize::try_from(handle_idx)
            .expect("expected handle_idx type to at least be smaller than usize");

        let handle = {
            let handles = crate::io_uring::scheme::handles();

            // Note that cloning this may impose higher overhead, but since this releases the
            // handles lock early, it reduces lock contention, which helps even more.
            match handles.get(&handle_idx) {
                Some(h) => Arc::clone(h),
                None => return,
            }
        };
        let rh = match *handle {
            Handle::Ring(ref rh) => rh,
            _ => return,
        };
        let rings = match rh.rings.r#try() {
            Some(r) => r,
            None => return,
        };
        match rings.completion_ring {
            Left((_, ref runqueue32)) => waker::handle_either(runqueue32, tag),
            Right((_, ref runqueue64)) => waker::handle_either(runqueue64, tag),
        }
    }
    unsafe fn vtable_drop(_pointer: *const ()) {}

    pub(crate) const VTABLE: task::RawWakerVTable =
        task::RawWakerVTable::new(vtable_clone, vtable_wake, vtable_wake_by_ref, vtable_drop);
}
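// A minimal illustrative check of the pointer packing described above (a sketch added for
// clarity, not part of the waker machinery itself): `assemble_pointer` and `disassemble_pointer`
// should round-trip any (index, tag) pair that fits in `FastHandleIdx`/`PendingTag`, on both
// pointer widths.
#[cfg(test)]
mod fast_vtable_tests {
    use super::fast_vtable::{assemble_pointer, disassemble_pointer};

    #[test]
    fn pointer_packing_round_trips() {
        // 0x1234 and 0xBEEF both fit in u16, so this compiles for 32-bit and 64-bit targets.
        let packed = assemble_pointer((0x1234, 0xBEEF));
        assert_eq!(disassemble_pointer(packed), (0x1234, 0xBEEF));
    }
}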
pub mod waker {
    use super::*;

    pub(crate) fn handle_either<S: 'static, C: 'static>(runqueue: &Runqueue<S, C>, tag: PendingTag) {
        if let Some(task_ref) = runqueue.pending.lock().remove(&tag) {
            runqueue.ready.lock().push(task_ref);
        }
    }

    /// Create a waker that uses an Arc under the hood.
    pub fn slow_waker(handle: Arc<Handle>, tag: PendingTag) -> task::Waker {
        slow_vtable::Wrapper::new(move || {
            let ring_handle = match *handle {
                Handle::Ring(ref rh) => rh,
                _ => unreachable!(),
            };
            let rings = ring_handle
                .rings
                .r#try()
                .expect("expected a ring handle that managed to create a waker, to at least have initialized its rings");

            match rings.completion_ring {
                Left((_, ref runqueue)) => handle_either(runqueue, tag),
                Right((_, ref runqueue)) => handle_either(runqueue, tag),
            }

            let owner = &ring_handle
                .owner_context
                .r#try()
                .expect("expected all rings whose submissions are being handled by the kernel, to actually be initialized");

            match owner {
                // TODO: Actually wake up kernel contexts.
                ContextOrKernel::Kernel => (),
                ContextOrKernel::Context(ref context) => {
                    let arc = match context.upgrade() {
                        Some(c) => c,
                        None => return,
                    };
                    let guard = arc.upgradeable_read();
                    if guard.ioring_completions_left.load(Ordering::Acquire) > 0 {
                        guard.upgrade().unblock();
                    }
                }
            }
        })
    }

    pub fn fast_waker(handle_index: usize, tag: PendingTag) -> Option<task::Waker> {
        let handle = FastHandleIdx::try_from(handle_index).ok()?;
        let pointer = fast_vtable::assemble_pointer((handle, tag)) as *const ();
        Some(unsafe { task::Waker::from_raw(task::RawWaker::new(pointer, &fast_vtable::VTABLE)) })
    }

    pub fn default_waker(handle_index: usize, tag: PendingTag) -> Result<task::Waker> {
        fast_waker(handle_index, tag)
            .or_else(move || {
                let handles = crate::io_uring::scheme::handles();
                let handle = Arc::clone(handles.get(&handle_index)?);
                Some(slow_waker(handle, tag))
            })
            .ok_or(Error::new(EBADF))
    }
}

pub const TASK_QUOT_MUL: usize = 1;
pub const TASK_QUOT_DIV: usize = 4;

impl<S: 'static, C: 'static> Runqueue<S, C> {
    pub fn new(pending_task_count: usize) -> Self
    where
        S: Unpin,
        C: Unpin,
    {
        assert_ne!(pending_task_count, usize::MAX);

        let usize_size = mem::size_of::<usize>();
        let lock_word_count = (pending_task_count + usize_size - 1) / usize_size * usize_size;

        Self {
            first_vacant_slot: AtomicUsize::new(0),
            last_vacant_slot: AtomicUsize::new(pending_task_count - 1),
            next_pending_tag: Default::default(),
            tag_overflow: AtomicBool::new(false),
            pending: Mutex::new(BTreeMap::new()),
            ready: Mutex::new(BinaryHeap::new()),
            // UNOPTIMIZED: Maybe an unsafe cast from Box<[usize]> to Box<[AtomicUsize]>?
            task_locks: (0..lock_word_count)
                .map(|_| AtomicUsize::new(0))
                .collect::<Vec<_>>()
                .into_boxed_slice(),
            tasks: Pin::new(
                (1..=pending_task_count)
                    .map(|next_index| {
                        let next = if next_index == pending_task_count {
                            usize::MAX
                        } else {
                            next_index
                        };
                        TaskSlot::Vacant { next }
                    })
                    .map(UnsafeCell::new)
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
        }
    }
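    // Tag allocation: tags are handed out sequentially from `next_pending_tag` until the counter
    // wraps around once; after that, `next_tag` falls back to scanning the `pending` map for a
    // hole between existing tags, giving up with EBUSY after a bounded number of probes.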
    pub fn next_tag(&self) -> Result<PendingTag> {
        let has_overflown = self.tag_overflow.load(Ordering::Acquire);

        if !has_overflown {
            let next_tag = self.next_pending_tag.fetch_add(1, Ordering::Acquire);
            if next_tag == PendingTag::MAX {
                self.tag_overflow.store(true, Ordering::Release);
            }
            Ok(next_tag)
        } else {
            let guard = self.pending.lock();
            let mut last_tag_opt = None;

            // This is the most likely scenario: the tags have overflown, but the initial tag has
            // been dropped for a long time.
            if !guard.contains_key(&0) {
                return Ok(0);
            }

            // Otherwise, we try to find a tag that isn't already occupied, trying at most 64
            // times before failing with EBUSY.
            const MAX_MISSES: usize = 64;

            for tag in guard.keys().copied().take(MAX_MISSES) {
                let last_tag = match last_tag_opt {
                    Some(t) => t,
                    None => {
                        last_tag_opt = Some(tag);
                        continue;
                    }
                };
                if tag == last_tag + 1 {
                    // The next tag is adjacent; there is no hole that we can use.
                    last_tag_opt = Some(tag);
                    continue;
                } else {
                    return Ok(last_tag + 1);
                }
            }
            Err(Error::new(EBUSY))
        }
    }
    #[inline]
    pub fn try_lock_task_raw(&self, at: usize) -> bool {
        assert_ne!(at, usize::MAX);

        let word_index = at / mem::size_of::<usize>();
        let bit_pos = at % mem::size_of::<usize>();
        let bit = 1 << bit_pos;

        let prev = self.task_locks[word_index].fetch_or(bit, Ordering::Acquire);
        let already_locked = prev & bit == bit;

        !already_locked
    }
    #[inline]
    pub fn lock_task_raw(&self, at: usize) {
        assert_ne!(at, usize::MAX);

        while !self.try_lock_task_raw(at) {
            core::sync::atomic::spin_loop_hint();
        }
    }
    #[inline]
    pub unsafe fn unlock_task_raw(&self, at: usize) {
        assert_ne!(at, usize::MAX);

        let word_index = at / mem::size_of::<usize>();
        let bit_pos = at % mem::size_of::<usize>();
        let bit = 1 << bit_pos;

        let prev = self.task_locks[word_index].fetch_and(!bit, Ordering::Release);
        assert_eq!(
            prev & bit,
            bit,
            "attempting to release a task lock at {} that wasn't locked",
            at
        );
    }
    #[inline]
    pub fn try_lock_task<'runqueue>(&'runqueue self, at: usize) -> Option<TaskGuard<'runqueue, S, C>> {
        assert_ne!(at, usize::MAX);
        assert!(at < self.tasks.as_ref().get_ref().len());

        if self.try_lock_task_raw(at) {
            Some(TaskGuard {
                index: at,
                runqueue: self,
            })
        } else {
            None
        }
    }
    #[inline]
    pub fn lock_task<'runqueue>(&'runqueue self, at: usize) -> TaskGuard<'runqueue, S, C> {
        assert_ne!(at, usize::MAX);
        assert!(at < self.tasks.as_ref().get_ref().len());

        self.lock_task_raw(at);
        TaskGuard {
            index: at,
            runqueue: self,
        }
    }
    #[inline]
    pub fn lock_task_ref<'runqueue>(&'runqueue self, task_ref: TaskRef) -> TaskGuard<'runqueue, S, C> {
        self.lock_task(task_ref.index)
    }

    fn index_from_raw(index: usize) -> Option<usize> {
        if index == usize::MAX {
            None
        } else {
            Some(index)
        }
    }
    fn index_to_raw(index: Option<usize>) -> usize {
        match index {
            Some(i) => {
                assert_ne!(i, usize::MAX);
                i
            }
            None => usize::MAX,
        }
    }

    fn allocate_new_task_slot_inner(&self, current: usize) -> bool
    where
        S: Unpin,
        C: Unpin,
    {
        let mut current_slot = self.lock_task(current);

        let current_slot_next_raw = current_slot
            .as_mut()
            .as_vacant_next()
            .expect("expected the embedded free list to only point to vacant entries");
        let current_slot_next = match Self::index_from_raw(*current_slot_next_raw) {
            Some(c) => c,
            None => return false,
        };
        debug_assert!(self
            .lock_task(current_slot_next)
            .as_mut()
            .as_vacant_next()
            .is_some());

        match self.first_vacant_slot.compare_exchange_weak(
            current,
            current_slot_next,
            Ordering::Acquire,
            Ordering::Relaxed,
        ) {
            Ok(_) => {
                *current_slot_next_raw = Self::index_to_raw(None);
                true
            }
            Err(newer) => {
                drop(current_slot);
                self.allocate_new_task_slot_inner(newer)
            }
        }
    }
    pub fn allocate_new_task_slot<'runqueue>(&'runqueue self) -> Option<TaskGuard<'runqueue, S, C>>
    where
        S: Unpin,
        C: Unpin,
    {
        let initial = Self::index_from_raw(self.first_vacant_slot.load(Ordering::Acquire))?;
        let index = match self.allocate_new_task_slot_inner(initial) {
            true => initial,
            false => return None,
        };
        Some(self.lock_task(index))
    }
    pub fn free_task_slot<'runqueue>(&'runqueue self, at: usize)
    where
        S: Unpin,
        C: Unpin,
    {
        debug_assert!(self
            .lock_task(at)
            .as_mut()
            .as_vacant_next()
            .is_some());

        let last_index = self.last_vacant_slot.load(Ordering::Acquire);
        let mut last_task_slot = self.lock_task(last_index);

        *last_task_slot
            .as_mut()
            .as_vacant_next()
            .expect("expected last vacant entry not to be occupied") = at;

        match self.last_vacant_slot.compare_exchange_weak(
            last_index,
            at,
            Ordering::Acquire,
            Ordering::Relaxed,
        ) {
            Ok(_) => return,
            Err(newer) => {
                assert_ne!(newer, at);
                self.free_task_slot(at);
            }
        }
    }
}
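// A small illustrative sketch of the vacant-slot free list above (added for clarity; it assumes
// the 32-bit entry types, which are plain `Unpin` data): slots are handed out from the head of
// the embedded free list, so consecutive allocations yield consecutive indices.
#[cfg(test)]
mod runqueue_tests {
    use super::*;

    #[test]
    fn vacant_slots_are_allocated_in_order() {
        let runqueue = Runqueue::<SqEntry32, CqEntry32>::new(4);

        let first = runqueue
            .allocate_new_task_slot()
            .expect("expected a vacant slot to be available");
        assert_eq!(first.index, 0);

        let second = runqueue
            .allocate_new_task_slot()
            .expect("expected a second vacant slot to be available");
        assert_eq!(second.index, 1);
    }
}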
#[derive(Debug)]
pub(crate) struct SecondaryRingRef {
    // Assumed to be a weak reference to the scheme `Handle` that owns the secondary ring.
    pub(crate) ring_handle: Weak<Handle>,
    pub(crate) read: bool,
    pub(crate) write: bool,
    pub(crate) fd_for_consumer: usize,
    pub(crate) user_data: u64,
}

#[derive(Debug)]
pub struct RingHandleRuntimeState {
    pub(crate) sq_entry_count: usize,
    pub(crate) cq_entry_count: usize,
    pub(crate) last_sq_push_epoch: CachePadded<AtomicUsize>,
    pub(crate) last_sq_pop_epoch: CachePadded<AtomicUsize>,
    pub(crate) last_cq_push_epoch: CachePadded<AtomicUsize>,
    pub(crate) last_cq_pop_epoch: CachePadded<AtomicUsize>,
}

#[derive(Debug)]
pub struct RingHandleConsumerState {
    pub(crate) version: IoUringVersion,
    pub(crate) flags: IoUringCreateFlags,
    pub(crate) is_owner: bool,
    pub(crate) event_queue: Once<event::EventQueueId>,
    pub(crate) event_seq: AtomicU64,
    pub(crate) secondary_rings: Mutex<Vec<SecondaryRingRef>>,
    pub(crate) runtime_state: RingHandleRuntimeState,
}

impl RingHandleConsumerState {
    pub fn event_queue_or_init(&self) -> event::EventQueueId {
        fn init_event_queue() -> event::EventQueueId {
            let qid = event::next_queue_id();
            event::queues_mut().insert(qid, Arc::new(event::EventQueue::new(qid)));
            qid
        }
        *self.event_queue.call_once(init_event_queue)
    }
}

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum RingHandleState {
    /// The io_uring has been opened as a file, and is in its initial state.
    Start,

    /// The io_uring has been initialized and is ready to be attached.
    Initialized,

    /// The io_uring has been attached to a scheme, and is now ready to be mmapped and used.
    Attached,
}