mod.rs 51.8 KB
Newer Older
1
use core::convert::{TryFrom, TryInto};
4lDO2's avatar
4lDO2 committed
2
use core::future::Future;
3
use core::intrinsics::unlikely;
4lDO2's avatar
4lDO2 committed
4
use core::pin::Pin;
4lDO2's avatar
4lDO2 committed
5
use core::ptr::NonNull;
4lDO2's avatar
4lDO2 committed
6 7
use core::sync::atomic::Ordering;
use core::{fmt, iter, mem, ptr, task};
4lDO2's avatar
4lDO2 committed
8

9
use alloc::sync::{Arc, Weak};
4lDO2's avatar
4lDO2 committed
10

4lDO2's avatar
4lDO2 committed
11
use either::Either;
4lDO2's avatar
4lDO2 committed
12
use spin::RwLock;
4lDO2's avatar
4lDO2 committed
13

4lDO2's avatar
4lDO2 committed
14
use crate::syscall::io_uring::v1::{
4lDO2's avatar
4lDO2 committed
15 16
    operation::*, CqEntry32, CqEntry64, GenericCompletionEntry,
    GenericSubmissionEntry, GenericSubmissionEntryExt as _, IoUringCqeFlags,
17
    IoUringSqeFlags, KernelOpcode, Ring, RingPushError, SqEntry32, SqEntry64, StandardOpcode,
4lDO2's avatar
4lDO2 committed
18
};
4lDO2's avatar
4lDO2 committed
19
use crate::syscall::io_uring::IoUringEnterFlags;
4lDO2's avatar
4lDO2 committed
20

21
use crate::syscall::data::{Event, IoVec, Map2};
4lDO2's avatar
4lDO2 committed
22
use crate::syscall::error::{Error, Result};
4lDO2's avatar
4lDO2 committed
23
use crate::syscall::flag::{EventFlags, MapFlags};
4lDO2's avatar
4lDO2 committed
24 25

use syscall::error::{
4lDO2's avatar
4lDO2 committed
26
    EBADF, EBADFD, EFAULT, EINVAL, ENODEV, ENOMEM, ENOSPC, ENOSYS, EOVERFLOW, ESHUTDOWN, ESRCH,
4lDO2's avatar
4lDO2 committed
27
};
4lDO2's avatar
4lDO2 committed
28

4lDO2's avatar
4lDO2 committed
29 30 31
pub mod handle;
pub mod scheme;

4lDO2's avatar
4lDO2 committed
32 33
use self::handle::{PendingTag, RingHandle, RingHandleConsumerState, Runqueue, SecondaryRingRef, Task, TaskGuard, TaskRef, TaskSlot};
use self::scheme::{Handle, get_handle, get_handle_and_local, IoUringScheme};
4lDO2's avatar
4lDO2 committed
34

35
use crate::context::{self, Context};
4lDO2's avatar
4lDO2 committed
36
use crate::event;
4lDO2's avatar
4lDO2 committed
37 38
use crate::memory::{self, Frame, PhysicalAddress};
use crate::paging::entry::EntryFlags as PtEntryFlags;
4lDO2's avatar
4lDO2 committed
39
use crate::paging::{ActivePageTable, Page, VirtualAddress};
4lDO2's avatar
4lDO2 committed
40
use crate::scheme::{ContextOrKernel, SchemeInfo};
41
use crate::syscall::scheme::async_scheme::{AsyncScheme, AsyncSchemeExt as _};
42 43 44 45
use crate::syscall::scheme::iter as consuming_iter;
use crate::syscall::{
    validate_ioslices, validate_ioslices_mut, validate_slice, validate_slice_mut,
};
4lDO2's avatar
4lDO2 committed
46
use crate::FileHandle;
4lDO2's avatar
4lDO2 committed
47

4lDO2's avatar
4lDO2 committed
48
/// Locations (physical and kernel-virtual) of one io_uring ring: a one-page
/// ring header plus a separately-allocated array of entries of type `T`.
/// Both mappings are created by `init_ring`.
pub struct RingInfo<T> {
    // Physical address of the one-page ring header frame.
    ring_physaddr: PhysicalAddress,
    // Kernel-virtual pointer to the ring header (physical address + KERNEL_OFFSET).
    ring: NonNull<Ring<T>>,
    // Physical address of the first frame of the entries array.
    entries_physaddr: PhysicalAddress,
    // Kernel-virtual pointer to the start of the entries array.
    entries: NonNull<T>,
}
54 55 56 57 58 59 60 61
impl<T> RingInfo<T> {
    /// Borrows the ring header.
    pub fn ring(&self) -> &Ring<T> {
        // SAFETY: `self.ring` points at the kernel-mapped ring header page set up
        // when this RingInfo was created, and the mapping outlives `self`.
        unsafe { self.ring.as_ref() }
    }
    /// Returns an iterator that pops entries off the ring until popping fails
    /// (i.e. the ring is drained).
    ///
    /// # Safety
    /// Judging by `pop_front_spsc`, this is a single-producer/single-consumer
    /// pop; the caller must be the sole consumer of this ring while the
    /// iterator is in use, or entries may be raced for. TODO confirm exact
    /// contract against `Ring::pop_front_spsc`.
    pub unsafe fn read_try_iter(&self) -> impl Iterator<Item = T> + '_ {
        core::iter::from_fn(move || self.ring().pop_front_spsc(self.entries.as_ptr()).ok())
    }
}
4lDO2's avatar
4lDO2 committed
62 63
impl<T> fmt::Debug for RingInfo<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 65 66 67 68 69 70 71
        f.debug_struct("RingInfo")
            .field("ring_physaddr", &self.ring_physaddr)
            .field("ring", &self.ring)
            .field("entries_physaddr", &self.entries_physaddr)
            .field("entries", &self.entries)
            .finish()
    }
}
4lDO2's avatar
4lDO2 committed
72 73
// SAFETY: `RingInfo` only holds raw pointers into kernel-mapped ring memory, so
// sending/sharing it across threads is delegated to the entry type: it is Send/Sync
// exactly when `T` is. NOTE(review): this also presumes the ring mappings stay valid
// for the struct's lifetime — confirm against the mapping code in `init_ring`.
unsafe impl<T: Send> Send for RingInfo<T> {}
unsafe impl<T: Sync> Sync for RingInfo<T> {}
4lDO2's avatar
4lDO2 committed
74 75


