Verified Commit 376e1ca0 authored by 4lDO2 🖖

Asyncify EventScheme, for reading.

parent f79df63b
......@@ -246,9 +246,14 @@ pub struct Context {
pub files: Arc<Mutex<Vec<Option<FileDescriptor>>>>,
/// A counter that is used to check whether there has been a signal requiring that syscalls be
/// restarted, when checking again for a possible signal. This typically happens when a
/// WaitQueue or WaitMap turns out to be empty twice, which might be the case when multiple
/// WaitConditions are waited for in parallel.
pub restarting_signal_epoch: AtomicUsize,
/// WaitQueue or WaitMap turns out to be empty twice, before and after a blocking read, which
/// happens when a context is spuriously awoken by a signal.
///
/// In the earlier single-pending-syscall-per-context model, the WaitCondition only needed to
/// check whether the wakeup was caused by a raw unblock or by the wait condition's own wake
/// function. That no longer necessarily holds when multiple WaitConditions are waited for in
/// parallel.
pub(crate) restarting_signal_epoch: AtomicUsize,
/// Signal actions
pub actions: Arc<Mutex<Vec<(SigAction, usize)>>>,
/// The pointer to the user-space registers, saved after certain
......
use alloc::sync::Arc;
use core::mem;
use syscall::flag::{PTRACE_FLAG_IGNORE, PTRACE_STOP_SIGNAL, SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGKILL, SIGSTOP, SIGTSTP, SIGTTIN, SIGTTOU};
use syscall::ptrace_event;
use core::sync::atomic;
use alloc::sync::Arc;
use crate::context::{contexts, switch, Status, WaitpidKey};
use crate::context::{Context, contexts, switch, Status, WaitpidKey};
use crate::start::usermode;
use crate::ptrace;
use crate::syscall::flag::{PTRACE_FLAG_IGNORE, PTRACE_STOP_SIGNAL, SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGKILL, SIGSTOP, SIGTSTP, SIGTTIN, SIGTTOU};
use crate::syscall::ptrace_event;
/// A newtype wrapper for "signal epochs", which lets a syscall handler fetch the current
/// signal epoch whenever it needs to block, regardless of whether it blocks directly
/// (WaitCondition::wait) or indirectly (asynchronously).
///
/// When a context is awoken, it will hopefully only continue to poll the syscalls that were
/// actually woken (that is, proper `task::Waker` usage); in any case the signal epoch can be
/// retrieved again. If it is larger than the previous epoch, a signal has occurred, and unless
/// the syscall can continue to make progress, it will fail with EINTR.
// TODO: Currently there is no good way to handle signal epoch overflow. Overflow is extremely
// unlikely on 64-bit targets, where roughly 2^64 (about 18 quintillion) signals would have to
// occur first, but it should still be handled, especially on 32-bit platforms where that limit
// can realistically be reached. The most viable solution may be to force everything to complete
// before restarting every future/syscall, or to force the inner value to be 64-bit, if AtomicU64
// exists.
#[derive(Clone, Copy, Debug, Eq, Ord, Hash, PartialEq, PartialOrd)]
pub struct SignalEpoch {
inner: usize,
}
impl Context {
pub fn current_signal_epoch(&self) -> SignalEpoch {
SignalEpoch { inner: self.restarting_signal_epoch.load(atomic::Ordering::Acquire) }
}
}
pub fn is_user_handled(handler: Option<extern "C" fn(usize)>) -> bool {
let handler = handler.map(|ptr| ptr as usize).unwrap_or(0);
handler != SIG_DFL && handler != SIG_IGN
......
use alloc::sync::Arc;
use alloc::collections::BTreeMap;
use core::borrow::Borrow;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task;
use alloc::sync::Arc;
use alloc::collections::BTreeMap;
use spin::{Once, RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::context::{self, Context};
use crate::context::signal::SignalEpoch;
use crate::scheme::{self, SchemeId};
use crate::sync::WaitQueue;
use crate::syscall::data::Event;
use crate::syscall::error::{Error, Result, EBADF, EINTR, ESRCH};
use crate::syscall::flag::EventFlags;
......@@ -36,6 +41,12 @@ impl EventQueue {
let blocking = false;
self.queue.receive_into_sync(events, blocking, "EventQueue::read_nonblocking").ok_or(Error::new(EINTR))
}
pub fn poll_read(&self, events: &mut [Event], signal_epoch: SignalEpoch, cx: &mut task::Context<'_>) -> task::Poll<Option<usize>> {
const FAIL_ON_SIGNAL: bool = true;
self.queue
.poll_receive_into(events, FAIL_ON_SIGNAL, signal_epoch, cx)
}
pub fn write<I, T>(&self, events: I) -> Result<usize>
where
......
use core::ptr::NonNull;
use core::{mem, slice, task};
use alloc::sync::Arc;
use core::{mem, slice};
use crate::event::{EventQueue, EventQueueId, next_queue_id, queues, queues_mut};
use crate::syscall::data::Event;
use crate::syscall::error::*;
use crate::syscall::scheme::{async_scheme::AsyncScheme, Scheme};
use crate::context;
use crate::syscall::data::{Event, IoSlice, IoSliceMut, IoVec, Packet};
use crate::syscall::error::{Error, Result};
use crate::syscall::error::{EBADF, EINTR, EOVERFLOW, ESRCH};
use crate::syscall::number::{SYS_READ, SYS_READV};
use crate::syscall::scheme::{ConsumingSliceIter, Scheme};
use crate::syscall::scheme::async_scheme::AsyncScheme;
pub struct EventScheme;
......@@ -17,6 +24,9 @@ impl Scheme for EventScheme {
}
fn read(&self, id: usize, buf: &mut [u8]) -> Result<usize> {
self.readv(id, ConsumingSliceIter::single(&mut IoSliceMut::new(buf)))
}
fn readv(&self, id: usize, bufs: ConsumingSliceIter<IoSliceMut>) -> Result<usize> {
let id = EventQueueId::from(id);
let queue = {
......@@ -25,52 +35,168 @@ impl Scheme for EventScheme {
handle.clone()
};
let event_buf = unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr() as *mut Event, buf.len()/mem::size_of::<Event>()) };
Ok(queue.read_sync(event_buf)? * mem::size_of::<Event>())
let mut total_bytes_read = 0usize;
for mut buf in bufs {
// TODO: Use some crate like zerocopy that allows casting slices of different types.
let event_buf = unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr() as *mut Event, buf.len() / mem::size_of::<Event>()) };
let bytes_read = queue.read_sync(event_buf)? * mem::size_of::<Event>();
total_bytes_read = total_bytes_read
.checked_add(bytes_read)
.ok_or(Error::new(EOVERFLOW))?;
}
Ok(total_bytes_read)
}
fn write(&self, id: usize, buf: &[u8]) -> Result<usize> {
self.writev(id, &[IoSlice::new(buf)])
}
fn writev(&self, id: usize, bufs: &[IoSlice]) -> Result<usize> {
let id = EventQueueId::from(id);
let queue = {
let handles = queues();
let handle = handles.get(&id).ok_or(Error::new(EBADF))?;
handle.clone()
Arc::clone(&handle)
};
let event_buf = unsafe { slice::from_raw_parts(buf.as_ptr() as *const Event, buf.len()/mem::size_of::<Event>()) };
Ok(queue.write(event_buf)? * mem::size_of::<Event>())
let mut total_bytes_written = 0usize;
for buf in bufs.iter() {
let event_buf = unsafe { slice::from_raw_parts(buf.as_ptr() as *const Event, buf.len() / mem::size_of::<Event>()) };
// TODO: Limit the length of the queues in some way, by allocating a small amount of
// memory initially, and then growing like a vec, up to a certain limit. Writing would
// then block if the event queue is full, constructing a very simple congestion control
// mechanism.
// TODO: `core::mem::drop(core::mem::replace(&mut event_queues, io_uring))`, right?
let bytes_written = queue.write(event_buf)? * mem::size_of::<Event>();
total_bytes_written = total_bytes_written
.checked_add(bytes_written)
.ok_or(Error::new(EOVERFLOW))?;
}
Ok(total_bytes_written)
}
fn fcntl(&self, id: usize, _cmd: usize, _arg: usize) -> Result<usize> {
let id = EventQueueId::from(id);
let handles = queues();
handles.get(&id).ok_or(Error::new(EBADF)).and(Ok(0))
if ! handles.contains_key(&id) {
return Err(Error::new(EBADF));
}
Ok(0)
}
fn fpath(&self, _id: usize, buf: &mut [u8]) -> Result<usize> {
let mut i = 0;
let scheme_path = b"event:";
while i < buf.len() && i < scheme_path.len() {
buf[i] = scheme_path[i];
i += 1;
}
Ok(i)
const SCHEME_PATH: &'static [u8] = b"event:";
let bytes_to_copy = core::cmp::min(buf.len(), SCHEME_PATH.len());
buf[..bytes_to_copy].copy_from_slice(&SCHEME_PATH[..bytes_to_copy]);
Ok(bytes_to_copy)
}
fn fsync(&self, id: usize) -> Result<usize> {
let id = EventQueueId::from(id);
let handles = queues();
handles.get(&id).ok_or(Error::new(EBADF)).and(Ok(0))
if ! handles.contains_key(&id) {
return Err(Error::new(EBADF));
}
Ok(0)
}
fn close(&self, id: usize) -> Result<usize> {
let id = EventQueueId::from(id);
queues_mut().remove(&id).ok_or(Error::new(EBADF)).and(Ok(0))
let _ = queues_mut().remove(&id).ok_or(Error::new(EBADF))?;
Ok(0)
}
}
impl EventScheme {
fn poll_readv(&self, id: usize, slices: ConsumingSliceIter<IoSliceMut>, cx: &mut task::Context<'_>) -> task::Poll<Result<usize>> {
let id = EventQueueId::from(id);
let queue = {
let handles = queues();
let handle = handles.get(&id).ok_or(Error::new(EBADF))?;
Arc::clone(&handle)
};
let current_signal_epoch = {
let contexts = context::contexts();
let context_lock = contexts.current().ok_or(Error::new(ESRCH))?;
let context = context_lock.read();
context.current_signal_epoch()
};
let mut total_bytes_read = 0usize;
let mut empty = true;
let mut signal = false;
for mut slice in slices {
if !slice.is_empty() { empty = false; }
let event_buf = unsafe { slice::from_raw_parts_mut(slice.as_mut_ptr() as *mut Event, slice.len() / mem::size_of::<Event>()) };
match queue.poll_read(event_buf, current_signal_epoch, cx) {
task::Poll::Ready(Some(bytes_read)) => total_bytes_read = total_bytes_read.checked_add(bytes_read).ok_or(Error::new(EOVERFLOW))?,
task::Poll::Ready(None) => {
signal = true;
break;
},
task::Poll::Pending => break,
}
}
if total_bytes_read != 0 || empty {
task::Poll::Ready(Ok(total_bytes_read))
} else if signal {
task::Poll::Ready(Err(Error::new(EINTR)))
} else {
task::Poll::Pending
}
}
}
impl AsyncScheme for EventScheme {
// TODO
unsafe fn poll_handle(&self, packet: &mut Packet, cx: &mut task::Context) -> task::Poll<()> {
match packet.a {
SYS_READ => {
let mut single = IoSliceMut::new(slice::from_raw_parts_mut(packet.c as *mut u8, packet.d));
let slices = ConsumingSliceIter::single(&mut single);
let id = packet.b;
let result = match self.poll_readv(id, slices, cx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => return task::Poll::Pending,
};
packet.a = Error::mux(result);
task::Poll::Ready(())
}
SYS_READV => {
debug_assert!(!(packet.c as *const IoVec).is_null());
let slices = ConsumingSliceIter::new(NonNull::new_unchecked(packet.c as *mut IoVec), packet.d).as_ioslices_mut();
let id = packet.b;
let result = match self.poll_readv(id, slices, cx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => return task::Poll::Pending,
};
packet.a = Error::mux(result);
task::Poll::Ready(())
}
// Only read and readv (and, in the future, write and writev) are blocking; all of
// the other syscalls that `event:` implements can simply use the regular handler.
_ => {
self.handle(packet);
return task::Poll::Ready(());
}
}
}
}
......@@ -106,7 +106,8 @@ impl UserInner {
let contexts = context::contexts();
let context_lock = contexts.current().ok_or(Error::new(ESRCH))?;
let context = context_lock.read();
context.restarting_signal_epoch.load(Ordering::Acquire)
context.current_signal_epoch()
};
match self.done.poll_receive(&syscall_id, fail_on_signal, current_signal_epoch, cx) {
......@@ -212,11 +213,13 @@ impl UserInner {
let count_opt = if block {
if let Some(cx_unwrapped) = cx.take() {
let current_signal_epoch = {
let contexts = context::contexts();
let context_lock = contexts.current().ok_or(Error::new(ESRCH))?;
let context = context_lock.read();
context.restarting_signal_epoch.load(Ordering::Acquire)
context.current_signal_epoch()
};
match self.todo.poll_receive_into(packet_buf, fail_on_signal, current_signal_epoch, cx_unwrapped) {
......
use core::future::Future;
use core::num::NonZeroUsize;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::{mem, task};
use alloc::sync::{Arc, Weak};
......@@ -10,11 +6,12 @@ use alloc::vec::Vec;
use spin::{Mutex, MutexGuard, RwLock};
use crate::context::{self, Context};
use crate::context::signal::SignalEpoch;
#[derive(Debug)]
struct WakerWrapper {
waker: task::Waker,
signal_epoch: usize,
signal_epoch: SignalEpoch,
}
#[derive(Debug)]
......@@ -30,10 +27,10 @@ impl WaitCondition {
}
pub fn register_context_interest(&self, context: &Arc<RwLock<Context>>) {
let waker = waker(&context);
let signal_epoch = context.read().restarting_signal_epoch.load(Ordering::Acquire);
let signal_epoch = context.read().current_signal_epoch();
self.add_waker(&waker, signal_epoch)
}
pub fn add_waker(&self, waker: &task::Waker, signal_epoch: usize) {
pub fn add_waker(&self, waker: &task::Waker, signal_epoch: SignalEpoch) {
let mut wakers_guard = self.wakers.lock();
if let Some(i) = wakers_guard.iter().position(|wrapper| wrapper.waker.will_wake(&waker)) {
......@@ -52,7 +49,7 @@ impl WaitCondition {
/// [`add_waker`] or [`register_context_interest`]. The wait condition will internally try to
/// find a registered waker matching the specified waker and context, and compare the epoch
/// from when it was registered with the current epoch from `current_context`.
pub fn signal_has_occurred(&self, waker: &task::Waker, signal_epoch: usize) -> bool {
pub fn signal_has_occurred(&self, waker: &task::Waker, signal_epoch: SignalEpoch) -> bool {
let mut signal = false;
{
......@@ -117,7 +114,7 @@ impl WaitCondition {
unsafe { context::switch(); }
let current_signal_epoch = current_context.read().restarting_signal_epoch.load(Ordering::Acquire);
let current_signal_epoch = current_context.read().current_signal_epoch();
!self.signal_has_occurred(&current_waker, current_signal_epoch)
}
}
......
use core::sync::atomic::Ordering;
use core::{mem, task};
use alloc::collections::BTreeMap;
use alloc::sync::Arc;
use spin::{Mutex, RwLock};
use spin::Mutex;
use crate::context::Context;
use crate::context::signal::SignalEpoch;
use crate::sync::WaitCondition;
#[derive(Debug)]
......@@ -38,7 +36,7 @@ impl<K, V> WaitMap<K, V> where K: Clone + Ord {
}
}
}
pub fn poll_receive(&self, key: &K, fail_on_signal: bool, current_signal_epoch: usize, cx: &mut task::Context<'_>) -> task::Poll<Option<V>> {
pub fn poll_receive(&self, key: &K, fail_on_signal: bool, current_signal_epoch: SignalEpoch, cx: &mut task::Context<'_>) -> task::Poll<Option<V>> {
if let Some(value) = self.inner.lock().remove(key) {
task::Poll::Ready(Some(value))
} else {
......
use core::sync::atomic::Ordering;
use core::task;
use alloc::collections::VecDeque;
......@@ -7,6 +6,7 @@ use alloc::sync::Arc;
use spin::{Mutex, RwLock};
use crate::context::Context;
use crate::context::signal::SignalEpoch;
use crate::sync::WaitCondition;
#[derive(Debug)]
......@@ -48,7 +48,7 @@ impl<T> WaitQueue<T> {
}
}
}
pub fn poll_receive(&self, fail_on_signal: bool, current_signal_epoch: usize, cx: &mut task::Context<'_>) -> task::Poll<Option<T>> {
pub fn poll_receive(&self, fail_on_signal: bool, current_signal_epoch: SignalEpoch, cx: &mut task::Context<'_>) -> task::Poll<Option<T>> {
match self.inner.lock().pop_front() {
Some(t) => task::Poll::Ready(Some(t)),
None => {
......@@ -85,7 +85,7 @@ impl<T> WaitQueue<T> {
self.receive_into_raw(&mut i, buf);
Some(i)
}
pub fn poll_receive_into(&self, buf: &mut [T], fail_on_signal: bool, current_signal_epoch: usize, cx: &mut task::Context<'_>) -> task::Poll<Option<usize>> {
pub fn poll_receive_into(&self, buf: &mut [T], fail_on_signal: bool, current_signal_epoch: SignalEpoch, cx: &mut task::Context<'_>) -> task::Poll<Option<usize>> {
let mut i = 0usize;
if buf.is_empty() { return task::Poll::Ready(Some(0)) }
......
Subproject commit 6b80756dcd3616de964b5e26fa4029a95247215d
Subproject commit ba68b2be264b98461ecaac83abdeb45117f8827c