use core::convert::{TryFrom, TryInto};
use core::future::Future;
use core::intrinsics::unlikely;
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::Ordering;
use core::{fmt, iter, mem, ptr, task};

use alloc::sync::{Arc, Weak};

use either::Either;
use spin::RwLock;

use crate::syscall::io_uring::v1::{
    operation::*, CqEntry32, CqEntry64, GenericCompletionEntry, GenericSubmissionEntry,
    GenericSubmissionEntryExt as _, IoUringCqeFlags, IoUringSqeFlags, KernelOpcode, Ring,
    RingPushError, SqEntry32, SqEntry64, StandardOpcode,
};
use crate::syscall::io_uring::IoUringEnterFlags;

use crate::syscall::data::{Event, IoVec, Map2};
use crate::syscall::error::{Error, Result};
use crate::syscall::flag::{EventFlags, MapFlags};

use syscall::error::{
    EBADF, EBADFD, EFAULT, EINVAL, ENODEV, ENOMEM, ENOSPC, ENOSYS, EOVERFLOW, ESHUTDOWN, ESRCH,
};

pub mod handle;
pub mod scheme;

use self::handle::{
    PendingTag, RingHandle, RingHandleConsumerState, Runqueue, SecondaryRingRef, Task, TaskGuard,
    TaskRef, TaskSlot,
};
use self::scheme::{Handle, get_handle, get_handle_and_local, IoUringScheme};

use crate::context::{self, Context};
use crate::event;
use crate::memory::{self, Frame, PhysicalAddress};
use crate::paging::entry::EntryFlags as PtEntryFlags;
use crate::paging::{ActivePageTable, Page, VirtualAddress};
use crate::scheme::{ContextOrKernel, SchemeInfo};
use crate::syscall::scheme::async_scheme::{AsyncScheme, AsyncSchemeExt as _};
use crate::syscall::scheme::iter as consuming_iter;
use crate::syscall::{
    validate_ioslices, validate_ioslices_mut, validate_slice, validate_slice_mut,
};
use crate::FileHandle;

/// Kernel-side state for a single ring: the physical addresses of the ring header and of the
/// entry array, together with kernel-virtual pointers to both. `T` is the ring entry type
/// (for example `SqEntry64` or `CqEntry64`).
pub struct RingInfo<T> {
    ring_physaddr: PhysicalAddress,
    ring: NonNull<Ring<T>>,
    entries_physaddr: PhysicalAddress,
    entries: NonNull<T>,
}

impl<T> RingInfo<T> {
    pub fn ring(&self) -> &Ring<T> {
        unsafe { self.ring.as_ref() }
    }
    pub unsafe fn read_try_iter(&self) -> impl Iterator<Item = T> + '_ {
        core::iter::from_fn(move || self.ring().pop_front_spsc(self.entries.as_ptr()).ok())
    }
}

impl<T> fmt::Debug for RingInfo<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RingInfo")
            .field("ring_physaddr", &self.ring_physaddr)
            .field("ring", &self.ring)
            .field("entries_physaddr", &self.entries_physaddr)
            .field("entries", &self.entries)
            .finish()
    }
}

unsafe impl<T: Send> Send for RingInfo<T> {}
unsafe impl<T: Sync> Sync for RingInfo<T> {}

/// Allocate, map, and zero the memory backing one ring: a single page for the ring header,
/// plus enough pages for `entry_count` entries of type `T`.
fn init_ring<T>(active_table: &mut ActivePageTable, entry_count: usize) -> Result<RingInfo<T>> {
    assert!(mem::align_of::<Ring<T>>() <= memory::PAGE_SIZE);
    assert!(mem::size_of::<Ring<T>>() <= memory::PAGE_SIZE);
    assert!(mem::align_of::<T>() <= memory::PAGE_SIZE);
    assert!(mem::size_of::<T>() <= memory::PAGE_SIZE);

    // the ring header will always be exactly one page in size.
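    // In other words: one dedicated frame for the header, plus
    // ceil(entry_count * size_of::<T>() / PAGE_SIZE) frames for the entry array. Both
    // allocations are mapped below into the kernel's physical-offset area at
    // physaddr + KERNEL_OFFSET (writable, non-executable) and zeroed before use.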
    let ring_frame = memory::allocate_frames(1).ok_or(Error::new(ENOMEM))?;
    let ring_physaddr = ring_frame.start_address();

    let entries_size_in_bytes = entry_count * mem::size_of::<T>();
    let entries_size_in_frames =
        (entries_size_in_bytes + memory::PAGE_SIZE - 1) / memory::PAGE_SIZE;
    let entries_frame =
        memory::allocate_frames(entries_size_in_frames).ok_or(Error::new(ENOMEM))?;
    let entries_physaddr = entries_frame.start_address();

    let ring = unsafe {
        let ring_virtaddr = VirtualAddress::new(ring_physaddr.get() + crate::KERNEL_OFFSET);

        if let Some(already_mapped) = active_table.translate(ring_virtaddr) {
            println!(
                "virtual ring address {:p} was already mapped to {:p}",
                ring_virtaddr.get() as *const u8,
                already_mapped.get() as *const u8
            );
            return Err(Error::new(ENOMEM));
        }
        {
            let ring_mapping = active_table.map_to(
                Page::containing_address(ring_virtaddr),
                ring_frame,
                PtEntryFlags::PRESENT | PtEntryFlags::WRITABLE | PtEntryFlags::NO_EXECUTE,
            );
            ring_mapping.flush(active_table);
        }

        let ring_ptr_mut = ring_virtaddr.get() as *mut Ring<T>;
        ptr::write_bytes::<u8>(ring_ptr_mut as *mut u8, 0, memory::PAGE_SIZE);

        let ring_mut = &mut *ring_ptr_mut;
        ring_mut.size = entry_count;
        *ring_mut.sts.get_mut() = 0;

        NonNull::new(ring_ptr_mut)
            .expect("somehow a page map with an offset above KERNEL_OFFSET resulted in NULL")
    };

    let entries = unsafe {
        let entries_virtaddr = VirtualAddress::new(entries_physaddr.get() + crate::KERNEL_OFFSET);

        for frame_offset in 0..entries_size_in_frames {
            let entries_part_physaddr = Frame::containing_address(PhysicalAddress::new(
                entries_physaddr.get() + frame_offset * memory::PAGE_SIZE,
            ));
            let entries_part_virtaddr = VirtualAddress::new(
                entries_part_physaddr.start_address().get() + crate::KERNEL_OFFSET,
            );

            if let Some(already_mapped) = active_table.translate(entries_part_virtaddr) {
                println!(
                    "virtual ring entries address {:p} was already mapped to {:p}",
                    entries_part_virtaddr.get() as *const u8,
                    already_mapped.get() as *const u8
                );
                return Err(Error::new(ENOMEM));
            }

            let entries_mapping = active_table.map_to(
                Page::containing_address(entries_part_virtaddr),
                entries_part_physaddr,
                PtEntryFlags::PRESENT | PtEntryFlags::WRITABLE | PtEntryFlags::NO_EXECUTE,
            );
            entries_mapping.flush(active_table);
        }

        let entries_mut_ptr = entries_virtaddr.get() as *mut T;
        ptr::write_bytes::<u8>(
            entries_mut_ptr as *mut u8,
            0,
            entries_size_in_frames * memory::PAGE_SIZE,
        );

        NonNull::new(entries_mut_ptr)
            .expect("somehow a page map with an offset above KERNEL_OFFSET resulted in NULL")
    };

    Ok(RingInfo {
        ring_physaddr,
        entries_physaddr,
        ring,
        entries,
    })
}

pub fn attach_to_kernel(ringfd: FileHandle) -> Result<usize> {
    let handle = get_handle(ringfd)?;
    let ring_handle = handle.as_ring_handle().ok_or(Error::new(EBADF))?;
    ring_handle.transition_into_attached_state(ContextOrKernel::Kernel);
    Ok(0)
}

struct ReturnValue {
    status: u64,
    extra: u64,
    flags: u64, // TODO: should flags be stored elsewhere?
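    // These three fields are copied into a completion entry by `construct()` further down:
    // `status` and `flags` map directly onto the CQE, while `extra` is only forwarded (when
    // nonzero) to completion-entry formats that actually have an extra field.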
} impl ReturnValue { fn zero() -> Self { Self { status: 0, extra: 0, flags: 0, } } fn from_status(status: u64) -> Self { Self { status, ..Self::zero() } } } async fn handle_open(submission_entry: &S) -> Result where S: GenericSubmissionEntry, { let path = validate_slice( submission_entry.addr64() as usize as *const u8, submission_entry.len64() as usize, )?; let flags = submission_entry.offset(); println!( "IO_URING OPEN PATH {}", alloc::string::String::from_utf8_lossy(path).into_owned() ); crate::syscall::fs::open(path, flags.try_into().or(Err(Error::new(EINVAL)))?).await } async fn handle_close( submission_entry: &S, _invoker_ctx: &crate::syscall::scheme::Ctx, ) -> Result<(), (syscall::Error, u64)> where S: GenericSubmissionEntry, { let flags = CloseFlags::from_bits(submission_entry.syscall_flags()).ok_or((Error::new(EINVAL), 0))?; let fd_range = if flags.contains(CloseFlags::CLOSE_MANY) { println!( "IO_URING CLOSE RANGE FROM {} LEN {}", submission_entry.fd64(), submission_entry.len64() ); submission_entry.fd64()..submission_entry.fd64() + submission_entry.len64() } else { println!("IO_URING CLOSE SINGLE {}", submission_entry.fd64()); submission_entry.fd64()..submission_entry.fd64() + 1 }; let mut successful_closes = 0; for fd in fd_range { match crate::syscall::fs::close(FileHandle::from(fd.try_into().or(Err((Error::new(EBADF), successful_closes)))?)).await { Ok(_) => successful_closes += 1, Err(error) => return Err((error, successful_closes)), } } Ok(()) } async fn handle_rw(submission_entry: &S, flags_extract: Fe, proc: P) -> Result where S: GenericSubmissionEntry, Fe: FnOnce(u32) -> Result<(bool, bool)>, P: FnOnce( (bool, bool), usize, usize, usize, u64, Arc>, ) -> Result, Fut: Future>, { let (vectored, change_offset) = flags_extract(submission_entry.syscall_flags())?; let addr = usize::try_from(submission_entry.addr64()).or(Err(Error::new(EFAULT)))?; let len = usize::try_from(submission_entry.len64()).or(Err(Error::new(EOVERFLOW)))?; let fd = usize::try_from(submission_entry.fd64()).or(Err(Error::new(EBADF)))?; let offset = submission_entry.offset(); let (scheme_id, local_fd, desc_lock) = { let contexts = context::contexts(); let context_lock = Arc::clone(contexts.current().ok_or(Error::new(ESRCH))?); let context = context_lock.read(); let desc_lock = Arc::clone( &context .get_file(FileHandle::from(fd)) .ok_or(Error::new(EBADF))? .description, ); let (scheme, number) = { let desc = desc_lock.read(); (desc.scheme, desc.number) }; (scheme, number, desc_lock) }; let scheme = { let schemes = crate::scheme::schemes(); let scheme = schemes.get(scheme_id).ok_or(Error::new(ENODEV))?; Arc::clone(scheme) }; match scheme.context { ContextOrKernel::Kernel => (), ContextOrKernel::Context(_) => return Err(Error::new(ENOSYS)), } proc( (vectored, change_offset), addr, len, local_fd, offset, scheme, )? 
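    // `proc` runs synchronously: it validates the user buffers and builds the actual I/O
    // future, returning any validation error eagerly. Only the returned future is awaited
    // below, so the scheme call itself happens in the async part.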
.await } fn try_to_init_or_get_scheme_ioring( context_weak: &Weak>, scheme: &Arc>, ) -> bool { if let Some(context_arc) = context_weak.upgrade() { context_arc .read() .kernel_consumer_ioring(&context_arc, scheme) .is_ok() } else { false } } async fn handle_read( submission_entry: &S, invoker_ctx: &crate::syscall::scheme::Ctx, ) -> Result where S: GenericSubmissionEntry, { fn flags_extract(syscall_flags: u32) -> Result<(bool, bool)> { let flags = ReadFlags::from_bits(syscall_flags).ok_or(Error::new(EINVAL))?; let vectored = flags.contains(ReadFlags::VECTORED); let change_offset = flags.contains(ReadFlags::CHANGE_OFFSET); Ok((vectored, change_offset)) } let fut = handle_rw( submission_entry, flags_extract, move |(vectored, change_offset), addr, len, local_fd, offset, scheme| { handle_read_rw_with_schemes( (vectored, change_offset), addr, len, local_fd, offset, scheme, invoker_ctx, ) }, ); fut.await.map(|bytes_read| bytes_read as u64) // TODO: Truncate bytes_read if necessary } fn handle_read_rw_with_schemes( (vectored, change_offset): (bool, bool), addr: usize, len: usize, local_fd: usize, offset: u64, scheme: Arc>, invoker_ctx: &crate::syscall::scheme::Ctx, ) -> Result> + '_> { let mut either = if vectored { let _ = validate_ioslices_mut(addr as *const IoVec, len)?; unsafe { Either::Right(Some(consuming_iter(addr as *const IoVec, len))) } } else { Either::Left(validate_slice_mut(addr as *mut u8, len)?) }; Ok(async move { match either { Either::Right(ref mut slices) => { if change_offset { let slices = slices.take().unwrap(); println!( "IO_URING READV LOCAL_FD {}, SLICES AT {:p} LEN {}", local_fd, slices, len ); unsafe { scheme.async_readv( invoker_ctx, local_fd, slices.as_ioslices_mut().remaining_as_mut(), ) } .await } else { let slices = slices.take().unwrap(); println!( "IO_URING PREADV LOCAL_FD {}, SLICES AT {:p} LEN {} OFFSET {}", local_fd, addr as *const IoVec, len, offset ); unsafe { scheme.async_preadv( invoker_ctx, local_fd, slices.as_ioslices_mut().remaining_as_mut(), offset.try_into().or(Err(Error::new(EOVERFLOW)))?, ) } .await } } Either::Left(ref mut slice) => { if change_offset { println!( "IO_URING READ LOCAL_FD {}, SLICE AT {:p} LEN {}", local_fd, slice.as_ptr(), slice.len() ); scheme.async_read(invoker_ctx, local_fd, slice).await } else { println!( "IO_URING PREAD LOCAL_FD {}, SLICE AT {:p} LEN {} OFFSET {}", local_fd, slice.as_ptr(), slice.len(), offset ); scheme .async_pread( invoker_ctx, local_fd, slice, offset.try_into().or(Err(Error::new(EOVERFLOW)))?, ) .await } } } }) } async fn handle_write( submission_entry: &S, invoker_ctx: &crate::syscall::scheme::Ctx, ) -> Result where S: GenericSubmissionEntry, { fn flags_extract(syscall_flags: u32) -> Result<(bool, bool)> { let flags = WriteFlags::from_bits(syscall_flags).ok_or(Error::new(EINVAL))?; let vectored = flags.contains(WriteFlags::VECTORED); let change_offset = flags.contains(WriteFlags::CHANGE_OFFSET); Ok((vectored, change_offset)) } let fut = handle_rw( submission_entry, flags_extract, move |(vectored, change_offset), addr, len, local_fd, offset, scheme| Ok(async move { match scheme.context { ContextOrKernel::Context(ref context_weak) if try_to_init_or_get_scheme_ioring(context_weak, &scheme) => handle_write_rw_with_ioring(context_weak).await, _ => handle_write_rw_with_schemes( (vectored, change_offset), addr, len, local_fd, offset, scheme, invoker_ctx, )?.await, } }), ); fut.await.map(|bytes_written| bytes_written as u64) // TODO: Truncate bytes_written if necessary } fn handle_write_rw_with_schemes( (vectored, 
change_offset): (bool, bool), addr: usize, len: usize, local_fd: usize, offset: u64, scheme: Arc>, invoker_ctx: &crate::syscall::scheme::Ctx, ) -> Result> + '_> { let either = if vectored { Either::Right(validate_ioslices(addr as *const IoVec, len)?) } else { Either::Left(validate_slice(addr as *const u8, len)?) }; Ok(async move { match either { Either::Right(ref slices) => { if change_offset { println!( "IO_URING WRITEV LOCAL_FD {}, SLICE AT {:p} LEN {}", local_fd, slices.as_ptr(), slices.len() ); scheme.async_writev(invoker_ctx, local_fd, slices).await } else { println!( "IO_URING PWRITEV LOCAL_FD {}, SLICE AT {:p} LEN {} OFFSET {}", local_fd, slices.as_ptr(), slices.len(), offset ); scheme .async_pwritev( invoker_ctx, local_fd, slices, offset.try_into().or(Err(Error::new(EOVERFLOW)))?, ) .await } } Either::Left(ref slice) => { if change_offset { println!( "IO_URING WRITE LOCAL_FD {}, SLICE AT {:p} LEN {}", local_fd, slice.as_ptr(), slice.len() ); scheme.async_write(invoker_ctx, local_fd, slice).await } else { println!( "IO_URING PWRITE LOCAL_FD {}, SLICE AT {:p} LEN {} OFFSET {}", local_fd, slice.as_ptr(), slice.len(), offset ); scheme .async_pwrite( invoker_ctx, local_fd, slice, offset.try_into().or(Err(Error::new(EOVERFLOW)))?, ) .await } } } }) } async fn handle_write_rw_with_ioring(context_weak: &Weak>) -> Result { let context_lock = context_weak.upgrade().ok_or(Error::new(ESRCH))?; let context = context_lock.read(); let handle: Arc = Arc::clone( &context.kernel_consumer_ioring.r#try() .expect("once state somehow changed back to non-initialized (Context::kernel_consumer_ioring)") .as_ref() .expect("ring failure should already have been noticed by `try_to_init_or_get_scheme_ioring(..)`" )); let ring = handle.as_ring_handle().ok_or(Error::new(EBADF))?; assert!(ring .owner_context .r#try() .map(|ctx| ctx.is_kernel()) .unwrap_or(false)); assert!(ring .attached_context .r#try() .map(|ctx| ctx.is_userspace()) .unwrap_or(false)); // TODO: Temporarily increase scheduling priority for context Err(Error::new(ENOSYS)) } fn handle_files_update( handle_consumer_state: &RingHandleConsumerState, submission_entry: &S, ) -> Result where S: GenericSubmissionEntry, { // TODO: Interop between event queues managed using the `event:` scheme, and this. let sqe_flags = IoUringSqeFlags::from_bits(submission_entry.sqe_flags()).ok_or(Error::new(EINVAL))?; let syscall_flags = FilesUpdateFlags::from_bits(submission_entry.syscall_flags()).ok_or(Error::new(EINVAL))?; let event_flags = { let mut flags = EventFlags::empty(); if syscall_flags.contains(FilesUpdateFlags::READ) { flags |= EventFlags::EVENT_READ; } if syscall_flags.contains(FilesUpdateFlags::WRITE) { flags |= EventFlags::EVENT_WRITE; } if syscall_flags.contains(FilesUpdateFlags::IO_URING) { flags |= EventFlags::EVENT_IO_URING; } flags }; let oneshot = !sqe_flags.contains(IoUringSqeFlags::SUBSCRIBE); if oneshot { // TODO: Support this. 
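        // An SQE without IoUringSqeFlags::SUBSCRIBE would be a one-shot registration; only
        // persistent subscriptions are handled further down, so the one-shot case is
        // rejected for now.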
return Err(Error::new(ENOSYS)); } if event_flags.contains(EventFlags::EVENT_IO_URING) { return register_secondary_iouring(handle_consumer_state, event_flags, submission_entry); } let multiple_fds = syscall_flags.contains(FilesUpdateFlags::MULTI); let mut fd_iter_single; let mut fd_iter_multiple; let fd_iter: &mut dyn Iterator = if multiple_fds { let addr = usize::try_from(submission_entry.addr64()).or(Err(Error::new(EFAULT)))?; let len = usize::try_from(submission_entry.len64()).or(Err(Error::new(EFAULT)))?; let slice = validate_slice(addr as *const u64, len)?; fd_iter_multiple = slice.iter().copied(); &mut fd_iter_multiple } else { fd_iter_single = iter::once(submission_entry.fd64()); &mut fd_iter_single }; let event_queue_id = handle_consumer_state.event_queue_or_init(); let event_queue = { let queues = event::queues(); let queue = queues.get(&event_queue_id).ok_or(Error::new(EBADFD))?; Arc::clone(queue) }; // // Calls event::trigger internally, but there is no context switch unlike when reading events. // // EventQueue::write simply loops over the events, so there is no performance penalty in // repeatedly calling it. // for fd in fd_iter { let fd = usize::try_from(fd).or(Err(Error::new(EBADF)))?; event_queue.write(iter::once(Event { id: fd, // TODO: Should the FD list instead be a list of events, or another struct? data: submission_entry .user_data64() .try_into() .or(Err(Error::new(EOVERFLOW)))?, flags: event_flags, }))?; } Ok(0) } fn register_secondary_iouring(consumer_state: &RingHandleConsumerState, event_flags: EventFlags, submission_entry: &S) -> Result where S: GenericSubmissionEntry { assert!(event_flags.contains(EventFlags::EVENT_IO_URING)); // EVENT_READ in this context, means that an event will trigger when the ring has enough used // entries to be poppable. Meanwhile, EVENT_WRITE means that the event will appear when the // ring has enough free entries to be pushable. let ringfd = usize::try_from(submission_entry.fd64()).or(Err(Error::new(EBADF)))?; let ring_handle = { let handle = get_handle(FileHandle::from(ringfd))?; Arc::downgrade(&handle) }; consumer_state.secondary_rings.lock().push(SecondaryRingRef { ring_handle, fd_for_consumer: ringfd, read: event_flags.contains(EventFlags::EVENT_READ), write: event_flags.contains(EventFlags::EVENT_WRITE), user_data: submission_entry.user_data64().try_into().or(Err(Error::new(EOVERFLOW)))?, }); Ok(0) } async fn handle_dup(submission_entry: &S) -> Result where S: GenericSubmissionEntry + Send + Sync + 'static, { println!("BEGIN DUP"); let flags = DupFlags::from_bits(submission_entry.syscall_flags()).ok_or(Error::new(EINVAL))?; let fd: usize = submission_entry.fd64().try_into().or(Err(Error::new(EBADF)))?; let param = if flags.contains(DupFlags::PARAM) { let pointer = usize::try_from(submission_entry.addr64()).or(Err(Error::new(EFAULT)))?; let len = usize::try_from(submission_entry.len64()).or(Err(Error::new(EINVAL)))?; println!("IO_URING DUP LOCAL_FD {} SLICE AT {:p} LEN {}", fd, pointer as *const u8, len); validate_slice(pointer as *const u8, len)? 
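        // The validated byte slice above becomes the `param` argument passed to `fs::dup`
        // below; its interpretation is left to the scheme backing the file descriptor.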
} else { &[] }; let fh = FileHandle::from(fd); let new_fh = crate::syscall::fs::dup(fh, param).await?; let new_fd64 = u64::try_from(new_fh.into()).or(Err(Error::new(EOVERFLOW)))?; Ok(new_fd64) } async fn handle_mmap(submission_entry: &S) -> Result where S: GenericSubmissionEntry, { let flags = submission_entry.syscall_flags().try_into().or(Err(Error::new(EOVERFLOW)))?; let flags = MapFlags::from_bits(flags).ok_or(Error::new(EINVAL))?; let fd = usize::try_from(submission_entry.fd64()).or(Err(Error::new(EBADF)))?; let address = usize::try_from(submission_entry.addr64()).or(Err(Error::new(EFAULT)))?; let size = usize::try_from(submission_entry.len64()).or(Err(Error::new(ENOMEM)))?; let offset = usize::try_from(submission_entry.offset()).or(Err(Error::new(EOVERFLOW)))?; let map = Map2 { address, flags, offset, size, }; // Used to work around Send constraint when using pointers. fn inner(fd: usize, map: &Map2) -> impl Future> { crate::syscall::fs::file_op(crate::syscall::number::SYS_FMAP2, FileHandle::from(fd), map as *const Map2 as usize, mem::size_of::()) } let pointer = inner(fd, &map).await?; let pointer = u64::try_from(pointer).or(Err(Error::new(EOVERFLOW)))?; Ok(pointer) } async fn handle_munmap(submission_entry: &S) -> Result { println!("TODO: io_uring munmap"); Err(Error::new(ENOSYS)) } fn handle_regular_syscall(_submission_entry: &S) -> Result where S: GenericSubmissionEntry + Send + Sync + 'static, { Err(Error::new(ENOSYS)) } async fn handle_standard_opcode_inner( handle: Arc, submission_entry: S, opcode: StandardOpcode, ctx: crate::syscall::scheme::Ctx, ) -> Result where S: GenericSubmissionEntry + Send + Sync + 'static, C: GenericCompletionEntry + Send + Sync + 'static, { let handle = handle.as_ring_handle().ok_or(Error::new(EBADF))?; let runtime_state = handle.consumer_state.r#try().ok_or(Error::new(EBADFD))?; fn construct( user_data: u64, return_value: ReturnValue, ) -> Result { let extra = if return_value.extra != 0 { Some(return_value.extra) } else { None }; C::construct(user_data, return_value.status, return_value.flags, extra) .ok_or(Error::new(EOVERFLOW)) } fn construct_simple(result: Result) -> ReturnValue { ReturnValue { status: Error::mux64(result), flags: 0, extra: 0, } } let return_value = match opcode { StandardOpcode::NoOp => ReturnValue::zero(), StandardOpcode::Open => handle_open(&submission_entry).await.map_or_else( |error| ReturnValue::from_status(Error::mux64(Err(error))), |fd| ReturnValue::from_status(fd.into() as u64), ), StandardOpcode::Close => handle_close(&submission_entry, &ctx).await.map_or_else( |(error, successful_closes)| ReturnValue { status: Error::mux64(Err(error)), flags: 0, extra: successful_closes, }, |_| ReturnValue::zero(), ), StandardOpcode::Read => construct_simple(handle_read(&submission_entry, &ctx).await), StandardOpcode::Write => construct_simple(handle_write(&submission_entry, &ctx).await), StandardOpcode::Seek => return Err(Error::new(ENOSYS)), StandardOpcode::Fstat => return Err(Error::new(ENOSYS)), StandardOpcode::Fstatvfs => return Err(Error::new(ENOSYS)), StandardOpcode::Dup => construct_simple(handle_dup(&submission_entry).await), StandardOpcode::Mmap => construct_simple(handle_mmap(&submission_entry).await), StandardOpcode::Munmap => construct_simple(handle_munmap(&submission_entry).await), StandardOpcode::Fsync => return Err(Error::new(ENOSYS)), StandardOpcode::FilesUpdate => { construct_simple(handle_files_update(runtime_state, &submission_entry)) } StandardOpcode::RegularSyscall => { 
construct_simple(handle_regular_syscall(&submission_entry)) } _ => return Err(Error::new(ENOSYS)), }; construct(submission_entry.user_data64(), return_value) } fn handle_standard_opcode( handle: Arc, submission_entry: S, opcode: StandardOpcode, ctx: crate::syscall::scheme::Ctx, ) -> HandleStandardOpcodeFuture where S: GenericSubmissionEntry + Send + Sync + 'static, C: GenericCompletionEntry + Send + Sync + 'static, { async move { Err(Error::new(ENOSYS)) }//handle_standard_opcode_inner(handle, submission_entry, opcode, ctx) } async fn handle_kernel_opcode_inner( submission_entry: S, opcode: KernelOpcode, ctx: crate::syscall::scheme::Ctx, ) -> Result where S: GenericSubmissionEntry, C: GenericCompletionEntry, { match opcode { KernelOpcode::Waitpid => return Err(Error::new(ENOSYS)), _ => return Err(Error::new(ENOSYS)), } } fn handle_kernel_opcode( submission_entry: S, opcode: KernelOpcode, ctx: crate::syscall::scheme::Ctx, ) -> HandleKernelOpcodeFuture where S: GenericSubmissionEntry + Send + Sync + 'static, C: GenericCompletionEntry + Send + Sync + 'static, { async move { Err(Error::new(ENOSYS)) }//handle_kernel_opcode_inner(submission_entry, opcode, ctx) } fn handle_submission( handle: Arc, submission_entry: S, ctx: crate::syscall::scheme::Ctx, ) -> Result> where S: GenericSubmissionEntry + fmt::Debug + Send + Sync + 'static, C: GenericCompletionEntry + Send + Sync + 'static, { if let Some(standard_opcode) = StandardOpcode::from_raw(submission_entry.opcode()) { todo!(); //HandleSubmissionFuture::StandardOpcode(todo!()/*handle_standard_opcode(runtime_state, submission_entry, standard_opcode, &ctx)*/) } else if let Some(kernel_opcode) = KernelOpcode::from_raw(submission_entry.opcode()) { todo!(); //HandleSubmissionFuture::KernelOpcode(handle_kernel_opcode(submission_entry, kernel_opcode, &ctx)) } else { println!( "UNKNOWN OPCODE {}, full SQE: {:?}", submission_entry.opcode(), submission_entry ); Err(Error::new(ENOSYS)) } } pub type HandleStandardOpcodeFuture = impl Future> + Send + Sync + 'static; pub type HandleKernelOpcodeFuture = impl Future> + Send + Sync + 'static; pub enum HandleSubmissionFuture { //StandardOpcode(HandleStandardOpcodeFuture), KernelOpcode(HandleKernelOpcodeFuture), //S(S), //C(C), } impl Future for HandleSubmissionFuture { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { todo!() } } fn send_event( info: &RingInfo, event: &Event, cqe_flags: IoUringCqeFlags, seq: u64, ) -> Result<()> where C: GenericCompletionEntry, { // Unfortunately we only support 52 bits of event flags (SqEntry64), or 24 bits // (SqEntry32). 
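    // Packing used below: the low 8 bits of the CQE `flags` field carry the IoUringCqeFlags
    // passed by the caller, and the Event's EventFlags bits are shifted left by 8 above them.
    // Note that checked_shl only returns None when the shift amount itself is out of range,
    // so flag bits shifted out of the top of the integer are not detected here.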
let flags = if C::FLAGS_64 { let event_flags = u64::try_from(event.flags.bits()).or(Err(Error::new(EOVERFLOW)))?; if event_flags.checked_shl(8).is_none() { return Err(Error::new(EOVERFLOW)); } (event_flags << 8) | u64::from(cqe_flags.bits()) } else { let event_flags = u32::try_from(event.flags.bits()).or(Err(Error::new(EOVERFLOW)))?; if event_flags.checked_shl(8).is_none() { return Err(Error::new(EOVERFLOW)); } u64::from((event_flags << 8) | u32::from(cqe_flags.bits())) }; let extra = if C::HAS_EXTRA { let seq = if C::EXTRA_64 { seq } else { seq & 0xFFFF_FFFF }; Some(seq) } else { None }; let user_data = event .data .try_into() .or(Err(Error::new(EOVERFLOW)))?; let status = u64::try_from(event.id).or(Err(Error::new(EOVERFLOW)))?; let cqe = C::construct(user_data, status, flags, extra).ok_or(Error::new(EOVERFLOW))?; unsafe { match info.ring().push_back_spsc(info.entries.as_ptr(), cqe) { Ok(()) => (), Err(RingPushError::Shutdown(_)) => return Err(Error::new(ESHUTDOWN)), Err(RingPushError::Full(_)) => return Err(Error::new(ENOSPC)), } } Ok(()) } fn send_secondary_ring_events(consumer_state: &RingHandleConsumerState, completion_ring_info: Either<&RingInfo, &RingInfo>, count: &mut usize) -> Result<()> { let mut secondary_refs = consumer_state.secondary_rings.lock(); let mut i = 0; while i < secondary_refs.len() { let secondary_ref = &secondary_refs[i]; let handle_arc = match secondary_ref.ring_handle.upgrade() { Some(h) => h, None => { // The ring has been destroyed. secondary_refs.remove(i); continue; } }; let (can_read, can_write) = match *handle_arc { Handle::Ring(ref ring_handle) => { // The handle is valid, so we don't need to remove that entry. i += 1; let attached_context = ring_handle.attached_context.r#try().and_then(ContextOrKernel::as_context).and_then(Weak::upgrade); let (runtime_state, rings) = match (ring_handle.runtime_state(), ring_handle.rings.r#try()) { (Some(s), Some(r)) => (s, r), // ring is uninitialized _ => continue, }; let ((sq_pushed, sq_popped), (cq_pushed, cq_popped)) = runtime_state.check_for_update(rings); let can_read = cq_pushed; let can_write = sq_popped; let has_written = sq_pushed; // TODO let _has_read = cq_popped; if let Some(attached) = attached_context { let guard = attached.upgradeable_read(); if has_written /*&& guard.ioring_completions_left.load(Ordering::Acquire) > 0*/ { println!("IO_URING WAKE UP CONTEXT"); guard.upgrade().unblock(); } } (can_read, can_write) } Handle::Producer(ref producer_handle) => { // Valid handle, don't remove. let arc; let rings = match producer_handle.consumer_handle.upgrade() { Some(consumer) => { arc = consumer; if let Handle::Ring(ref rh) = *arc { if let Some(rings) = rh.rings.r#try() { i += 1; rings } else { println!("Consumer rings non-existent"); secondary_refs.remove(i); continue; } } else { println!("Not a ring handle"); secondary_refs.remove(i); continue; } } None => { println!("Consumer handle read"); secondary_refs.remove(i); continue; } }; let ((sq_pushed, _sq_popped), (_cq_pushed, cq_popped)) = producer_handle.runtime_state.check_for_update(rings); let can_read = sq_pushed; let can_write = cq_popped; println!("CAN_READ {} CAN_WRITE {}", can_read, can_write); (can_read, can_write) } ref invalid => { println!("INVALID HANDLE: {:?}, removing it from the secondary refs.", invalid); secondary_refs.remove(i); continue; } }; let mut event_flags = EventFlags::EVENT_IO_URING; // Since all RingHandles are consumers, we automatically know that READ should // correspond to cq_pushed, while WRITE should correspond to sq_popped. 
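            // (For Handle::Producer above, the mapping is mirrored: READ corresponds to
            // sq_pushed, i.e. new submissions to serve, and WRITE to cq_popped, i.e. room
            // freed up in the completion ring.)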
if secondary_ref.read && can_read { event_flags |= EventFlags::EVENT_READ; } if secondary_ref.write && can_write { event_flags |= EventFlags::EVENT_WRITE; } let required_mask = EventFlags::EVENT_READ | EventFlags::EVENT_WRITE; if !event_flags.intersects(required_mask) { continue; } let seq = consumer_state.event_seq.fetch_add(1, Ordering::Acquire); let cqe_flags = IoUringCqeFlags::EVENT; let event = Event { id: secondary_ref.fd_for_consumer, data: secondary_ref.user_data.try_into().or(Err(Error::new(EOVERFLOW)))?, flags: event_flags, }; match completion_ring_info { Either::Left(ref info32) => send_event(info32, &event, cqe_flags, seq)?, Either::Right(ref info64) => send_event(info64, &event, cqe_flags, seq)?, } *count += 1; } Ok(()) } fn send_events( handle: &RingHandle, completion_ring_info: Either<&RingInfo, &RingInfo>, ) -> Result> { let consumer_state = handle.consumer_state.r#try().ok_or(Error::new(EBADFD))?; let mut count = 0; send_secondary_ring_events(consumer_state, completion_ring_info, &mut count)?; if let Some(ref event_queue_id) = consumer_state.event_queue.r#try() { let event_queue = { let queues = event::queues(); let queue = queues.get(event_queue_id).ok_or(Error::new(EBADFD))?; Arc::clone(&queue) }; let mut event_buf = [Event::default(); 16]; // TODO: Shall this error be properly handled? count += event_queue.read_nonblocking(&mut event_buf)?; if unlikely(count > event_buf.len()) { println!("internal error: EventQueue::read_nonblocking yielded a higher count than the number of items"); // pretend that there wasn't any existing event queue return Ok(None); } let events = &event_buf[..count]; for event in events { let seq = consumer_state.event_seq.fetch_add(1, Ordering::Acquire); let cqe_flags = IoUringCqeFlags::EVENT; match completion_ring_info { Either::Left(ref info32) => send_event(info32, event, cqe_flags, seq)?, Either::Right(ref info64) => send_event(info64, event, cqe_flags, seq)?, } } Ok(Some(count)) } else { Ok(if count == 0 { None } else { Some(count) }) } } fn register_event_interest(handle: &RingHandle, context: Arc>) -> Result<()> { let consumer_state = handle.consumer_state.r#try().ok_or(Error::new(EBADFD))?; if let Some(ref event_queue_id) = consumer_state.event_queue.r#try() { let queue = { let queues = event::queues(); Arc::clone(queues.get(event_queue_id).ok_or(Error::new(EBADFD))?) }; queue.register_context_interest(context); } Ok(()) } fn poll_or_insert_into_pending_map(queue: &Runqueue, mut task_guard: TaskGuard, context: &mut task::Context, tag: PendingTag) -> task::Poll> where S: 'static + Unpin, C: 'static + Unpin, { let task_ref = task_guard .task_ref() .expect("expected task guard to be occupied with a pending task"); let task = task_guard .as_mut() .as_task_mut() .expect("expected task guard passed into poll_or_insert_into_pending_map to contain a task"); match task.as_future_mut().poll(context) { task::Poll::Ready(cqe) => { // Drop the future and replace the task slot with an unlinked vacant entry. task_guard.as_mut().free(); // Make sure that there won't be any deadlock when freeing the task. drop(task_guard); // And finally, link the vacant entry, making it reallocatable. 
queue.free_task_slot(task_ref.index); task::Poll::Ready(cqe) } task::Poll::Pending => { let prev = queue.pending.lock().insert(tag, task_ref); assert!(prev.is_none(), "invalid pending tag: already in use"); task::Poll::Pending }, } } fn handle_push_error(result: Result<(), RingPushError>) -> Result<()> { match result { Ok(()) => Ok(()), Err(RingPushError::Full(_)) => Err(Error::new(ENOSPC)), Err(RingPushError::Shutdown(_)) => Err(Error::new(ESHUTDOWN)), } } fn handle_finished_future(cqe: C, cqinfo: &RingInfo, handled_request_count: &mut usize) -> Result<()> { let result = unsafe { cqinfo.ring().push_back_spsc(cqinfo.entries.as_ptr(), cqe) }; handle_push_error(result)?; *handled_request_count += 1; Ok(()) } fn handle_pending_future(queue: &Runqueue, tag: &mut PendingTag, waker: &mut task::Waker, internal_ring_handle_num: usize) -> Result<()> { // Now that the tag has been reserved for this waker, we'll need to // make a new tag and a new waker. *tag = queue.next_tag()?; *waker = crate::io_uring::handle::waker::default_waker(internal_ring_handle_num, *tag)?; Ok(()) } unsafe fn handle_submission_entries( handle: &Arc, sqinfo: &RingInfo, cqinfo: &RingInfo, queue: &Runqueue, internal_ring_handle_num: usize, handled_request_count: &mut usize, ctx: crate::syscall::scheme::Ctx, ) -> Result<()> where S: GenericSubmissionEntry + fmt::Debug + Send + Sync + 'static + Unpin, C: GenericCompletionEntry + Send + Sync + 'static + Unpin, { // New tags only have to be created when a future returns pending, since only // then is the future getting involved in the pending map. The downside to this // though, is that futures can obviously not use the same tags if they are run // in parallel, so that would imply the existence of one tag per thread. let mut tag = queue.next_tag()?; let mut waker = crate::io_uring::handle::waker::default_waker(internal_ring_handle_num, tag)?; for entry in sqinfo.read_try_iter() { let mut task_slot = match queue.allocate_new_task_slot() { Some(s) => s, None => { // If we're out of available slots, then that means the ring is // getting new entries faster than they can be processed. This is // prevented by limiting the number of pending requests, so that // the pusher eventually sees a full ring, forming a simple sort of // congestion control. break; } }; let priority = entry.priority(); let future = handle_submission(Arc::clone(handle), entry, ctx)?; let task = Task { future, priority, }; { let slot_ref_mut = task_slot .as_mut() .get_ref_if_vacant() .expect("expected newly-allocated slot to contain a vacant entry"); *slot_ref_mut = TaskSlot::Occupied(task); } // And here, the mutable borrow ends, pinning the future (and the priority) // forever. let mut context = task::Context::from_waker(&waker); match poll_or_insert_into_pending_map(queue, task_slot, &mut context, tag) { task::Poll::Ready(cqe) => handle_finished_future(cqe?, cqinfo, handled_request_count)?, task::Poll::Pending => handle_pending_future(queue, &mut tag, &mut waker, internal_ring_handle_num)?, } } let mut ready_guard = queue.ready.lock(); // TODO: Right now we're not doing drain_sorted, since that would reorder // futures based on their priority. Implement pipeline barriers correctly, // maybe using "slots" (either as dedicated arrays/binary heaps, or in some // magical advanced data struct), where each slot can make its own progress, // taking barriers into account. 
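    // Everything in `ready` is a task that previously returned Pending and has since been
    // requeued (by its waker) for another poll. Finished tasks push their CQE right away,
    // while still-pending ones are re-inserted into the pending map under the current tag,
    // after which a fresh tag/waker pair is prepared for the next task.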
for task_ref in ready_guard.drain() { let mut context = task::Context::from_waker(&waker); let task_slot = queue.lock_task_ref(task_ref); match poll_or_insert_into_pending_map(queue, task_slot, &mut context, tag) { task::Poll::Ready(cqe) => handle_finished_future(cqe?, cqinfo, handled_request_count)?, task::Poll::Pending => handle_pending_future(queue, &mut tag, &mut waker, internal_ring_handle_num)?, } } Ok(()) } pub fn enter(ringfd: FileHandle, mut min_complete: usize, flags: IoUringEnterFlags) -> Result { let allow_spurious_wakeup = min_complete == 0; if min_complete == 0 { min_complete = 1 }; let (handle, internal_ring_handle_num) = get_handle_and_local(ringfd)?; let ring_handle = handle.as_ring_handle().ok_or(Error::new(EBADF))?; // If the kernel were to be the consumer of the io_uring, for some reason, we would not be // calling io_uring_enter() at all. let owner_context_lock = match ring_handle.owner_context.r#try().ok_or(Error::new(EBADF))? { ContextOrKernel::Context(ref ctx) => { let strong_ctx = ctx.upgrade().ok_or(Error::new(ESRCH))?; let contexts = context::contexts(); let current = contexts.current().ok_or(Error::new(ESRCH))?; if unlikely(!Arc::ptr_eq(current, &strong_ctx)) { return Err(Error::new(EBADFD))?; } strong_ctx } ContextOrKernel::Kernel => return Err(Error::new(EBADFD)), }; owner_context_lock .read() .ioring_completions_left .store(min_complete, Ordering::SeqCst); let invoker_ctx = owner_context_lock.read().scheme_invoker_ctx(); let attached_context = ring_handle.attached_context.r#try().ok_or(Error::new(EBADF))?; let completion_ring_info: &Either< (RingInfo, Runqueue), (RingInfo, Runqueue), > = &ring_handle .rings .r#try() .ok_or(Error::new(EBADFD))? .completion_ring; let wakeup_on_sq_avail = flags.contains(IoUringEnterFlags::WAKEUP_ON_SQ_AVAIL); let ((sq_pushed, sq_popped), (cq_pushed, _cq_popped)) = ring_handle.runtime_state().ok_or(Error::new(EBADF))?.check_for_update(ring_handle.rings.r#try().ok_or(Error::new(EBADFD))?); // If the kernel is the producer, we will also process unhandled submissions. Some of these may // complete immediately, while others may require a scheme to respond first (and thus also be // context switched to). let mut total_handled_request_count = 0; loop { let (events_sent, handled_request_count) = match attached_context { &ContextOrKernel::Kernel => { let mut handled_request_count = 0; // TODO: Is this error going to be passed on to the syscall, or somehow internally be // handled by the kernel? let events_sent = send_events( ring_handle, completion_ring_info .as_ref() .map_left(|(i, _)| i) .map_right(|(i, _)| i), )?; let submission_ring_info: &Either, RingInfo> = &ring_handle .rings .r#try() .ok_or(Error::new(EBADFD))? .submission_ring; match (submission_ring_info, completion_ring_info) { (Either::Left(ref sqinfo32), Either::Left((ref cqinfo32, ref queue))) => unsafe { handle_submission_entries( &handle, sqinfo32, cqinfo32, queue, internal_ring_handle_num, &mut handled_request_count, invoker_ctx, )? }, (Either::Left(ref sqinfo32), Either::Right((ref cqinfo64, ref queue))) => /*unsafe { handle_submission_entries( &handle, sqinfo32, cqinfo64, queue, internal_ring_handle_num, &mut handled_request_count, invoker_ctx, )? }*/ return Err(Error::new(EINVAL)), (Either::Right(ref sqinfo64), Either::Left((ref cqinfo32, ref queue))) => /*unsafe { handle_submission_entries( &handle, sqinfo64, cqinfo32, queue, internal_ring_handle_num, &mut handled_request_count, invoker_ctx, )? 
}*/ return Err(Error::new(EINVAL)), (Either::Right(ref sqinfo64), Either::Right((ref cqinfo64, ref queue))) => unsafe { handle_submission_entries( &handle, sqinfo64, cqinfo64, queue, internal_ring_handle_num, &mut handled_request_count, invoker_ctx, )? }, } (events_sent, handled_request_count) } &ContextOrKernel::Context(ref context) => { // Unblock the target context if the consumer sent a submission to it. if sq_pushed { context .upgrade() .ok_or(Error::new(ESRCH))? .write() .unblock(); } if cq_pushed { // If there are any entries to pop, tell the user process about the number of them. let avail_entry_count = match completion_ring_info { Either::Left((ref info32, _)) => info32.ring().available_entry_count_spsc(), Either::Right((ref info64, _)) => { info64.ring().available_entry_count_spsc() } }; (None, avail_entry_count) } else { // TODO: It is probably better if the scheme gets a temporarily increased // scheduling priority, if another process depends on it. For now we will just // block the current context, and give the remaining CPU time to another process. (None, 0) } } }; owner_context_lock.read().ioring_completions_left.fetch_sub( core::cmp::min(min_complete, handled_request_count), Ordering::SeqCst, ); total_handled_request_count += handled_request_count + events_sent.unwrap_or(0); if total_handled_request_count < min_complete && !(allow_spurious_wakeup && cq_pushed) && !(allow_spurious_wakeup && sq_popped && wakeup_on_sq_avail) { { let contexts = context::contexts(); let context_lock = contexts.current().ok_or(Error::new(ESRCH))?; context_lock.write().block("SYS_IORING_ENTER"); if events_sent.is_some() { register_event_interest(handle.as_ring_handle().ok_or(Error::new(EBADF))?, Arc::clone(context_lock))?; } } unsafe { context::switch(); } continue; } else { break; } } Ok(total_handled_request_count) } impl AsyncScheme for IoUringScheme {}
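// Summary of the SYS_IORING_ENTER path above: when the ring is attached to the kernel,
// `enter()` itself drains the submission ring, polls the resulting futures, and pushes
// completions (plus any pending events); when the ring is attached to a userspace scheme,
// `enter()` merely unblocks the attached context and reports how many completions are
// already available. In both cases the caller blocks and context-switches until at least
// `min_complete` completions have been handled, unless min_complete == 0 permits a spurious
// wakeup or WAKEUP_ON_SQ_AVAIL applies.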