4lDO2's avatar
4lDO2 committed
76 77 78 79 80 81 82 83 84 85 86
/// Allocates and zero-initializes one io_uring ring: a single-frame ring header
/// plus enough frames for `entry_count` entries of type `T`, both mapped into
/// kernel space at `physaddr + KERNEL_OFFSET`.
///
/// # Errors
/// Returns `ENOMEM` if frame allocation fails or if the target virtual address
/// turns out to already be mapped.
///
/// # Panics
/// Asserts that both `Ring<T>` and `T` fit within (and align to at most) one page.
///
/// NOTE(review): on the `ENOMEM` early returns below, the already-allocated
/// `ring_frame`/`entries_frame` do not appear to be freed or unmapped — possible
/// frame leak on the error paths; confirm whether frames are reclaimed elsewhere.
fn init_ring<T>(active_table: &mut ActivePageTable, entry_count: usize) -> Result<RingInfo<T>> {
    assert!(mem::align_of::<Ring<T>>() <= memory::PAGE_SIZE);
    assert!(mem::size_of::<Ring<T>>() <= memory::PAGE_SIZE);
    assert!(mem::align_of::<T>() <= memory::PAGE_SIZE);
    assert!(mem::size_of::<T>() <= memory::PAGE_SIZE);

    // the ring header will always be exactly one page in size.
    let ring_frame = memory::allocate_frames(1).ok_or(Error::new(ENOMEM))?;
    let ring_physaddr = ring_frame.start_address();

    let entries_size_in_bytes = entry_count * mem::size_of::<T>();
    // Round the byte count up to whole frames.
    let entries_size_in_frames =
        (entries_size_in_bytes + memory::PAGE_SIZE - 1) / memory::PAGE_SIZE;
    let entries_frame =
        memory::allocate_frames(entries_size_in_frames).ok_or(Error::new(ENOMEM))?;
    let entries_physaddr = entries_frame.start_address();

    let ring = unsafe {
        // The header is reachable at the physmap offset.
        let ring_virtaddr = VirtualAddress::new(ring_physaddr.get() + crate::KERNEL_OFFSET);
        // Refuse to clobber an existing mapping.
        if let Some(already_mapped) = active_table.translate(ring_virtaddr) {
            println!(
                "virtual ring address {:p} was already mapped to {:p}",
                ring_virtaddr.get() as *const u8,
                already_mapped.get() as *const u8
            );
            return Err(Error::new(ENOMEM));
        }
        {
            let ring_mapping = active_table.map_to(
                Page::containing_address(ring_virtaddr),
                ring_frame,
                PtEntryFlags::PRESENT | PtEntryFlags::WRITABLE | PtEntryFlags::NO_EXECUTE,
            );
            ring_mapping.flush(active_table);
        }

        let ring_ptr_mut = ring_virtaddr.get() as *mut Ring<T>;
        // Zero the entire header page before use.
        ptr::write_bytes::<u8>(ring_ptr_mut as *mut u8, 0, memory::PAGE_SIZE);

        let ring_mut = &mut *ring_ptr_mut;

        ring_mut.size = entry_count;
        *ring_mut.sts.get_mut() = 0;

        NonNull::new(ring_ptr_mut)
            .expect("somehow a page map with an offset above KERNEL_OFFSET resulted in NULL")
    };

    let entries = unsafe {
        let entries_virtaddr = VirtualAddress::new(entries_physaddr.get() + crate::KERNEL_OFFSET);

        // Map the entries array frame by frame, with the same collision check
        // as for the header page.
        for frame_offset in 0..entries_size_in_frames {
            // Note: despite the `_physaddr` suffix, this is a `Frame`.
            let entries_part_physaddr = Frame::containing_address(PhysicalAddress::new(
                entries_physaddr.get() + frame_offset * memory::PAGE_SIZE,
            ));
            let entries_part_virtaddr = VirtualAddress::new(
                entries_part_physaddr.start_address().get() + crate::KERNEL_OFFSET,
            );

            if let Some(already_mapped) = active_table.translate(entries_part_virtaddr) {
                println!(
                    "virtual ring entries address {:p} was already mapped to {:p}",
                    entries_part_virtaddr.get() as *const u8,
                    already_mapped.get() as *const u8
                );
                return Err(Error::new(ENOMEM));
            }

            let entries_mapping = active_table.map_to(
                Page::containing_address(entries_part_virtaddr),
                entries_part_physaddr,
                PtEntryFlags::PRESENT | PtEntryFlags::WRITABLE | PtEntryFlags::NO_EXECUTE,
            );
            entries_mapping.flush(active_table);
        }

        let entries_mut_ptr = entries_virtaddr.get() as *mut T;
        // Zero every mapped frame, including any slack past entry_count * size_of::<T>().
        ptr::write_bytes::<u8>(
            entries_mut_ptr as *mut u8,
            0,
            entries_size_in_frames * memory::PAGE_SIZE,
        );

        NonNull::new(entries_mut_ptr)
            .expect("somehow a page map with an offset above KERNEL_OFFSET resulted in NULL")
    };

    Ok(RingInfo {
        ring_physaddr,
        entries_physaddr,

        ring,
        entries,
    })
}

172
pub fn attach_to_kernel(ringfd: FileHandle) -> Result<usize> {
173
    let handle = get_handle(ringfd)?;
4lDO2's avatar
4lDO2 committed
174 175
    let ring_handle = handle.as_ring_handle().ok_or(Error::new(EBADF))?;
    ring_handle.transition_into_attached_state(ContextOrKernel::Kernel);
176

177 178
    Ok(0)
}
179

180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
/// The (status, extra, flags) triple written back into a completion entry.
struct ReturnValue {
    status: u64,
    extra: u64,
    flags: u64, // TODO: should flags be stored elsewhere?
}
impl ReturnValue {
    /// An all-zero return value.
    fn zero() -> Self {
        Self {
            status: 0,
            extra: 0,
            flags: 0,
        }
    }
    /// A return value carrying only a status code; `extra` and `flags` are zero.
    fn from_status(status: u64) -> Self {
        Self {
            status,
            extra: 0,
            flags: 0,
        }
    }
}
4lDO2's avatar
4lDO2 committed
200
async fn handle_open<S>(submission_entry: &S) -> Result<FileHandle>
201
where
202
    S: GenericSubmissionEntry,
203
{
204 205 206 207
    let path = validate_slice(
        submission_entry.addr64() as usize as *const u8,
        submission_entry.len64() as usize,
    )?;
208
    let flags = submission_entry.offset();
209 210 211 212
    println!(
        "IO_URING OPEN PATH {}",
        alloc::string::String::from_utf8_lossy(path).into_owned()
    );
213

4lDO2's avatar
4lDO2 committed
214
    crate::syscall::fs::open(path, flags.try_into().or(Err(Error::new(EINVAL)))?).await
215 216
}

