Verified Commit 1fd7d5e3 authored by 4lDO2 🖖

Almost get pinned tasks to work.

parent 84d39956
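In short, the change moves each task's future out of a per-task `Pin<Box<...>>` owned by the queues and into a single pinned slab inside `Runqueue`, while the ready heap and the pending map only carry small `TaskRef` values (slot index plus priority). A rough editorial sketch of the shape of the change (an illustration, not the exact definitions in the diff below):

// Before: the queues owned the futures, one boxed allocation per submission.
//     ready_futures:   Mutex<BinaryHeap<QueueItem<S, C>>>    // QueueItem holds Pin<Box<Future>>
//     pending_futures: Mutex<BTreeMap<PendingTag, QueueItem<S, C>>>
//
// After: the futures live inline in one pinned slab, reused through an embedded free list,
// and the queues pass slot indices around instead.
//     tasks:   Pin<Box<[UnsafeCell<TaskSlot<S, C>>]>>        // Occupied(Task) or Vacant { next }
//     ready:   Mutex<BinaryHeap<TaskRef>>                    // TaskRef = { priority, index }
//     pending: Mutex<BTreeMap<PendingTag, TaskRef>>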
use core::cell::UnsafeCell;
use core::convert::TryFrom;
use core::future::Future;
use core::intrinsics::unlikely;
use core::mem::ManuallyDrop;
use core::pin::Pin;
use core::sync::atomic::{self, AtomicBool, AtomicU64, AtomicUsize, Ordering};
-use core::{cmp, fmt, mem, task};
use core::{cmp, fmt, mem, ptr, task};
use alloc::boxed::Box;
use alloc::collections::{BinaryHeap, BTreeMap};
@@ -404,80 +404,191 @@ pub struct Rings {
pub(crate) completion_ring: Either<(RingInfo<CqEntry32>, Runqueue<SqEntry32, CqEntry32>), (RingInfo<CqEntry64>, Runqueue<SqEntry64, CqEntry64>)>,
}
-pub struct QueueItem<S: 'static, C: 'static> {
#[cfg(target_pointer_width = "64")]
mod types {
use core::sync::atomic::AtomicU32;
pub type PendingTag = u32;
pub type AtomicPendingTag = AtomicU32;
pub type FastHandleIdx = u32;
}
#[cfg(target_pointer_width = "32")]
mod types {
use core::sync::atomic::AtomicU16;
pub type PendingTag = u16;
pub type AtomicPendingTag = AtomicU16;
pub type FastHandleIdx = u16;
}
pub use types::*;
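An editorial aside on why the tag and fast handle index are sized to half a pointer (an assumption about intent, not code from this commit): one of each then fits into a single `usize`, for example the data word of a waker, without any allocation. A hypothetical packing helper would look like:

fn pack_waker_data(handle: FastHandleIdx, tag: PendingTag) -> usize {
    // The upper half of the word holds the handle index, the lower half holds the tag.
    const HALF: u32 = usize::BITS / 2;
    ((handle as usize) << HALF) | tag as usize
}

fn unpack_waker_data(data: usize) -> (FastHandleIdx, PendingTag) {
    const HALF: u32 = usize::BITS / 2;
    ((data >> HALF) as FastHandleIdx, (data & ((1usize << HALF) - 1)) as PendingTag)
}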
#[derive(Debug)]
pub struct Runqueue<S: 'static, C: 'static> {
pub(crate) first_vacant_slot: AtomicUsize,
pub(crate) last_vacant_slot: AtomicUsize,
pub(crate) tasks: Pin<Box<[UnsafeCell<TaskSlot<S, C>>]>>,
pub(crate) task_locks: Box<[AtomicUsize]>,
pub(crate) ready: Mutex<BinaryHeap<TaskRef>>,
pub(crate) pending: Mutex<BTreeMap<PendingTag, TaskRef>>,
pub(crate) next_pending_tag: AtomicPendingTag,
pub(crate) tag_overflow: AtomicBool,
}
unsafe impl<S: Send + Sync + 'static, C: Send + Sync + 'static> Send for Runqueue<S, C> {}
unsafe impl<S: Send + Sync + 'static, C: Send + Sync + 'static> Sync for Runqueue<S, C> {}
pub struct TaskGuard<'runqueue, S: 'static, C: 'static> {
index: usize,
runqueue: &'runqueue Runqueue<S, C>,
}
impl<'runqueue, S, C> TaskGuard<'runqueue, S, C>
where
S: 'static,
C: 'static,
{
pub fn task_ref(&mut self) -> Option<TaskRef> {
Some(TaskRef {
index: self.index,
priority: self.as_mut().as_task_mut()?.priority(),
})
}
pub fn as_mut<'guard>(&'guard mut self) -> Pin<&'guard mut TaskSlot<S, C>> {
unsafe {
let pinned_task_cell = self.runqueue.tasks.as_ref().map_unchecked(|tasks| &tasks[self.index]);
let task_cell = pinned_task_cell.get_ref();
let task: &mut TaskSlot<S, C> = &mut *task_cell.get();
Pin::new_unchecked(task)
}
}
}
impl<'runqueue, S: 'static, C: 'static> Drop for TaskGuard<'runqueue, S, C> {
fn drop(&mut self) {
unsafe { self.runqueue.unlock_task_raw(self.index) }
}
}
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct TaskRef {
// `priority` is intentionally the first field: the derived `Ord` compares fields in
// declaration order, and the ready queue is a `BinaryHeap<TaskRef>` that must order by
// priority rather than by slot index.
pub priority: Priority,
pub index: usize,
}
#[derive(Debug)]
pub enum TaskSlot<S: 'static, C: 'static> {
Occupied(Task<S, C>),
Vacant { next: usize },
}
impl<S: 'static, C: 'static> TaskSlot<S, C> {
pub fn as_task_mut(self: Pin<&mut Self>) -> Option<Pin<&mut Task<S, C>>> {
unsafe {
match self.get_unchecked_mut() {
&mut Self::Occupied(ref mut task) => Some(Pin::new_unchecked(task)),
&mut Self::Vacant { .. } => None,
}
}
}
pub fn as_vacant_next(self: Pin<&mut Self>) -> Option<&mut usize> {
unsafe {
match self.get_unchecked_mut() {
&mut Self::Vacant { ref mut next } => Some(next),
&mut Self::Occupied(_) => None,
}
}
}
pub fn get_ref_if_vacant(self: Pin<&mut Self>) -> Option<&mut Self> {
unsafe {
match self.get_unchecked_mut() {
// Don't allow occupied entries to be accessed mutably without Pin...
&mut Self::Occupied(_) => None,
// ... but do allow vacant entries to be accessed mutably, so that they can be occupied.
this @ &mut Self::Vacant { .. } => Some(this),
}
}
}
/// Replaces an occupied task with a new, not-yet-linked vacant entry, safely dropping the
/// future inside.
pub fn free(self: Pin<&mut Self>) {
unsafe {
// Take a raw pointer to the slot first, since `self` is consumed by `as_task_mut`.
let this = self.get_unchecked_mut() as *mut Self;
let pointer = Pin::new_unchecked(&mut *this)
.as_task_mut()
.expect("expected TaskSlot::free to free an occupied entry")
.get_unchecked_mut() as *mut Task<S, C>;
// Since the task and the future inside it are pinned, they have to be dropped manually
// with drop_in_place.
ptr::drop_in_place(pointer);
// Now that the old value has been dropped, it must not be touched through a reference
// again, so the new value is written with ptr::write.
ptr::write(this, Self::Vacant { next: usize::MAX });
}
}
}
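The accessors above all use the same pin-projection idiom: temporarily unwrap the `Pin`, match on the enum, and re-wrap the interesting variant so that nothing pinned is ever moved. A minimal, self-contained illustration of that idiom (the `Maybe` type is hypothetical, not part of this commit):

use core::pin::Pin;

enum Maybe<T> {
    Some(T),
    None,
}

impl<T> Maybe<T> {
    // Project Pin<&mut Maybe<T>> to Pin<&mut T>. This is sound because the inner value is
    // treated as structurally pinned: it is never moved out through the returned reference.
    fn project(self: Pin<&mut Self>) -> Option<Pin<&mut T>> {
        unsafe {
            match self.get_unchecked_mut() {
                Maybe::Some(inner) => Some(Pin::new_unchecked(inner)),
                Maybe::None => None,
            }
        }
    }
}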
pub struct Task<S: 'static, C: 'static> {
// TODO: Array of structures, or structure of arrays? In other words, should the priority be
// stored separately from the future?
pub(crate) priority: Priority,
// TODO: Now that the compiler can apply some additional optimizations (there is no longer a
// slow vtable call), we might also consider storing these tasks in a pinned collection
// (possibly a fixed-size array, a segmented unrolled linked list, or an indirect array of
// arrays), and only pass the indices around in the ready and pending queues. That would
// remove yet another level of indirection!
-pub(crate) future: Pin<Box<HandleSubmissionFuture<S, C>>>,
pub(crate) future: HandleSubmissionFuture<S, C>,
}
impl<S, C> Task<S, C>
where
S: 'static,
C: 'static,
{
pub fn as_future_mut(self: Pin<&mut Self>) -> Pin<&mut HandleSubmissionFuture<S, C>> {
unsafe { self.map_unchecked_mut(|this| &mut this.future) }
}
pub fn as_priority_mut(self: Pin<&mut Self>) -> &mut Priority {
unsafe { self.map_unchecked_mut(|this| &mut this.priority).get_mut() }
}
pub fn priority(self: Pin<&mut Self>) -> Priority {
*self.as_priority_mut()
}
}
-impl<S, C> fmt::Debug for QueueItem<S, C>
impl<S, C> fmt::Debug for Task<S, C>
where
S: 'static,
C: 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-let pointer = self.future.as_ref().get_ref() as *const _;
struct OpaqueStr;
impl fmt::Debug for OpaqueStr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "[opaque future of type `HandleSubmissionFuture`]")
}
}
f.debug_struct("QueueItem")
.field("priority", &self.priority)
.field("future", &pointer)
.field("future", &OpaqueStr)
.finish()
}
}
-impl<S, C> PartialEq for QueueItem<S, C> {
impl<S: 'static, C: 'static> PartialEq for Task<S, C> {
fn eq(&self, other: &Self) -> bool {
self.priority == other.priority
}
}
-impl<S, C> Eq for QueueItem<S, C> {}
impl<S: 'static, C: 'static> Eq for Task<S, C> {}
-impl<S, C> Ord for QueueItem<S, C> {
impl<S: 'static, C: 'static> Ord for Task<S, C> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
Ord::cmp(&self.priority, &other.priority)
}
}
-impl<S, C> PartialOrd for QueueItem<S, C> {
impl<S: 'static, C: 'static> PartialOrd for Task<S, C> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(Ord::cmp(self, other))
}
}
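Because the ready queue is a `BinaryHeap`, whichever `Ord` these types expose decides which task gets polled next, and a derived `Ord` compares fields in declaration order, which is why `priority` must come before any index. A small standalone illustration (plain `u8` priorities stand in for the kernel's `Priority` type):

use std::collections::BinaryHeap;

#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct Ref {
    priority: u8, // compared first by the derived Ord
    index: usize,
}

fn main() {
    let mut ready = BinaryHeap::new();
    ready.push(Ref { priority: 1, index: 7 });
    ready.push(Ref { priority: 9, index: 3 });
    // BinaryHeap is a max-heap, so the reference with the highest priority is popped first.
    assert_eq!(ready.pop().map(|r| r.index), Some(3));
}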
#[cfg(target_pointer_width = "64")]
mod types {
use super::*;
use core::sync::atomic::AtomicU32;
pub type PendingTag = u32;
pub type AtomicPendingTag = AtomicU32;
pub type FastHandleIdx = u32;
}
#[cfg(target_pointer_width = "32")]
mod types {
use super::*;
use core::sync::atomic::AtomicU16;
pub type PendingTag = u16;
pub type AtomicPendingTag = AtomicU16;
pub type FastHandleIdx = u16;
}
pub use types::*;
#[derive(Debug)]
pub struct Runqueue<S: 'static, C: 'static> {
pub(crate) ready_futures: Mutex<BinaryHeap<QueueItem<S, C>>>,
pub(crate) pending_futures: Mutex<BTreeMap<PendingTag, QueueItem<S, C>>>,
pub(crate) next_pending_tag: AtomicPendingTag,
pub(crate) tag_overflow: AtomicBool,
}
// The "slow" waking procedure, that works for all handle sizes, but requires storing the waker in
// an Arc.
@@ -614,8 +725,8 @@ pub mod waker {
use super::*;
pub(crate) fn handle_either<S: 'static, C: 'static>(runqueue: &Runqueue<S, C>, tag: PendingTag) {
-if let Some(item) = runqueue.pending_futures.lock().remove(&tag) {
-runqueue.ready_futures.lock().push(item);
if let Some(task_ref) = runqueue.pending.lock().remove(&tag) {
runqueue.ready.lock().push(task_ref);
}
}
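// Editorial sketch (not part of this commit): waking a task amounts to moving its TaskRef
// from the pending map back into the ready priority queue under the tag it was parked with,
// so a waker only has to carry enough state to locate the runqueue plus the PendingTag.
// A direct wake-by-tag helper would simply forward to handle_either above:
pub(crate) fn wake_by_tag<S: 'static, C: 'static>(runqueue: &Runqueue<S, C>, tag: PendingTag) {
    handle_either(runqueue, tag)
}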
/// Create a waker that uses an Arc under the hood.
@@ -672,7 +783,37 @@ pub mod waker {
.ok_or(Error::new(EBADF))
}
}
pub const TASK_QUOT_MUL: usize = 1;
pub const TASK_QUOT_DIV: usize = 4;
impl<S: 'static, C: 'static> Runqueue<S, C> {
pub fn new(pending_task_count: usize) -> Self {
assert_ne!(pending_task_count, 0);
assert_ne!(pending_task_count, usize::MAX);
let usize_size = mem::size_of::<usize>();
let lock_word_count = (pending_task_count + usize_size - 1) / usize_size * usize_size;
Self {
first_vacant_slot: AtomicUsize::new(0),
last_vacant_slot: AtomicUsize::new(pending_task_count - 1),
next_pending_tag: Default::default(),
tag_overflow: AtomicBool::new(false),
pending: Mutex::new(BTreeMap::new()),
ready: Mutex::new(BinaryHeap::new()),
// UNOPTIMIZED: Maybe an unsafe cast from Box<[usize]> to Box<[AtomicUsize]>?
task_locks: (0..lock_word_count).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>().into_boxed_slice(),
// `Pin::from` rather than `Pin::new`, since the slots (and the futures eventually stored
// in them) are not necessarily `Unpin`.
tasks: Pin::from((1..=pending_task_count).map(|next_index| {
let next = if next_index == pending_task_count {
usize::MAX
} else {
next_index
};
TaskSlot::Vacant { next }
}).map(UnsafeCell::new).collect::<Vec<_>>().into_boxed_slice()),
}
}
pub fn next_tag(&self) -> Result<PendingTag> {
let has_overflown = self.tag_overflow.load(Ordering::Acquire);
@@ -683,7 +824,7 @@ impl<S, C> Runqueue<S, C> {
}
Ok(next_tag)
} else {
-let guard = self.pending_futures.lock();
let guard = self.pending.lock();
let mut last_tag_opt = None;
@@ -717,6 +858,144 @@ impl<S, C> Runqueue<S, C> {
return Err(Error::new(EBUSY));
}
}
#[inline]
pub fn try_lock_task_raw(&self, at: usize) -> bool {
assert_ne!(at, usize::MAX);
let byte_index = at / mem::size_of::<usize>();
let bit_pos = at % mem::size_of::<usize>();
let bit = 1 << bit_pos;
let prev = self.task_locks[byte_index].fetch_or(bit, Ordering::Acquire);
let already_locked = prev & bit == bit;
!already_locked
}
#[inline]
pub fn lock_task_raw(&self, at: usize) {
assert_ne!(at, usize::MAX);
while !self.try_lock_task_raw(at) {
core::sync::atomic::spin_loop_hint();
}
}
#[inline]
pub unsafe fn unlock_task_raw(&self, at: usize) {
assert_ne!(at, usize::MAX);
let byte_index = at / mem::size_of::<usize>();
let bit_pos = at % mem::size_of::<usize>();
let bit = 1 << bit_pos;
let prev = self.task_locks[byte_index].fetch_and(!bit, Ordering::Release);
assert_eq!(prev & bit, bit, "attempting to release a task lock at {} that wasn't locked", at);
}
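// Editorial sketch (not part of this commit) of how the per-slot lock bits are meant to be
// used. Note that the word/bit math packs only mem::size_of::<usize>() lock bits into each
// word, matching `lock_word_count` in `new`; a denser layout would divide by `usize::BITS`
// instead.
#[inline]
pub fn is_slot_occupied(&self, at: usize) -> bool {
    // Take the per-slot lock bit; the returned TaskGuard releases it again through
    // unlock_task_raw when it is dropped at the end of this function.
    let mut guard = self.lock_task(at);
    guard.as_mut().as_task_mut().is_some()
}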
#[inline]
pub fn try_lock_task<'runqueue>(&'runqueue self, at: usize) -> Option<TaskGuard<'runqueue, S, C>> {
assert_ne!(at, usize::MAX);
assert!(at < self.tasks.as_ref().get_ref().len());
if self.try_lock_task_raw(at) {
Some(TaskGuard {
index: at,
runqueue: self,
})
} else {
None
}
}
#[inline]
pub fn lock_task<'runqueue>(&'runqueue self, at: usize) -> TaskGuard<'runqueue, S, C> {
assert_ne!(at, usize::MAX);
assert!(at < self.tasks.as_ref().get_ref().len());
self.lock_task_raw(at);
TaskGuard {
index: at,
runqueue: self,
}
}
#[inline]
pub fn lock_task_ref<'runqueue>(&'runqueue self, task_ref: TaskRef) -> TaskGuard<'runqueue, S, C> {
self.lock_task(task_ref.index)
}
fn index_from_raw(index: usize) -> Option<usize> {
if index == usize::MAX {
None
} else {
Some(index)
}
}
fn index_to_raw(index: Option<usize>) -> usize {
match index {
Some(i) => {
assert_ne!(i, usize::MAX);
i
}
None => usize::MAX,
}
}
fn allocate_new_task_slot_inner(&self, current: usize) -> Option<usize> {
let mut current_slot = self.lock_task(current);
let current_slot_next_raw = current_slot
.as_mut()
.as_vacant_next()
.expect("expected the embedded free list to only point to vacant entries");
let current_slot_next = match Self::index_from_raw(*current_slot_next_raw) {
Some(c) => c,
// Refuse to pop the final vacant slot, so that the free list never becomes empty.
None => return None,
};
debug_assert!(
self.lock_task(current_slot_next)
.as_mut()
.as_vacant_next()
.is_some()
);
match self.first_vacant_slot.compare_exchange_weak(current, current_slot_next, Ordering::Acquire, Ordering::Relaxed) {
Ok(_) => {
*current_slot_next_raw = Self::index_to_raw(None);
// Report which slot was actually popped off the free list.
Some(current)
}
Err(newer) => {
// Another thread raced us for `current`; retry with the new head so that the
// caller receives the index of the slot that was actually allocated.
drop(current_slot);
self.allocate_new_task_slot_inner(newer)
}
}
}
pub fn allocate_new_task_slot<'runqueue>(&'runqueue self) -> Option<TaskGuard<'runqueue, S, C>> {
let initial = Self::index_from_raw(self.first_vacant_slot.load(Ordering::Acquire))?;
let index = self.allocate_new_task_slot_inner(initial)?;
Some(self.lock_task(index))
}
pub fn free_task_slot<'runqueue>(&'runqueue self, at: usize) {
debug_assert!(
self.lock_task(at)
.as_mut()
.as_vacant_next()
.is_some()
);
let last_index = self.last_vacant_slot.load(Ordering::Acquire);
let mut last_task_slot = self.lock_task(last_index);
// Re-check the tail now that its lock is held; if another thread appended in the meantime,
// retry with the new tail instead of overwriting this slot's link.
if self.last_vacant_slot.load(Ordering::Acquire) != last_index {
drop(last_task_slot);
return self.free_task_slot(at);
}
*last_task_slot
.as_mut()
.as_vacant_next()
.expect("expected last vacant entry not to be occupied") = at;
// The tail cannot change while its lock is held, so a plain store suffices to publish it.
self.last_vacant_slot.store(at, Ordering::Release);
}
}
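For reference, the allocator above threads its free list through the vacant slots themselves and advances `first_vacant_slot`/`last_vacant_slot` with atomics plus the per-slot lock bits. The same embedded-free-list idea in single-threaded form, with all of the locking stripped away (an editorial sketch; `Slot` and `FreeList` are illustrative names, not from this commit):

enum Slot<T> {
    Occupied(T),
    Vacant { next: usize }, // usize::MAX terminates the free list
}

struct FreeList<T> {
    slots: Vec<Slot<T>>,
    head: usize, // index of the first vacant slot, or usize::MAX when full
}

impl<T> FreeList<T> {
    fn with_capacity(n: usize) -> Self {
        // Every slot starts out vacant and points at its successor, just like Runqueue::new.
        let slots = (0..n)
            .map(|i| Slot::Vacant { next: if i + 1 == n { usize::MAX } else { i + 1 } })
            .collect();
        Self { slots, head: 0 }
    }

    fn allocate(&mut self, value: T) -> Option<usize> {
        let index = self.head;
        if index == usize::MAX {
            return None;
        }
        // Pop the head of the free list and occupy it.
        self.head = match self.slots[index] {
            Slot::Vacant { next } => next,
            Slot::Occupied(_) => unreachable!("free list pointed at an occupied slot"),
        };
        self.slots[index] = Slot::Occupied(value);
        Some(index)
    }

    fn free(&mut self, index: usize) {
        // Push the slot back onto the front of the list; the concurrent version above appends
        // at the tail instead, so the head is only contended by allocations.
        self.slots[index] = Slot::Vacant { next: self.head };
        self.head = index;
    }
}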
#[derive(Debug)]
......
use core::convert::{TryFrom, TryInto};
use core::future::Future;
use core::intrinsics::unlikely;
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::Ordering;
use core::{fmt, iter, mem, ptr, task};
use alloc::boxed::Box;
use alloc::sync::{Arc, Weak};
use alloc::vec::Vec;
use either::Either;
use spin::RwLock;
@@ -22,28 +19,24 @@ use crate::syscall::io_uring::IoUringEnterFlags;
use crate::syscall::data::{Event, IoVec, Map2};
use crate::syscall::error::{Error, Result};
-use crate::syscall::flag::{EventFlags, MapFlags, O_CLOEXEC, O_SYMLINK};
use crate::syscall::flag::{EventFlags, MapFlags};
use syscall::error::{
-EBADF, EBADFD, EFAULT,
-EINVAL, EMFILE, ENODEV, ENOMEM, ENOSPC, ENOSYS, EOVERFLOW, ESHUTDOWN,
-ESRCH,
EBADF, EBADFD, EFAULT, EINVAL, ENODEV, ENOMEM, ENOSPC, ENOSYS, EOVERFLOW, ESHUTDOWN, ESRCH,
};
pub mod handle;
pub mod scheme;
-use self::handle::{PendingTag, QueueItem, RingHandle, RingHandleConsumerState, RingHandleRuntimeState, Runqueue, SecondaryRingRef};
-use self::scheme::{Handle, get_handle, get_handle_and_local, IoUringScheme, is_io_uring_scheme};
use self::handle::{PendingTag, RingHandle, RingHandleConsumerState, Runqueue, SecondaryRingRef, Task, TaskGuard, TaskRef, TaskSlot};
use self::scheme::{Handle, get_handle, get_handle_and_local, IoUringScheme};
use crate::context::file::{FileDescription, FileDescriptor};
use crate::context::{self, Context};
use crate::event;
use crate::memory::{self, Frame, PhysicalAddress};
use crate::paging::entry::EntryFlags as PtEntryFlags;
use crate::paging::{ActivePageTable, Page, VirtualAddress};
use crate::scheme::{ContextOrKernel, SchemeInfo};
use crate::sync::wait_condition;
use crate::syscall::scheme::async_scheme::{AsyncScheme, AsyncSchemeExt as _};
use crate::syscall::scheme::iter as consuming_iter;
use crate::syscall::{
@@ -1122,6 +1115,146 @@ fn register_event_interest(handle: &RingHandle, context: Arc<RwLock<Context>>) -
Ok(())
}
fn poll_or_insert_into_pending_map<S, C>(queue: &Runqueue<S, C>, mut task_guard: TaskGuard<S, C>, context: &mut task::Context, tag: PendingTag) -> task::Poll<Result<C>>
where
S: 'static,
C: 'static,
{
// Snapshot the TaskRef (index and priority) first, since it only copies data out of the
// guard, before taking the longer-lived mutable borrow of the task itself.
let task_ref = task_guard
.task_ref()
.expect("expected task guard to be occupied with a pending task");
let task = task_guard
.as_mut()
.as_task_mut()
.expect("expected task guard passed into poll_or_insert_into_pending_map to contain a task");
match task.as_future_mut().poll(context) {
task::Poll::Ready(cqe) => {
// Drop the future and replace the task slot with an unlinked vacant entry.
task_guard.as_mut().free();
// Drop the guard first, releasing the slot's lock bit, so that relinking the slot below
// cannot deadlock trying to re-lock it.
drop(task_guard);
// And finally, link the vacant entry, making it reallocatable.
queue.free_task_slot(task_ref.index);
task::Poll::Ready(cqe)
}
task::Poll::Pending => {
let prev = queue.pending.lock().insert(tag, task_ref);
assert!(prev.is_none(), "invalid pending tag: already in use");
task::Poll::Pending
},
}
}
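A sketch of how an executor loop would drive one freshly allocated task through the function above (editorial illustration; `drive_once` is a hypothetical helper, and `Result<C>` refers to the kernel's syscall `Result`):

fn drive_once<S, C>(
    queue: &Runqueue<S, C>,
    guard: TaskGuard<S, C>,
    waker: &task::Waker,
    tag: PendingTag,
) -> Option<Result<C>>
where
    S: 'static,
    C: 'static,
{
    let mut cx = task::Context::from_waker(waker);
    match poll_or_insert_into_pending_map(queue, guard, &mut cx, tag) {
        // Ready: the task slot has already been freed, and the value is the completion
        // entry (or error) that should be pushed onto the completion ring.
        task::Poll::Ready(result) => Some(result),
        // Pending: the task is now parked in `queue.pending` under `tag` until its waker fires.
        task::Poll::Pending => None,
    }
}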
fn handle_push_error<T>(result: Result<(), RingPushError<T>>) -> Result<()> {
match result {
Ok(()) => Ok(()),
Err(RingPushError::Full(_)) => Err(Error::new(ENOSPC)),
Err(RingPushError::Shutdown(_)) => Err(Error::new(ESHUTDOWN)),
}
}
fn handle_finished_future<C>(cqe: C, cqinfo: &RingInfo<C>, handled_request_count: &mut usize) -> Result<()> {
let result = unsafe {
cqinfo.ring().push_back_spsc(cqinfo.entries.as_ptr(), cqe)
};
handle_push_error(result)?;
*handled_request_count += 1;
Ok(())
}
fn handle_pending_future<S: 'static, C: 'static>(queue: &Runqueue<S, C>, tag: &mut PendingTag, waker: &mut task::Waker, internal_ring_handle_num: usize) -> Result<()> {
// Now that the tag has been reserved for this waker, we'll need to
// make a new tag and a new waker.
*tag = queue.next_tag()?;
*waker = crate::io_uring::handle::waker::default_waker(internal_ring_handle_num, *tag)?;
Ok(())
}
unsafe fn handle_submission_entries<S, C>(
handle: &Arc<Handle>,
sqinfo: &RingInfo<S>,
cqinfo: &RingInfo<C>,
queue: &Runqueue<S, C>,
internal_ring_handle_num: usize,
handled_request_count: &mut usize,
ctx: crate::syscall::scheme::Ctx,
) -> Result<()>
where
S: GenericSubmissionEntry + fmt::Debug + Send + Sync + 'static,
C: GenericCompletionEntry + Send + Sync + 'static,
{
// New tags only have to be created when a future returns Pending, since only
// then does the future get inserted into the pending map. The downside, though,
// is that futures obviously cannot share the same tag if they are run in
// parallel, so that would imply the existence of one tag per thread.
let mut tag = queue.next_tag()?;
let mut waker = crate::io_uring::handle::waker::default_waker(internal_ring_handle_num, tag)?;
for entry in sqinfo.read_try_iter() {
let mut task_slot = match queue.allocate_new_task_slot() {
Some(s) => s,
None => {
// If we're out of available slots, then that means the ring is
// getting new entries faster than they can be processed. This is
// prevented by limiting the number of pending requests, so that
// the pusher eventually sees a full ring, forming a simple sort of
// congestion control.
break;
}
};
let priority = entry.priority();
let future = handle_submission(Arc::clone(handle), entry, ctx);
let task = Task {
future,
priority,
};