From dfcf21a1aa15f402f2b151972bc7e78533718f4b Mon Sep 17 00:00:00 2001 From: 4lDO2 <4lDO2@protonmail.com> Date: Thu, 6 Aug 2020 10:51:48 +0200 Subject: [PATCH] Existential type attempt. --- src/io_uring/handle.rs | 83 +++++++++++++++++++++----------- src/io_uring/mod.rs | 106 ++++++++++++++++++++++++++--------------- 2 files changed, 122 insertions(+), 67 deletions(-) diff --git a/src/io_uring/handle.rs b/src/io_uring/handle.rs index 7e20249..e0756a8 100644 --- a/src/io_uring/handle.rs +++ b/src/io_uring/handle.rs @@ -449,7 +449,7 @@ where S: 'static, C: 'static, { - pub fn task_ref(&self) -> Option { + pub fn task_ref(&mut self) -> Option { Some(TaskRef { index: self.index, priority: self.as_mut().as_task_mut()?.priority(), @@ -490,41 +490,50 @@ impl TaskSlot { } } } - pub fn as_vacant_next(self: Pin<&mut Self>) -> Option<&mut usize> { - unsafe { - match self.get_mut() { - &mut Self::Vacant { ref mut next } => Some(next), - &mut Self::Occupied(_) => None, - } + pub fn as_vacant_next(self: Pin<&mut Self>) -> Option<&mut usize> + where + S: Unpin, + C: Unpin, + { + match self.get_mut() { + &mut Self::Vacant { ref mut next } => Some(next), + &mut Self::Occupied(_) => None, } } - pub fn get_ref_if_vacant(self: Pin<&mut Self>) -> Option<&mut Self> { - unsafe { - match self.get_mut() { - // Don't permit occupied entries from being accessed mutably without Pin... - &mut Self::Occupied(_) => None, + pub fn get_ref_if_vacant(self: Pin<&mut Self>) -> Option<&mut Self> + where + S: Unpin, + C: Unpin, + { + match self.get_mut() { + // Don't permit occupied entries from being accessed mutably without Pin... + &mut Self::Occupied(_) => None, - // ... but do allow vacant entries from being accessed, to occupy them - this @ &mut Self::Vacant { .. } => Some(this), - } + // ... but do allow vacant entries from being accessed, to occupy them + this @ &mut Self::Vacant { .. } => Some(this), } } /// Replaces an occupied task with a new orphan vacant entry, safely dropping the future /// inside. pub fn free(self: Pin<&mut Self>) { unsafe { - let pointer = self - .as_task_mut() - .expect("expected Task::free to free an occupied entry") - .get_unchecked_mut() as *mut Task; + let this = self.get_unchecked_mut(); + let this_ptr = this as *mut Self; - // Since the task and the future inside that task is pinned, we need to drop manually - // with drop_in_place. - ptr::drop_in_place(pointer); + { + let pointer = Pin::new_unchecked(this) + .as_task_mut() + .expect("expected Task::free to free an occupied entry") + .get_unchecked_mut() as *mut Task; + + // Since the task and the future inside that task is pinned, we need to drop manually + // with drop_in_place. + ptr::drop_in_place(pointer); + } // And, now that it's dropped, we cannot in any way use self again by reference, so we // need to ptr::write the new value. - ptr::write(self.get_unchecked_mut() as *mut Self, Self::Vacant { next: usize::MAX }); + ptr::write(this_ptr, Self::Vacant { next: usize::MAX }); } } } @@ -788,7 +797,11 @@ pub const TASK_QUOT_MUL: usize = 1; pub const TASK_QUOT_DIV: usize = 4; impl Runqueue { - pub fn new(pending_task_count: usize) -> Self { + pub fn new(pending_task_count: usize) -> Self + where + S: Unpin, + C: Unpin, + { assert_ne!(pending_task_count, usize::MAX); let usize_size = mem::size_of::(); let lock_word_count = (pending_task_count + usize_size - 1) / usize_size * usize_size; @@ -935,8 +948,12 @@ impl Runqueue { None => usize::MAX, } } - fn allocate_new_task_slot_inner(&self, current: usize) -> bool { - let current_slot = self.lock_task(current); + fn allocate_new_task_slot_inner(&self, current: usize) -> bool + where + S: Unpin, + C: Unpin, + { + let mut current_slot = self.lock_task(current); let current_slot_next_raw = current_slot .as_mut() .as_vacant_next() @@ -964,7 +981,11 @@ impl Runqueue { } } } - pub fn allocate_new_task_slot<'runqueue>(&'runqueue self) -> Option> { + pub fn allocate_new_task_slot<'runqueue>(&'runqueue self) -> Option> + where + S: Unpin, + C: Unpin, + { let initial = Self::index_from_raw(self.first_vacant_slot.load(Ordering::Acquire))?; let index = match self.allocate_new_task_slot_inner(initial) { true => initial, @@ -972,7 +993,11 @@ impl Runqueue { }; Some(self.lock_task(index)) } - pub fn free_task_slot<'runqueue>(&'runqueue self, at: usize) { + pub fn free_task_slot<'runqueue>(&'runqueue self, at: usize) + where + S: Unpin, + C: Unpin, + { debug_assert!( self.lock_task(at) .as_mut() @@ -981,7 +1006,7 @@ impl Runqueue { ); let last_index = self.last_vacant_slot.load(Ordering::Acquire); - let last_task_slot = self.lock_task(last_index); + let mut last_task_slot = self.lock_task(last_index); *last_task_slot .as_mut() diff --git a/src/io_uring/mod.rs b/src/io_uring/mod.rs index 854b373..d17c339 100644 --- a/src/io_uring/mod.rs +++ b/src/io_uring/mod.rs @@ -1,6 +1,7 @@ use core::convert::{TryFrom, TryInto}; use core::future::Future; use core::intrinsics::unlikely; +use core::pin::Pin; use core::ptr::NonNull; use core::sync::atomic::Ordering; use core::{fmt, iter, mem, ptr, task}; @@ -755,16 +756,19 @@ where Err(Error::new(ENOSYS)) } -async fn handle_standard_opcode( - consumer_state: &RingHandleConsumerState, +async fn handle_standard_opcode_inner( + handle: Arc, submission_entry: S, opcode: StandardOpcode, - ctx: &crate::syscall::scheme::Ctx, + ctx: crate::syscall::scheme::Ctx, ) -> Result where S: GenericSubmissionEntry + Send + Sync + 'static, C: GenericCompletionEntry + Send + Sync + 'static, { + let handle = handle.as_ring_handle().ok_or(Error::new(EBADF))?; + let runtime_state = handle.consumer_state.r#try().ok_or(Error::new(EBADFD))?; + fn construct( user_data: u64, return_value: ReturnValue, @@ -792,7 +796,7 @@ where |fd| ReturnValue::from_status(fd.into() as u64), ), - StandardOpcode::Close => handle_close(&submission_entry, ctx).await.map_or_else( + StandardOpcode::Close => handle_close(&submission_entry, &ctx).await.map_or_else( |(error, successful_closes)| ReturnValue { status: Error::mux64(Err(error)), flags: 0, @@ -801,8 +805,8 @@ where |_| ReturnValue::zero(), ), - StandardOpcode::Read => construct_simple(handle_read(&submission_entry, ctx).await), - StandardOpcode::Write => construct_simple(handle_write(&submission_entry, ctx).await), + StandardOpcode::Read => construct_simple(handle_read(&submission_entry, &ctx).await), + StandardOpcode::Write => construct_simple(handle_write(&submission_entry, &ctx).await), StandardOpcode::Seek => return Err(Error::new(ENOSYS)), StandardOpcode::Fstat => return Err(Error::new(ENOSYS)), StandardOpcode::Fstatvfs => return Err(Error::new(ENOSYS)), @@ -811,7 +815,7 @@ where StandardOpcode::Munmap => construct_simple(handle_munmap(&submission_entry).await), StandardOpcode::Fsync => return Err(Error::new(ENOSYS)), StandardOpcode::FilesUpdate => { - construct_simple(handle_files_update(consumer_state, &submission_entry)) + construct_simple(handle_files_update(runtime_state, &submission_entry)) } StandardOpcode::RegularSyscall => { construct_simple(handle_regular_syscall(&submission_entry)) @@ -822,10 +826,22 @@ where construct(submission_entry.user_data64(), return_value) } -async fn handle_kernel_opcode( +fn handle_standard_opcode( + handle: Arc, + submission_entry: S, + opcode: StandardOpcode, + ctx: crate::syscall::scheme::Ctx, +) -> HandleStandardOpcodeFuture +where + S: GenericSubmissionEntry + Send + Sync + 'static, + C: GenericCompletionEntry + Send + Sync + 'static, +{ + async move { Err(Error::new(ENOSYS)) }//handle_standard_opcode_inner(handle, submission_entry, opcode, ctx) +} +async fn handle_kernel_opcode_inner( submission_entry: S, opcode: KernelOpcode, - ctx: &crate::syscall::scheme::Ctx, + ctx: crate::syscall::scheme::Ctx, ) -> Result where S: GenericSubmissionEntry, @@ -836,23 +852,33 @@ where _ => return Err(Error::new(ENOSYS)), } } +fn handle_kernel_opcode( + submission_entry: S, + opcode: KernelOpcode, + ctx: crate::syscall::scheme::Ctx, +) -> HandleKernelOpcodeFuture +where + S: GenericSubmissionEntry + Send + Sync + 'static, + C: GenericCompletionEntry + Send + Sync + 'static, +{ + async move { Err(Error::new(ENOSYS)) }//handle_kernel_opcode_inner(submission_entry, opcode, ctx) +} -async fn handle_submission_raw( +fn handle_submission( handle: Arc, submission_entry: S, ctx: crate::syscall::scheme::Ctx, -) -> Result +) -> Result> where S: GenericSubmissionEntry + fmt::Debug + Send + Sync + 'static, C: GenericCompletionEntry + Send + Sync + 'static, { - let handle = handle.as_ring_handle().ok_or(Error::new(EBADF))?; - let runtime_state = handle.consumer_state.r#try().ok_or(Error::new(EBADFD))?; - if let Some(standard_opcode) = StandardOpcode::from_raw(submission_entry.opcode()) { - handle_standard_opcode(runtime_state, submission_entry, standard_opcode, &ctx).await + todo!(); + //HandleSubmissionFuture::StandardOpcode(todo!()/*handle_standard_opcode(runtime_state, submission_entry, standard_opcode, &ctx)*/) } else if let Some(kernel_opcode) = KernelOpcode::from_raw(submission_entry.opcode()) { - handle_kernel_opcode(submission_entry, kernel_opcode, &ctx).await + todo!(); + //HandleSubmissionFuture::KernelOpcode(handle_kernel_opcode(submission_entry, kernel_opcode, &ctx)) } else { println!( "UNKNOWN OPCODE {}, full SQE: {:?}", @@ -862,19 +888,24 @@ where Err(Error::new(ENOSYS)) } } -fn handle_submission( - handle: Arc, - submission_entry: S, - ctx: crate::syscall::scheme::Ctx, -) -> HandleSubmissionFuture -where - S: GenericSubmissionEntry + fmt::Debug + Send + Sync + 'static, - C: GenericCompletionEntry + Send + Sync + 'static, -{ - handle_submission_raw(handle, submission_entry, ctx) + +pub type HandleStandardOpcodeFuture = impl Future> + Send + Sync + 'static; +pub type HandleKernelOpcodeFuture = impl Future> + Send + Sync + 'static; + +pub enum HandleSubmissionFuture { + //StandardOpcode(HandleStandardOpcodeFuture), + KernelOpcode(HandleKernelOpcodeFuture), + //S(S), + //C(C), } -pub type HandleSubmissionFuture = impl Future> + Send + Sync + 'static; +impl Future for HandleSubmissionFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + todo!() + } +} fn send_event( info: &RingInfo, @@ -1115,20 +1146,19 @@ fn register_event_interest(handle: &RingHandle, context: Arc>) - Ok(()) } -fn poll_or_insert_into_pending_map(queue: &Runqueue, task_guard: TaskGuard, context: &mut task::Context, tag: PendingTag) -> task::Poll> +fn poll_or_insert_into_pending_map(queue: &Runqueue, mut task_guard: TaskGuard, context: &mut task::Context, tag: PendingTag) -> task::Poll> where - S: 'static, - C: 'static, + S: 'static + Unpin, + C: 'static + Unpin, { - let task = task_guard - .as_mut() - .as_task_mut() - .expect("expected task guard passed into poll_or_insert_into_pending_map to contain a task"); - let task_ref = task_guard .task_ref() .expect("expected task guard to be occupied with a pending task"); + let task = task_guard + .as_mut() + .as_task_mut() + .expect("expected task guard passed into poll_or_insert_into_pending_map to contain a task"); match task.as_future_mut().poll(context) { task::Poll::Ready(cqe) => { @@ -1185,8 +1215,8 @@ unsafe fn handle_submission_entries( ctx: crate::syscall::scheme::Ctx, ) -> Result<()> where - S: GenericSubmissionEntry + fmt::Debug + Send + Sync + 'static, - C: GenericCompletionEntry + Send + Sync + 'static, + S: GenericSubmissionEntry + fmt::Debug + Send + Sync + 'static + Unpin, + C: GenericCompletionEntry + Send + Sync + 'static + Unpin, { // New tags only have to be created when a future returns pending, since only // then is the future getting involved in the pending map. The downside to this @@ -1209,7 +1239,7 @@ where }; let priority = entry.priority(); - let future = handle_submission(Arc::clone(handle), entry, ctx); + let future = handle_submission(Arc::clone(handle), entry, ctx)?; let task = Task { future, -- GitLab