217 218
async fn handle_close<S>(
    submission_entry: &S,
4lDO2's avatar
4lDO2 committed
219 220
    _invoker_ctx: &crate::syscall::scheme::Ctx,
) -> Result<(), (syscall::Error, u64)>
221 222 223
where
    S: GenericSubmissionEntry,
{
4lDO2's avatar
4lDO2 committed
224 225
    let flags =
        CloseFlags::from_bits(submission_entry.syscall_flags()).ok_or((Error::new(EINVAL), 0))?;
226 227

    let fd_range = if flags.contains(CloseFlags::CLOSE_MANY) {
228 229 230 231 232
        println!(
            "IO_URING CLOSE RANGE FROM {} LEN {}",
            submission_entry.fd64(),
            submission_entry.len64()
        );
233 234
        submission_entry.fd64()..submission_entry.fd64() + submission_entry.len64()
    } else {
235
        println!("IO_URING CLOSE SINGLE {}", submission_entry.fd64());
236
        submission_entry.fd64()..submission_entry.fd64() + 1
237 238
    };

239 240 241
    let mut successful_closes = 0;

    for fd in fd_range {
4lDO2's avatar
4lDO2 committed
242 243 244 245
        match crate::syscall::fs::close(FileHandle::from(fd.try_into().or(Err((Error::new(EBADF), successful_closes)))?)).await {
            Ok(_) => successful_closes += 1,
            Err(error) => return Err((error, successful_closes)),
        }
246 247
    }

4lDO2's avatar
4lDO2 committed
248
    Ok(())
249
}
250
/// Common plumbing for the read/write opcodes.
///
/// Decodes the (vectored, change_offset) pair via `flags_extract`, validates the
/// raw SQE fields, resolves the caller's fd to a (scheme, local fd) pair, and then
/// defers the actual I/O to the `proc` closure, awaiting the future it returns.
///
/// # Errors
/// `EFAULT`/`EOVERFLOW`/`EBADF` for untranslatable SQE fields, `ESRCH` if there is
/// no current context, `EBADF` for an unknown fd, `ENODEV` for a dead scheme, and
/// `ENOSYS` if the scheme is backed by a userspace context (only kernel schemes
/// are supported here).
async fn handle_rw<S, Fe, P, Fut>(submission_entry: &S, flags_extract: Fe, proc: P) -> Result<usize>
where
    S: GenericSubmissionEntry,
    Fe: FnOnce(u32) -> Result<(bool, bool)>,
    P: FnOnce(
        (bool, bool),
        usize,
        usize,
        usize,
        u64,
        Arc<SchemeInfo<dyn AsyncScheme + Send + Sync>>,
    ) -> Result<Fut>,
    Fut: Future<Output = Result<usize>>,
{
    let (vectored, change_offset) = flags_extract(submission_entry.syscall_flags())?;

    let addr = usize::try_from(submission_entry.addr64()).or(Err(Error::new(EFAULT)))?;
    let len = usize::try_from(submission_entry.len64()).or(Err(Error::new(EOVERFLOW)))?;
    let fd = usize::try_from(submission_entry.fd64()).or(Err(Error::new(EBADF)))?;
    let offset = submission_entry.offset();

    // Resolve the caller-visible fd to its scheme id and scheme-local number,
    // holding the context read lock only for the duration of the lookup.
    let (scheme_id, local_fd, desc_lock) = {
        let contexts = context::contexts();
        let context_lock = Arc::clone(contexts.current().ok_or(Error::new(ESRCH))?);
        let context = context_lock.read();

        let desc_lock = Arc::clone(
            &context
                .get_file(FileHandle::from(fd))
                .ok_or(Error::new(EBADF))?
                .description,
        );
        let (scheme, number) = {
            let desc = desc_lock.read();
            (desc.scheme, desc.number)
        };
        // NOTE(review): `desc_lock` is returned in the tuple but never used again
        // below — presumably to keep the description alive across the await; confirm.
        (scheme, number, desc_lock)
    };

    let scheme = {
        let schemes = crate::scheme::schemes();
        let scheme = schemes.get(scheme_id).ok_or(Error::new(ENODEV))?;
        Arc::clone(scheme)
    };

    // Userspace-backed schemes are not supported through this path yet.
    match scheme.context {
        ContextOrKernel::Kernel => (),
        ContextOrKernel::Context(_) => return Err(Error::new(ENOSYS)),
    }

    proc(
        (vectored, change_offset),
        addr,
        len,
        local_fd,
        offset,
        scheme,
    )?
    .await
}
310 311 312 313 314 315

/// Returns whether a kernel-to-scheme consumer io_uring could be obtained (or
/// lazily initialized) for the given context. A dead (unupgradable) context
/// yields `false`.
fn try_to_init_or_get_scheme_ioring(
    context_weak: &Weak<RwLock<Context>>,
    scheme: &Arc<SchemeInfo<dyn AsyncScheme + Send + Sync>>,
) -> bool {
    context_weak.upgrade().map_or(false, |context_arc| {
        context_arc
            .read()
            .kernel_consumer_ioring(&context_arc, scheme)
            .is_ok()
    })
}

/// Handles the `Read` opcode: decodes `ReadFlags` (vectored / change-offset) and
/// delegates to `handle_rw`, with `handle_read_rw_with_schemes` doing the actual
/// scheme call. The byte count is widened to u64 for the completion entry.
async fn handle_read<S>(
    submission_entry: &S,
    invoker_ctx: &crate::syscall::scheme::Ctx,
) -> Result<u64>
where
    S: GenericSubmissionEntry,
{
    // Maps raw syscall flags to the (vectored, change_offset) pair handle_rw expects.
    fn flags_extract(syscall_flags: u32) -> Result<(bool, bool)> {
        let flags = ReadFlags::from_bits(syscall_flags).ok_or(Error::new(EINVAL))?;
        let vectored = flags.contains(ReadFlags::VECTORED);
        let change_offset = flags.contains(ReadFlags::CHANGE_OFFSET);

        Ok((vectored, change_offset))
    }
    let fut = handle_rw(
        submission_entry,
        flags_extract,
        move |(vectored, change_offset), addr, len, local_fd, offset, scheme| {
            handle_read_rw_with_schemes(
                (vectored, change_offset),
                addr,
                len,
                local_fd,
                offset,
                scheme,
                invoker_ctx,
            )
        },
    );

    fut.await.map(|bytes_read| bytes_read as u64) // TODO: Truncate bytes_read if necessary
}
/// Performs the read against the scheme, dispatching on the four combinations of
/// (vectored, change_offset): readv / preadv / read / pread.
///
/// The buffers are validated eagerly (before the returned future runs); for the
/// vectored case the iovec list is re-wrapped in a consuming iterator, stored as
/// `Some(..)` so exactly one of the two vectored branches can `take()` it.
fn handle_read_rw_with_schemes(
    (vectored, change_offset): (bool, bool),
    addr: usize,
    len: usize,
    local_fd: usize,
    offset: u64,
    scheme: Arc<SchemeInfo<dyn AsyncScheme + Send + Sync>>,
    invoker_ctx: &crate::syscall::scheme::Ctx,
) -> Result<impl Future<Output = Result<usize>> + '_> {
    let mut either = if vectored {
        // Validate the iovec list up front; the consuming iterator below re-reads it.
        let _ = validate_ioslices_mut(addr as *const IoVec, len)?;
        unsafe { Either::Right(Some(consuming_iter(addr as *const IoVec, len))) }
    } else {
        Either::Left(validate_slice_mut(addr as *mut u8, len)?)
    };

    Ok(async move {
        match either {
            Either::Right(ref mut slices) => {
                if change_offset {
                    // change_offset == true: plain readv at the current file offset.
                    let slices = slices.take().unwrap();
                    println!(
                        "IO_URING READV LOCAL_FD {}, SLICES AT {:p} LEN {}",
                        local_fd, slices, len
                    );
                    unsafe {
                        scheme.async_readv(
                            invoker_ctx,
                            local_fd,
                            slices.as_ioslices_mut().remaining_as_mut(),
                        )
                    }
                    .await
                } else {
                    // Positioned readv; `offset` must fit the scheme's offset type.
                    let slices = slices.take().unwrap();
                    println!(
                        "IO_URING PREADV LOCAL_FD {}, SLICES AT {:p} LEN {} OFFSET {}",
                        local_fd, addr as *const IoVec, len, offset
                    );
                    unsafe {
                        scheme.async_preadv(
                            invoker_ctx,
                            local_fd,
                            slices.as_ioslices_mut().remaining_as_mut(),
                            offset.try_into().or(Err(Error::new(EOVERFLOW)))?,
                        )
                    }
                    .await
                }
            }
            Either::Left(ref mut slice) => {
                if change_offset {
                    println!(
                        "IO_URING READ LOCAL_FD {}, SLICE AT {:p} LEN {}",
                        local_fd,
                        slice.as_ptr(),
                        slice.len()
                    );
                    scheme.async_read(invoker_ctx, local_fd, slice).await
                } else {
                    println!(
                        "IO_URING PREAD LOCAL_FD {}, SLICE AT {:p} LEN {} OFFSET {}",
                        local_fd,
                        slice.as_ptr(),
                        slice.len(),
                        offset
                    );
                    scheme
                        .async_pread(
                            invoker_ctx,
                            local_fd,
                            slice,
                            offset.try_into().or(Err(Error::new(EOVERFLOW)))?,
                        )
                        .await
                }
            }
        }
    })
}
437 438 439 440
/// Handles the `Write` opcode: decodes `WriteFlags` and delegates to `handle_rw`.
/// If the target scheme is userspace-backed and its consumer io_uring can be set
/// up, the write is routed through that ring; otherwise through the regular
/// scheme call path.
async fn handle_write<S>(
    submission_entry: &S,
    invoker_ctx: &crate::syscall::scheme::Ctx,
) -> Result<u64>
where
    S: GenericSubmissionEntry,
{
    // Maps raw syscall flags to the (vectored, change_offset) pair handle_rw expects.
    fn flags_extract(syscall_flags: u32) -> Result<(bool, bool)> {
        let flags = WriteFlags::from_bits(syscall_flags).ok_or(Error::new(EINVAL))?;
        let vectored = flags.contains(WriteFlags::VECTORED);
        let change_offset = flags.contains(WriteFlags::CHANGE_OFFSET);

        Ok((vectored, change_offset))
    }

    let fut = handle_rw(
        submission_entry,
        flags_extract,
        move |(vectored, change_offset), addr, len, local_fd, offset, scheme| Ok(async move {
            match scheme.context {
                // NOTE(review): `handle_rw` already returns ENOSYS for
                // `ContextOrKernel::Context` schemes before invoking this closure,
                // so this ioring arm looks unreachable at present — confirm.
                ContextOrKernel::Context(ref context_weak)
                    if try_to_init_or_get_scheme_ioring(context_weak, &scheme) =>
                        handle_write_rw_with_ioring(context_weak).await,

                _ => handle_write_rw_with_schemes(
                    (vectored, change_offset),
                    addr,
                    len,
                    local_fd,
                    offset,
                    scheme,
                    invoker_ctx,
                )?.await,
            }
        }),
    );
    fut.await.map(|bytes_written| bytes_written as u64) // TODO: Truncate bytes_written if necessary
}
475

476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500
/// Performs the write against the scheme, dispatching on the four combinations of
/// (vectored, change_offset): writev / pwritev / write / pwrite.
///
/// Unlike the read path, the source buffers are immutable, so plain validated
/// slices are used with no consuming-iterator wrapper.
fn handle_write_rw_with_schemes(
    (vectored, change_offset): (bool, bool),
    addr: usize,
    len: usize,
    local_fd: usize,
    offset: u64,
    scheme: Arc<SchemeInfo<dyn AsyncScheme + Send + Sync>>,
    invoker_ctx: &crate::syscall::scheme::Ctx,
) -> Result<impl Future<Output = Result<usize>> + '_> {
    let either = if vectored {
        Either::Right(validate_ioslices(addr as *const IoVec, len)?)
    } else {
        Either::Left(validate_slice(addr as *const u8, len)?)
    };

    Ok(async move {
        match either {
            Either::Right(ref slices) => {
                if change_offset {
                    // change_offset == true: plain writev at the current file offset.
                    println!(
                        "IO_URING WRITEV LOCAL_FD {}, SLICE AT {:p} LEN {}",
                        local_fd,
                        slices.as_ptr(),
                        slices.len()
                    );
                    scheme.async_writev(invoker_ctx, local_fd, slices).await
                } else {
                    // Positioned writev; `offset` must fit the scheme's offset type.
                    println!(
                        "IO_URING PWRITEV LOCAL_FD {}, SLICE AT {:p} LEN {} OFFSET {}",
                        local_fd,
                        slices.as_ptr(),
                        slices.len(),
                        offset
                    );
                    scheme
                        .async_pwritev(
                            invoker_ctx,
                            local_fd,
                            slices,
                            offset.try_into().or(Err(Error::new(EOVERFLOW)))?,
                        )
                        .await
                }
            }
            Either::Left(ref slice) => {
                if change_offset {
                    println!(
                        "IO_URING WRITE LOCAL_FD {}, SLICE AT {:p} LEN {}",
                        local_fd,
                        slice.as_ptr(),
                        slice.len()
                    );
                    scheme.async_write(invoker_ctx, local_fd, slice).await
                } else {
                    println!(
                        "IO_URING PWRITE LOCAL_FD {}, SLICE AT {:p} LEN {} OFFSET {}",
                        local_fd,
                        slice.as_ptr(),
                        slice.len(),
                        offset
                    );
                    scheme
                        .async_pwrite(
                            invoker_ctx,
                            local_fd,
                            slice,
                            offset.try_into().or(Err(Error::new(EOVERFLOW)))?,
                        )
                        .await
                }
            }
        }
    })
}
550 551 552
/// Writes through the context's kernel-consumer io_uring.
///
/// Currently a stub: it resolves and sanity-checks the ring (kernel-owned,
/// userspace-attached), then always fails with `ENOSYS`.
async fn handle_write_rw_with_ioring(context_weak: &Weak<RwLock<Context>>) -> Result<usize> {
    let context_lock = context_weak.upgrade().ok_or(Error::new(ESRCH))?;
    let context = context_lock.read();
    // Both expects encode invariants established by try_to_init_or_get_scheme_ioring:
    // the Once is initialized and holds a live handle.
    let handle: Arc<Handle> = Arc::clone(
        &context.kernel_consumer_ioring.r#try()
            .expect("once state somehow changed back to non-initialized (Context::kernel_consumer_ioring)")
            .as_ref()
            .expect("ring failure should already have been noticed by `try_to_init_or_get_scheme_ioring(..)`"
    ));
    let ring = handle.as_ring_handle().ok_or(Error::new(EBADF))?;

    // Sanity-check ring direction: owned by the kernel, attached to userspace.
    assert!(ring
        .owner_context
        .r#try()
        .map(|ctx| ctx.is_kernel())
        .unwrap_or(false));
    assert!(ring
        .attached_context
        .r#try()
        .map(|ctx| ctx.is_userspace())
        .unwrap_or(false));

    // TODO: Temporarily increase scheduling priority for context
    Err(Error::new(ENOSYS))
}

/// Handles the `FilesUpdate` opcode: subscribes the consumer's event queue to
/// READ/WRITE/IO_URING readiness for one fd (or, with `FilesUpdateFlags::MULTI`,
/// a user-supplied list of fds). IO_URING subscriptions are routed to
/// `register_secondary_iouring` instead.
///
/// # Errors
/// `EINVAL` for unknown flag bits, `ENOSYS` for (not yet supported) oneshot
/// subscriptions, `EFAULT`/`EBADF`/`EOVERFLOW` for untranslatable fields, and
/// `EBADFD` if the consumer's event queue cannot be found.
fn handle_files_update<S>(
    handle_consumer_state: &RingHandleConsumerState,
    submission_entry: &S,
) -> Result<u64>
where
    S: GenericSubmissionEntry,
{
    // TODO: Interop between event queues managed using the `event:` scheme, and this.

    let sqe_flags =
        IoUringSqeFlags::from_bits(submission_entry.sqe_flags()).ok_or(Error::new(EINVAL))?;
    let syscall_flags =
        FilesUpdateFlags::from_bits(submission_entry.syscall_flags()).ok_or(Error::new(EINVAL))?;

    // Translate FilesUpdateFlags into the kernel's EventFlags representation.
    let event_flags = {
        let mut flags = EventFlags::empty();
        if syscall_flags.contains(FilesUpdateFlags::READ) {
            flags |= EventFlags::EVENT_READ;
        }
        if syscall_flags.contains(FilesUpdateFlags::WRITE) {
            flags |= EventFlags::EVENT_WRITE;
        }
        if syscall_flags.contains(FilesUpdateFlags::IO_URING) {
            flags |= EventFlags::EVENT_IO_URING;
        }
        flags
    };

    // Without SUBSCRIBE, this would be a one-off poll — not implemented yet.
    let oneshot = !sqe_flags.contains(IoUringSqeFlags::SUBSCRIBE);

    if oneshot {
        // TODO: Support this.
        return Err(Error::new(ENOSYS));
    }

    if event_flags.contains(EventFlags::EVENT_IO_URING) {
        return register_secondary_iouring(handle_consumer_state, event_flags, submission_entry);
    }

    let multiple_fds = syscall_flags.contains(FilesUpdateFlags::MULTI);

    // Two stack slots so a single `&mut dyn Iterator` can borrow whichever
    // variant is used, without boxing.
    let mut fd_iter_single;
    let mut fd_iter_multiple;

    let fd_iter: &mut dyn Iterator<Item = u64> = if multiple_fds {
        // MULTI: addr/len describe a user array of u64 fds.
        let addr = usize::try_from(submission_entry.addr64()).or(Err(Error::new(EFAULT)))?;
        let len = usize::try_from(submission_entry.len64()).or(Err(Error::new(EFAULT)))?;

        let slice = validate_slice(addr as *const u64, len)?;
        fd_iter_multiple = slice.iter().copied();

        &mut fd_iter_multiple
    } else {
        fd_iter_single = iter::once(submission_entry.fd64());
        &mut fd_iter_single
    };

    // Lazily creates the consumer's event queue on first use.
    let event_queue_id = handle_consumer_state.event_queue_or_init();

    let event_queue = {
        let queues = event::queues();
        let queue = queues.get(&event_queue_id).ok_or(Error::new(EBADFD))?;
        Arc::clone(queue)
    };

    //
    // Calls event::trigger internally, but there is no context switch unlike when reading events.
    //
    // EventQueue::write simply loops over the events, so there is no performance penalty in
    // repeatedly calling it.
    //

    for fd in fd_iter {
        let fd = usize::try_from(fd).or(Err(Error::new(EBADF)))?;
        event_queue.write(iter::once(Event {
            id: fd,
            // TODO: Should the FD list instead be a list of events, or another struct?
            data: submission_entry
                .user_data64()
                .try_into()
                .or(Err(Error::new(EOVERFLOW)))?,
            flags: event_flags,
        }))?;
    }

    Ok(0)
}
663
/// Registers another io_uring (identified by the SQE's fd) as a *secondary* ring
/// watched on behalf of this consumer, recording which directions (pop/push
/// readiness) were requested and the user_data to report completions with.
fn register_secondary_iouring<S>(consumer_state: &RingHandleConsumerState, event_flags: EventFlags, submission_entry: &S) -> Result<u64>
where
    S: GenericSubmissionEntry
{
    assert!(event_flags.contains(EventFlags::EVENT_IO_URING));

    // EVENT_READ in this context, means that an event will trigger when the ring has enough used
    // entries to be poppable. Meanwhile, EVENT_WRITE means that the event will appear when the
    // ring has enough free entries to be pushable.

    let ringfd = usize::try_from(submission_entry.fd64()).or(Err(Error::new(EBADF)))?;

    // Only a weak reference is stored — presumably so this list does not keep the
    // secondary ring alive on its own; confirm against the reaping logic.
    let ring_handle = {
        let handle = get_handle(FileHandle::from(ringfd))?;
        Arc::downgrade(&handle)
    };

    consumer_state.secondary_rings.lock().push(SecondaryRingRef {
        ring_handle,
        fd_for_consumer: ringfd,
        read: event_flags.contains(EventFlags::EVENT_READ),
        write: event_flags.contains(EventFlags::EVENT_WRITE),
        user_data: submission_entry.user_data64().try_into().or(Err(Error::new(EOVERFLOW)))?,
    });

    Ok(0)
}

async fn handle_dup<S>(submission_entry: &S) -> Result<u64>
where
    S: GenericSubmissionEntry + Send + Sync + 'static,
{
    println!("BEGIN DUP");
    let flags = DupFlags::from_bits(submission_entry.syscall_flags()).ok_or(Error::new(EINVAL))?;
    let fd: usize = submission_entry.fd64().try_into().or(Err(Error::new(EBADF)))?;

    let param = if flags.contains(DupFlags::PARAM) {
        let pointer = usize::try_from(submission_entry.addr64()).or(Err(Error::new(EFAULT)))?;
        let len = usize::try_from(submission_entry.len64()).or(Err(Error::new(EINVAL)))?;

        println!("IO_URING DUP LOCAL_FD {} SLICE AT {:p} LEN {}", fd, pointer as *const u8, len);

        validate_slice(pointer as *const u8, len)?
    } else {
        &[]
    };

    let fh = FileHandle::from(fd);

    let new_fh = crate::syscall::fs::dup(fh, param).await?;
    let new_fd64 = u64::try_from(new_fh.into()).or(Err(Error::new(EOVERFLOW)))?;
    Ok(new_fd64)
}


async fn handle_mmap<S>(submission_entry: &S) -> Result<u64>
where
    S: GenericSubmissionEntry,
{
    let flags = submission_entry.syscall_flags().try_into().or(Err(Error::new(EOVERFLOW)))?;
    let flags = MapFlags::from_bits(flags).ok_or(Error::new(EINVAL))?;

    let fd = usize::try_from(submission_entry.fd64()).or(Err(Error::new(EBADF)))?;
    let address = usize::try_from(submission_entry.addr64()).or(Err(Error::new(EFAULT)))?;
    let size = usize::try_from(submission_entry.len64()).or(Err(Error::new(ENOMEM)))?;
    let offset = usize::try_from(submission_entry.offset()).or(Err(Error::new(EOVERFLOW)))?;

    let map = Map2 {
        address,
        flags,
        offset,
        size,
    };

    // Used to work around Send constraint when using pointers.
    fn inner(fd: usize, map: &Map2) -> impl Future<Output = Result<usize>> {
        crate::syscall::fs::file_op(crate::syscall::number::SYS_FMAP2, FileHandle::from(fd), map as *const Map2 as usize, mem::size_of::<Map2>())
    }

    let pointer = inner(fd, &map).await?;
    let pointer = u64::try_from(pointer).or(Err(Error::new(EOVERFLOW)))?;
    Ok(pointer)
}

async fn handle_munmap<S>(submission_entry: &S) -> Result<u64> {
    println!("TODO: io_uring munmap");
4lDO2's avatar
4lDO2 committed
749 750 751
    Err(Error::new(ENOSYS))
}

752
/// Handles the `RegularSyscall` kernel opcode. Not yet implemented; always
/// fails with `ENOSYS`.
fn handle_regular_syscall<S>(_submission_entry: &S) -> Result<u64>
where
    S: GenericSubmissionEntry + Send + Sync + 'static,
{
    Err(Error::new(ENOSYS))
}

4lDO2's avatar
4lDO2 committed
759 760
async fn handle_standard_opcode_inner<S, C>(
    handle: Arc<Handle>,
761 762
    submission_entry: S,
    opcode: StandardOpcode,
4lDO2's avatar
4lDO2 committed
763
    ctx: crate::syscall::scheme::Ctx,
764
) -> Result<C>
765
where
766 767
    S: GenericSubmissionEntry + Send + Sync + 'static,
    C: GenericCompletionEntry + Send + Sync + 'static,
768
{
4lDO2's avatar
4lDO2 committed
769 770 771
    let handle = handle.as_ring_handle().ok_or(Error::new(EBADF))?;
    let runtime_state = handle.consumer_state.r#try().ok_or(Error::new(EBADFD))?;

772 773 774 775 776 777 778 779 780 781 782
    fn construct<C: GenericCompletionEntry>(
        user_data: u64,
        return_value: ReturnValue,
    ) -> Result<C> {
        let extra = if return_value.extra != 0 {
            Some(return_value.extra)
        } else {
            None
        };
        C::construct(user_data, return_value.status, return_value.flags, extra)
            .ok_or(Error::new(EOVERFLOW))
783
    }
4lDO2's avatar
4lDO2 committed
784 785
    fn construct_simple(result: Result<u64>) -> ReturnValue {
        ReturnValue {
4lDO2's avatar
4lDO2 committed
786
            status: Error::mux64(result),
787 788
            flags: 0,
            extra: 0,
4lDO2's avatar
4lDO2 committed
789
        }
790
    }
791 792

    let return_value = match opcode {
4lDO2's avatar
4lDO2 committed
793
        StandardOpcode::NoOp => ReturnValue::zero(),
794 795 796 797 798
        StandardOpcode::Open => handle_open(&submission_entry).await.map_or_else(
            |error| ReturnValue::from_status(Error::mux64(Err(error))),
            |fd| ReturnValue::from_status(fd.into() as u64),
        ),

4lDO2's avatar
4lDO2 committed
799
        StandardOpcode::Close => handle_close(&submission_entry, &ctx).await.map_or_else(
800 801 802 803 804 805 806
            |(error, successful_closes)| ReturnValue {
                status: Error::mux64(Err(error)),
                flags: 0,
                extra: successful_closes,
            },
            |_| ReturnValue::zero(),
        ),
807

4lDO2's avatar
4lDO2 committed
808 809
        StandardOpcode::Read => construct_simple(handle_read(&submission_entry, &ctx).await),
        StandardOpcode::Write => construct_simple(handle_write(&submission_entry, &ctx).await),
810 811 812
        StandardOpcode::Seek => return Err(Error::new(ENOSYS)),
        StandardOpcode::Fstat => return Err(Error::new(ENOSYS)),
        StandardOpcode::Fstatvfs => return Err(Error::new(ENOSYS)),
813 814 815
        StandardOpcode::Dup => construct_simple(handle_dup(&submission_entry).await),
        StandardOpcode::Mmap => construct_simple(handle_mmap(&submission_entry).await),
        StandardOpcode::Munmap => construct_simple(handle_munmap(&submission_entry).await),
816
        StandardOpcode::Fsync => return Err(Error::new(ENOSYS)),
817
        StandardOpcode::FilesUpdate => {
4lDO2's avatar
4lDO2 committed
818
            construct_simple(handle_files_update(runtime_state, &submission_entry))
819 820 821 822
        }
        StandardOpcode::RegularSyscall => {
            construct_simple(handle_regular_syscall(&submission_entry))
        }
823 824

        _ => return Err(Error::new(ENOSYS)),
825 826
    };

4lDO2's avatar
4lDO2 committed
827
    construct(submission_entry.user_data64(), return_value)
828
}
4lDO2's avatar
4lDO2 committed
829 830 831 832 833 834 835 836 837 838 839 840 841
fn handle_standard_opcode<S, C>(
    handle: Arc<Handle>,
    submission_entry: S,
    opcode: StandardOpcode,
    ctx: crate::syscall::scheme::Ctx,
) -> HandleStandardOpcodeFuture<S, C>
where
    S: GenericSubmissionEntry + Send + Sync + 'static,
    C: GenericCompletionEntry + Send + Sync + 'static,
{
    async move { Err(Error::new(ENOSYS)) }//handle_standard_opcode_inner(handle, submission_entry, opcode, ctx)
}
async fn handle_kernel_opcode_inner<S, C>(
4lDO2's avatar
4lDO2 committed
842
    submission_entry: S,
843
    opcode: KernelOpcode,
4lDO2's avatar
4lDO2 committed
844
    ctx: crate::syscall::scheme::Ctx,
845
) -> Result<C>
846 847 848 849 850 851 852 853 854
where
    S: GenericSubmissionEntry,
    C: GenericCompletionEntry,
{
    match opcode {
        KernelOpcode::Waitpid => return Err(Error::new(ENOSYS)),
        _ => return Err(Error::new(ENOSYS)),
    }
}
4lDO2's avatar
4lDO2 committed
855 856 857 858 859 860 861 862 863 864 865
fn handle_kernel_opcode<S, C>(
    submission_entry: S,
    opcode: KernelOpcode,
    ctx: crate::syscall::scheme::Ctx,
) -> HandleKernelOpcodeFuture<S, C>
where
    S: GenericSubmissionEntry + Send + Sync + 'static,
    C: GenericCompletionEntry + Send + Sync + 'static,
{
    async move { Err(Error::new(ENOSYS)) }//handle_kernel_opcode_inner(submission_entry, opcode, ctx)
}
866

4lDO2's avatar
4lDO2 committed
867
fn handle_submission<S, C>(
868 869 870
    handle: Arc<Handle>,
    submission_entry: S,
    ctx: crate::syscall::scheme::Ctx,
4lDO2's avatar
4lDO2 committed
871
) -> Result<HandleSubmissionFuture<S, C>>
872
where
873
    S: GenericSubmissionEntry + fmt::Debug + Send + Sync + 'static,
4lDO2's avatar
4lDO2 committed
874
    C: GenericCompletionEntry + Send + Sync + 'static,
875 876
{
    if let Some(standard_opcode) = StandardOpcode::from_raw(submission_entry.opcode()) {
4lDO2's avatar
4lDO2 committed
877 878
        todo!();
        //HandleSubmissionFuture::StandardOpcode(todo!()/*handle_standard_opcode(runtime_state, submission_entry, standard_opcode, &ctx)*/)
879
    } else if let Some(kernel_opcode) = KernelOpcode::from_raw(submission_entry.opcode()) {
4lDO2's avatar
4lDO2 committed
880 881
        todo!();
        //HandleSubmissionFuture::KernelOpcode(handle_kernel_opcode(submission_entry, kernel_opcode, &ctx))
882
    } else {
883 884 885 886 887
        println!(
            "UNKNOWN OPCODE {}, full SQE: {:?}",
            submission_entry.opcode(),
            submission_entry
        );
888 889 890
        Err(Error::new(ENOSYS))
    }
}
4lDO2's avatar
4lDO2 committed
891 892 893 894 895 896 897 898 899

pub type HandleStandardOpcodeFuture<S, C> = impl Future<Output = Result<C>> + Send + Sync + 'static;
pub type HandleKernelOpcodeFuture<S, C> = impl Future<Output = Result<C>> + Send + Sync + 'static;

pub enum HandleSubmissionFuture<S, C> {
    //StandardOpcode(HandleStandardOpcodeFuture<S, C>),
    KernelOpcode(HandleKernelOpcodeFuture<S, C>),
    //S(S),
    //C(C),
900 901
}

4lDO2's avatar
4lDO2 committed
902 903 904 905 906 907 908
impl<S, C> Future for HandleSubmissionFuture<S, C> {
    type Output = Result<C>;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        todo!()
    }
}
909

910 911 912 913 914 915
fn send_event<C>(
    info: &RingInfo<C>,
    event: &Event,
    cqe_flags: IoUringCqeFlags,
    seq: u64,
) -> Result<()>
4lDO2's avatar
4lDO2 committed
916
where
917
    C: GenericCompletionEntry,
4lDO2's avatar
4lDO2 committed
918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939
{
    // Unfortunately we only support 52 bits of event flags (SqEntry64), or 24 bits
    // (SqEntry32).
    let flags = if C::FLAGS_64 {
        let event_flags = u64::try_from(event.flags.bits()).or(Err(Error::new(EOVERFLOW)))?;

        if event_flags.checked_shl(8).is_none() {
            return Err(Error::new(EOVERFLOW));
        }

        (event_flags << 8) | u64::from(cqe_flags.bits())
    } else {
        let event_flags = u32::try_from(event.flags.bits()).or(Err(Error::new(EOVERFLOW)))?;

        if event_flags.checked_shl(8).is_none() {
            return Err(Error::new(EOVERFLOW));
        }

        u64::from((event_flags << 8) | u32::from(cqe_flags.bits()))
    };

    let extra = if C::HAS_EXTRA {
940
        let seq = if C::EXTRA_64 { seq } else { seq & 0xFFFF_FFFF };
4lDO2's avatar
4lDO2 committed
941 942 943 944 945
        Some(seq)
    } else {
        None
    };

946
    let user_data = event
4lDO2's avatar
4lDO2 committed
947
        .data
948 949
        .try_into()
        .or(Err(Error::new(EOVERFLOW)))?;
4lDO2's avatar
4lDO2 committed
950 951 952 953 954 955 956 957 958 959 960 961 962 963 964
    let status = u64::try_from(event.id).or(Err(Error::new(EOVERFLOW)))?;

    let cqe = C::construct(user_data, status, flags, extra).ok_or(Error::new(EOVERFLOW))?;

    unsafe {
        match info.ring().push_back_spsc(info.entries.as_ptr(), cqe) {
            Ok(()) => (),
            Err(RingPushError::Shutdown(_)) => return Err(Error::new(ESHUTDOWN)),
            Err(RingPushError::Full(_)) => return Err(Error::new(ENOSPC)),
        }
    }

    Ok(())
}

965 966
fn send_secondary_ring_events(consumer_state: &RingHandleConsumerState, completion_ring_info: Either<&RingInfo<CqEntry32>, &RingInfo<CqEntry64>>, count: &mut usize) -> Result<()> {
    let mut secondary_refs = consumer_state.secondary_rings.lock();
967

968
    let mut i = 0;
969

970
    while i < secondary_refs.len() {
971

972
        let secondary_ref = &secondary_refs[i];
973

974 975 976 977 978 979 980 981
        let handle_arc = match secondary_ref.ring_handle.upgrade() {
            Some(h) => h,
            None => {
                // The ring has been destroyed.
                secondary_refs.remove(i);
                continue;
            }
        };
982

983 984 985 986
        let (can_read, can_write) = match *handle_arc {
            Handle::Ring(ref ring_handle) => {
                // The handle is valid, so we don't need to remove that entry.
                i += 1;
987

988
                let attached_context = ring_handle.attached_context.r#try().and_then(ContextOrKernel::as_context).and_then(Weak::upgrade);
989

990 991
                let (runtime_state, rings) = match (ring_handle.runtime_state(), ring_handle.rings.r#try()) {
                    (Some(s), Some(r)) => (s, r),
992

993 994 995
                    // ring is uninitialized
                    _ => continue,
                };
996

997 998 999 1000 1001 1002
                let ((sq_pushed, sq_popped), (cq_pushed, cq_popped)) = runtime_state.check_for_update(rings);
                let can_read = cq_pushed;
                let can_write = sq_popped;
                let has_written = sq_pushed;
                // TODO
                let _has_read = cq_popped;
1003

1004 1005
                if let Some(attached) = attached_context {
                    let guard = attached.upgradeable_read();
1006

4lDO2's avatar
4lDO2 committed
1007
                    if has_written /*&& guard.ioring_completions_left.load(Ordering::Acquire) > 0*/ {
1008 1009 1010
                        println!("IO_URING WAKE UP CONTEXT");
                        guard.upgrade().unblock();
                    }
1011
                }
1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027

                (can_read, can_write)
            }
            Handle::Producer(ref producer_handle) => {
                // Valid handle, don't remove.

                let arc;

                let rings = match producer_handle.consumer_handle.upgrade() {
                    Some(consumer) => {
                        arc = consumer;

                        if let Handle::Ring(ref rh) = *arc {
                            if let Some(rings) = rh.rings.r#try() {
                                i += 1;
                                rings
1028
                            } else {
1029
                                println!("Consumer rings non-existent");
1030 1031 1032
                                secondary_refs.remove(i);
                                continue;
                            }
1033 1034
                        } else {
                            println!("Not a ring handle");
1035 1036 1037
                            secondary_refs.remove(i);
                            continue;
                        }
1038 1039 1040 1041 1042 1043 1044
                    }
                    None => {
                        println!("Consumer handle read");
                        secondary_refs.remove(i);
                        continue;
                    }
                };
1045

1046 1047 1048 1049
                let ((sq_pushed, _sq_popped), (_cq_pushed, cq_popped)) = producer_handle.runtime_state.check_for_update(rings);
                let can_read = sq_pushed;
                let can_write = cq_popped;
                println!("CAN_READ {} CAN_WRITE {}", can_read, can_write);
1050

1051
                (can_read, can_write)
1052
            }
1053 1054 1055
            ref invalid => {
                println!("INVALID HANDLE: {:?}, removing it from the secondary refs.", invalid);
                secondary_refs.remove(i);
1056 1057
                continue;
            }
1058 1059
        };
        let mut event_flags = EventFlags::EVENT_IO_URING;
1060

1061 1062 1063 1064 1065 1066 1067 1068
        // Since all RingHandles are consumers, we automatically know that READ should
        // correspond to cq_pushed, while WRITE should correspond to sq_popped.
        if secondary_ref.read && can_read {
            event_flags |= EventFlags::EVENT_READ;
        }
        if secondary_ref.write && can_write {
            event_flags |= EventFlags::EVENT_WRITE;
        }
1069

1070 1071 1072 1073
        let required_mask = EventFlags::EVENT_READ | EventFlags::EVENT_WRITE;
        if !event_flags.intersects(required_mask) {
            continue;
        }
1074

1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
        let seq = consumer_state.event_seq.fetch_add(1, Ordering::Acquire);
        let cqe_flags = IoUringCqeFlags::EVENT;

        let event = Event {
            id: secondary_ref.fd_for_consumer,
            data: secondary_ref.user_data.try_into().or(Err(Error::new(EOVERFLOW)))?,
            flags: event_flags,
        };

        match completion_ring_info {
            Either::Left(ref info32) => send_event(info32, &event, cqe_flags, seq)?,
            Either::Right(ref info64) => send_event(info64, &event, cqe_flags, seq)?,
1087
        }
1088
        *count += 1;
1089
    }
1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
    Ok(())
}

fn send_events(
    handle: &RingHandle,
    completion_ring_info: Either<&RingInfo<CqEntry32>, &RingInfo<CqEntry64>>,
) -> Result<Option<usize>> {
    let consumer_state = handle.consumer_state.r#try().ok_or(Error::new(EBADFD))?;

    let mut count = 0;

    send_secondary_ring_events(consumer_state, completion_ring_info, &mut count)?;
1102

1103
    if let Some(ref event_queue_id) = consumer_state.event_queue.r#try() {
4lDO2's avatar
4lDO2 committed
1104 1105 1106 1107 1108 1109 1110 1111
        let event_queue = {
            let queues = event::queues();
            let queue = queues.get(event_queue_id).ok_or(Error::new(EBADFD))?;
            Arc::clone(&queue)
        };

        let mut event_buf = [Event::default(); 16];

1112
        // TODO: Shall this error be properly handled?
1113
        count += event_queue.read_nonblocking(&mut event_buf)?;
4lDO2's avatar
4lDO2 committed
1114

1115 1116 1117 1118 1119 1120
        if unlikely(count > event_buf.len()) {
            println!("internal error: EventQueue::read_nonblocking yielded a higher count than the number of items");
            // pretend that there wasn't any existing event queue
            return Ok(None);
        }
        let events = &event_buf[..count];
4lDO2's avatar
4lDO2 committed
1121

1122
        for event in events {
1123
            let seq = consumer_state.event_seq.fetch_add(1, Ordering::Acquire);
4lDO2's avatar
4lDO2 committed
1124
            let cqe_flags = IoUringCqeFlags::EVENT;
4lDO2's avatar
4lDO2 committed
1125

1126 1127 1128
            match completion_ring_info {
                Either::Left(ref info32) => send_event(info32, event, cqe_flags, seq)?,
                Either::Right(ref info64) => send_event(info64, event, cqe_flags, seq)?,
4lDO2's avatar
4lDO2 committed
1129 1130
            }
        }
1131 1132
        Ok(Some(count))
    } else {
1133
        Ok(if count == 0 { None } else { Some(count) })
1134 1135
    }
}