From 5abf16a51ec40e8b641ee7e09bfd8ba1a046dbc0 Mon Sep 17 00:00:00 2001
From: 4lDO2 <4lDO2@protonmail.com>
Date: Wed, 21 Jun 2023 16:30:01 +0200
Subject: [PATCH] Improved pipe scheme.

---
 src/scheme/mod.rs  |  15 +-
 src/scheme/pipe.rs | 375 +++++++++++++++++++++++----------------------
 src/syscall/fs.rs  |  11 +-
 3 files changed, 205 insertions(+), 196 deletions(-)

diff --git a/src/scheme/mod.rs b/src/scheme/mod.rs
index 909385d2..86c1d80c 100644
--- a/src/scheme/mod.rs
+++ b/src/scheme/mod.rs
@@ -106,7 +106,7 @@ impl<'a> Iterator for SchemeIter<'a> {
 
 /// Scheme list type
 pub struct SchemeList {
-    map: BTreeMap<SchemeId, Arc<dyn KernelScheme + Send + Sync>>,
+    map: BTreeMap<SchemeId, Arc<dyn KernelScheme>>,
     names: BTreeMap<SchemeNamespace, BTreeMap<Box<str>, SchemeId>>,
     next_ns: usize,
     next_id: usize
@@ -136,6 +136,7 @@ impl SchemeList {
         //anonymous mmap's are implemented
         self.insert(ns, "memory", |_| Arc::new(MemoryScheme::new())).unwrap();
         self.insert(ns, "thisproc", |_| Arc::new(ProcScheme::restricted())).unwrap();
+        self.insert(ns, "pipe", |scheme_id| PipeScheme::new(scheme_id)).unwrap();
     }
 
     /// Initialize a new namespace
@@ -148,6 +149,7 @@ impl SchemeList {
         self.insert(ns, "event", |_| Arc::new(EventScheme)).unwrap();
         self.insert(ns, "itimer", |_| Arc::new(ITimerScheme::new())).unwrap();
         self.insert(ns, "memory", |_| Arc::new(MemoryScheme::new())).unwrap();
+        self.insert(ns, "pipe", |scheme_id| PipeScheme::new(scheme_id)).unwrap();
         self.insert(ns, "sys", |_| Arc::new(SysScheme::new())).unwrap();
         self.insert(ns, "time", |scheme_id| Arc::new(TimeScheme::new(scheme_id))).unwrap();
 
@@ -172,9 +174,6 @@ impl SchemeList {
         if let Some(scheme) = self::live::DiskScheme::new().map(Arc::new) {
             self.insert(ns, "disk/live", move |_| scheme.clone()).unwrap();
         }
-
-        // Pipe is special and needs to be in the root namespace
-        self.insert(ns, "pipe", |scheme_id| Arc::new(PipeScheme::new(scheme_id))).unwrap();
     }
 
     pub fn make_ns(&mut self, from: SchemeNamespace, names: &[&str]) -> Result<SchemeNamespace> {
@@ -199,7 +198,7 @@ impl SchemeList {
         Ok(to)
     }
 
-    pub fn iter(&self) -> ::alloc::collections::btree_map::Iter<SchemeId, Arc<dyn KernelScheme + Send + Sync>> {
+    pub fn iter(&self) -> ::alloc::collections::btree_map::Iter<SchemeId, Arc<dyn KernelScheme>> {
         self.map.iter()
     }
 
@@ -210,11 +209,11 @@ impl SchemeList {
     }
 
     /// Get the nth scheme.
-    pub fn get(&self, id: SchemeId) -> Option<&Arc<dyn KernelScheme + Send + Sync>> {
+    pub fn get(&self, id: SchemeId) -> Option<&Arc<dyn KernelScheme>> {
         self.map.get(&id)
     }
 
-    pub fn get_name(&self, ns: SchemeNamespace, name: &str) -> Option<(SchemeId, &Arc<dyn KernelScheme + Send + Sync>)> {
+    pub fn get_name(&self, ns: SchemeNamespace, name: &str) -> Option<(SchemeId, &Arc<dyn KernelScheme>)> {
         if let Some(names) = self.names.get(&ns) {
             if let Some(&id) = names.get(name) {
                 return self.get(id).map(|scheme| (id, scheme));
@@ -225,7 +224,7 @@ impl SchemeList {
 
     /// Create a new scheme.
     pub fn insert<F>(&mut self, ns: SchemeNamespace, name: &str, scheme_fn: F) -> Result<SchemeId>
-        where F: Fn(SchemeId) -> Arc<dyn KernelScheme + Send + Sync>
+        where F: Fn(SchemeId) -> Arc<dyn KernelScheme>
     {
         if let Some(names) = self.names.get(&ns) {
             if names.contains_key(name) {
diff --git a/src/scheme/pipe.rs b/src/scheme/pipe.rs
index e9d6a7d1..3543af32 100644
--- a/src/scheme/pipe.rs
+++ b/src/scheme/pipe.rs
@@ -1,106 +1,208 @@
-use alloc::sync::{Arc, Weak};
+use core::sync::atomic::{AtomicUsize, Ordering, AtomicBool};
+
+use alloc::sync::Arc;
 use alloc::collections::{BTreeMap, VecDeque};
-use core::sync::atomic::{AtomicUsize, Ordering};
+
 use spin::{Mutex, Once, RwLock};
 
 use crate::event;
 use crate::scheme::SchemeId;
 use crate::sync::WaitCondition;
-use crate::syscall::error::{Error, Result, EAGAIN, EBADF, EINTR, EINVAL, EPIPE, ESPIPE};
+use crate::syscall::error::{Error, Result, EAGAIN, EBADF, EINTR, EINVAL, ENOENT, EPIPE, ESPIPE};
 use crate::syscall::flag::{EventFlags, EVENT_READ, EVENT_WRITE, F_GETFL, F_SETFL, O_ACCMODE, O_NONBLOCK, MODE_FIFO};
 use crate::syscall::scheme::Scheme;
 use crate::syscall::data::Stat;
 
-/// Pipes list
+use super::KernelScheme;
+
 // TODO: Preallocate a number of scheme IDs, since there can only be *one* root namespace, and
 // therefore only *one* pipe scheme.
-static PIPE_SCHEME_ID: Once<SchemeId> = Once::new();
-static PIPE_NEXT_ID: AtomicUsize = AtomicUsize::new(0);
-static PIPES: RwLock<(BTreeMap<usize, Arc<PipeRead>>, BTreeMap<usize, Arc<PipeWrite>>)> = RwLock::new((BTreeMap::new(), BTreeMap::new()));
+static THE_PIPE_SCHEME: Once<(SchemeId, Arc<dyn KernelScheme>)> = Once::new();
+static PIPE_NEXT_ID: AtomicUsize = AtomicUsize::new(1);
+
+// TODO: SLOB?
+static PIPES: RwLock<BTreeMap<usize, Arc<Pipe>>> = RwLock::new(BTreeMap::new());
+
+pub fn pipe_scheme_id() -> SchemeId {
+    THE_PIPE_SCHEME.get().expect("pipe scheme must be initialized").0
+}
+
+const MAX_QUEUE_SIZE: usize = 65536;
+
+// In almost all places where Rust (and LLVM) uses pointers, they are limited to nonnegative isize,
+// so this is fine.
+const WRITE_NOT_READ_BIT: usize = 1 << (usize::BITS - 1);
 
-pub fn pipe_scheme_id() -> Option<SchemeId> {
-    PIPE_SCHEME_ID.get().copied()
+fn from_raw_id(id: usize) -> (bool, usize) {
+    (id & WRITE_NOT_READ_BIT != 0, id & !WRITE_NOT_READ_BIT)
 }
 
-pub fn pipe(flags: usize) -> (usize, usize) {
-    let mut pipes = PIPES.write();
-    let scheme_id = *PIPE_SCHEME_ID.get().expect("pipe scheme not initialized when calling pipe()");
-    let read_id = PIPE_NEXT_ID.fetch_add(1, Ordering::Relaxed);
-    let write_id = PIPE_NEXT_ID.fetch_add(1, Ordering::Relaxed);
-    let read = PipeRead::new(scheme_id, write_id, flags);
-    let write = PipeWrite::new(&read, read_id, flags);
-    pipes.0.insert(read_id, Arc::new(read));
-    pipes.1.insert(write_id, Arc::new(write));
-    (read_id, write_id)
+pub fn pipe(flags: usize) -> Result<(usize, usize)> {
+    let id = PIPE_NEXT_ID.fetch_add(1, Ordering::Relaxed);
+
+    PIPES.write().insert(id, Arc::new(Pipe {
+        read_flags: AtomicUsize::new(flags),
+        write_flags: AtomicUsize::new(flags),
+        queue: Mutex::new(VecDeque::new()),
+        read_condition: WaitCondition::new(),
+        write_condition: WaitCondition::new(),
+        writer_is_alive: AtomicBool::new(true),
+        reader_is_alive: AtomicBool::new(true),
+        has_run_dup: AtomicBool::new(false),
+    }));
+
+    Ok((id, id | WRITE_NOT_READ_BIT))
 }
 
 pub struct PipeScheme;
 
 impl PipeScheme {
-    pub fn new(scheme_id: SchemeId) -> PipeScheme {
-        let mut called = false;
-        PIPE_SCHEME_ID.call_once(|| {
-            called = true;
-            scheme_id
-        });
-        assert!(called, "calling PipeScheme::new more than once");
-        PipeScheme
+    pub fn new(scheme_id: SchemeId) -> Arc<dyn KernelScheme> {
+        Arc::clone(&THE_PIPE_SCHEME.call_once(|| {
+            (scheme_id, Arc::new(Self))
+        }).1)
     }
 }
 
 impl Scheme for PipeScheme {
-    fn read(&self, id: usize, buf: &mut [u8]) -> Result<usize> {
-        // Clone to prevent deadlocks
-        let pipe = {
-            let pipes = PIPES.read();
-            pipes.0.get(&id).cloned().ok_or(Error::new(EBADF))?
-        };
+    fn open(&self, path: &str, flags: usize, _uid: u32, _gid: u32) -> Result<usize> {
+        if !path.trim_start_matches('/').is_empty() {
+            return Err(Error::new(ENOENT));
+        }
+
+        let (read_id, _) = pipe(flags)?;
 
-        pipe.read(buf)
+        Ok(read_id)
     }
+    fn read(&self, id: usize, buf: &mut [u8]) -> Result<usize> {
+        let (is_write_not_read, key) = from_raw_id(id);
+
+        if is_write_not_read {
+            return Err(Error::new(EBADF));
+        }
+        let pipe = Arc::clone(PIPES.read().get(&key).ok_or(Error::new(EBADF))?);
 
+        loop {
+            let mut vec = pipe.queue.lock();
+
+            let (s1, s2) = vec.as_slices();
+            let s1_count = core::cmp::min(buf.len(), s1.len());
+
+            let (s1_dst, s2_buf) = buf.split_at_mut(s1_count);
+            s1_dst.copy_from_slice(&s1[..s1_count]);
+
+            let s2_count = core::cmp::min(s2_buf.len(), s2.len());
+            s2_buf[..s2_count].copy_from_slice(&s2[..s2_count]);
+
+            let bytes_read = s1_count + s2_count;
+            let _ = vec.drain(..bytes_read);
+
+            if bytes_read > 0 {
+                event::trigger(pipe_scheme_id(), key | WRITE_NOT_READ_BIT, EVENT_WRITE);
+                pipe.write_condition.notify();
+
+                return Ok(bytes_read);
+            } else if buf.is_empty() {
+                return Ok(0);
+            }
+
+            if !pipe.writer_is_alive.load(Ordering::SeqCst) {
+                return Ok(0);
+            } else if pipe.read_flags.load(Ordering::SeqCst) & O_NONBLOCK == O_NONBLOCK {
+                return Err(Error::new(EAGAIN));
+            } else if !pipe.read_condition.wait(vec, "PipeRead::read") {
+                return Err(Error::new(EINTR));
+            }
+        }
+    }
     fn write(&self, id: usize, buf: &[u8]) -> Result<usize> {
-        // Clone to prevent deadlocks
-        let pipe = {
-            let pipes = PIPES.read();
-            pipes.1.get(&id).cloned().ok_or(Error::new(EBADF))?
-        };
+        let (is_write_not_read, key) = from_raw_id(id);
+
+        if !is_write_not_read {
+            return Err(Error::new(EBADF));
+        }
+        let pipe = Arc::clone(PIPES.read().get(&key).ok_or(Error::new(EBADF))?);
+
+        loop {
+            let mut vec = pipe.queue.lock();
+
+            let bytes_left = MAX_QUEUE_SIZE.saturating_sub(vec.len());
+            let byte_count = core::cmp::min(bytes_left, buf.len());
+
+            vec.extend(buf[..byte_count].iter());
 
-        pipe.write(buf)
+            if byte_count > 0 {
+                event::trigger(pipe_scheme_id(), key, EVENT_READ);
+                pipe.read_condition.notify();
+
+                return Ok(byte_count);
+            } else if buf.is_empty() {
+                return Ok(0);
+            }
+
+            if !pipe.reader_is_alive.load(Ordering::SeqCst) {
+                return Err(Error::new(EPIPE));
+            } else if pipe.write_flags.load(Ordering::SeqCst) & O_NONBLOCK == O_NONBLOCK {
+                return Err(Error::new(EAGAIN));
+            } else if !pipe.write_condition.wait(vec, "PipeWrite::write") {
+                return Err(Error::new(EINTR));
+            }
+        }
     }
 
-    fn fcntl(&self, id: usize, cmd: usize, arg: usize) -> Result<usize> {
-        let pipes = PIPES.read();
+    fn dup(&self, old_id: usize, buf: &[u8]) -> Result<usize> {
+        let (is_writer_not_reader, key) = from_raw_id(old_id);
 
-        if let Some(pipe) = pipes.0.get(&id) {
-            return pipe.fcntl(cmd, arg);
+        if is_writer_not_reader {
+            return Err(Error::new(EBADF));
         }
+        if buf != b"write" {
+            return Err(Error::new(EINVAL));
+        }
+
+        let pipe = Arc::clone(PIPES.read().get(&key).ok_or(Error::new(EBADF))?);
 
-        if let Some(pipe) = pipes.1.get(&id) {
-            return pipe.fcntl(cmd, arg);
+        if pipe.has_run_dup.swap(true, Ordering::SeqCst) {
+            return Err(Error::new(EBADF));
         }
 
-        Err(Error::new(EBADF))
+        Ok(key | WRITE_NOT_READ_BIT)
     }
 
-    fn fevent(&self, id: usize, flags: EventFlags) -> Result<EventFlags> {
-        let pipes = PIPES.read();
-
-        if let Some(pipe) = pipes.0.get(&id) {
-            if flags == EVENT_READ {
-                // TODO: Return correct flags
-                if pipe.vec.lock().is_empty() {
-                    return Ok(EventFlags::empty());
-                } else {
-                    return Ok(EVENT_READ);
-                }
-            }
+    fn fcntl(&self, id: usize, cmd: usize, arg: usize) -> Result<usize> {
+        let (is_writer_not_reader, key) = from_raw_id(id);
+        let pipe = Arc::clone(PIPES.read().get(&key).ok_or(Error::new(EBADF))?);
+
+        let flags = if is_writer_not_reader { &pipe.write_flags } else { &pipe.read_flags };
+
+        match cmd {
+            F_GETFL => Ok(flags.load(Ordering::SeqCst)),
+            F_SETFL => {
+                flags.store(arg & !O_ACCMODE, Ordering::SeqCst);
+                Ok(0)
+            },
+            _ => Err(Error::new(EINVAL))
         }
+    }
+
+    fn fevent(&self, id: usize, flags: EventFlags) -> Result<EventFlags> {
+        let (is_writer_not_reader, key) = from_raw_id(id);
+        let pipe = Arc::clone(PIPES.read().get(&key).ok_or(Error::new(EBADF))?);
 
-        if let Some(_pipe) = pipes.1.get(&id) {
-            if flags == EVENT_WRITE {
+        if is_writer_not_reader && flags == EVENT_WRITE {
+            // TODO: Return correct flags
+            if pipe.queue.lock().len() >= MAX_QUEUE_SIZE {
+                return Ok(EventFlags::empty());
+            } else {
                 return Ok(EVENT_WRITE);
             }
+        } else if flags == EVENT_READ {
+            // TODO: Return correct flags
+            if pipe.queue.lock().is_empty() {
+                return Ok(EventFlags::empty());
+            } else {
+                return Ok(EVENT_READ);
+            }
         }
 
         Err(Error::new(EBADF))
@@ -127,141 +229,48 @@ impl Scheme for PipeScheme {
     }
 
     fn close(&self, id: usize) -> Result<usize> {
-        let mut pipes = PIPES.write();
-
-        drop(pipes.0.remove(&id));
-        drop(pipes.1.remove(&id));
-
-        Ok(0)
-    }
-
-    fn seek(&self, _id: usize, _pos: isize, _whence: usize) -> Result<isize> {
-        Err(Error::new(ESPIPE))
-    }
-}
+        let (is_write_not_read, key) = from_raw_id(id);
 
-/// Read side of a pipe
-pub struct PipeRead {
-    scheme_id: SchemeId,
-    write_id: usize,
-    flags: AtomicUsize,
-    condition: Arc<WaitCondition>,
-    vec: Arc<Mutex<VecDeque<u8>>>
-}
+        let pipe = Arc::clone(PIPES.read().get(&key).ok_or(Error::new(EBADF))?);
+        let scheme_id = pipe_scheme_id();
 
-impl PipeRead {
-    pub fn new(scheme_id: SchemeId, write_id: usize, flags: usize) -> Self {
-        PipeRead {
-            scheme_id,
-            write_id,
-            flags: AtomicUsize::new(flags),
-            condition: Arc::new(WaitCondition::new()),
-            vec: Arc::new(Mutex::new(VecDeque::new())),
-        }
-    }
+        let can_remove = if is_write_not_read {
+            event::trigger(scheme_id, key, EVENT_READ);
 
-    fn fcntl(&self, cmd: usize, arg: usize) -> Result<usize> {
-        match cmd {
-            F_GETFL => Ok(self.flags.load(Ordering::SeqCst)),
-            F_SETFL => {
-                self.flags.store(arg & ! O_ACCMODE, Ordering::SeqCst);
-                Ok(0)
-            },
-            _ => Err(Error::new(EINVAL))
-        }
-    }
+            pipe.read_condition.notify();
+            pipe.writer_is_alive.store(false, Ordering::SeqCst);
 
-    fn read(&self, buf: &mut [u8]) -> Result<usize> {
-        loop {
-            let mut vec = self.vec.lock();
-
-            let mut i = 0;
-            while i < buf.len() {
-                if let Some(b) = vec.pop_front() {
-                    buf[i] = b;
-                    i += 1;
-                } else {
-                    break;
-                }
-            }
-
-            if i > 0 {
-                event::trigger(self.scheme_id, self.write_id, EVENT_WRITE);
-
-                return Ok(i);
-            }
+            !pipe.reader_is_alive.load(Ordering::SeqCst)
+        } else {
+            event::trigger(scheme_id, key | WRITE_NOT_READ_BIT, EVENT_WRITE);
 
-            if Arc::weak_count(&self.vec) == 0 {
-                return Ok(0);
-            } else if self.flags.load(Ordering::SeqCst) & O_NONBLOCK == O_NONBLOCK {
-                return Err(Error::new(EAGAIN));
-            } else if ! self.condition.wait(vec, "PipeRead::read") {
-                return Err(Error::new(EINTR));
-            }
-        }
-    }
-}
+            pipe.write_condition.notify();
+            pipe.reader_is_alive.store(false, Ordering::SeqCst);
 
-/// Read side of a pipe
-pub struct PipeWrite {
-    scheme_id: SchemeId,
-    read_id: usize,
-    flags: AtomicUsize,
-    condition: Arc<WaitCondition>,
-    vec: Option<Weak<Mutex<VecDeque<u8>>>>
-}
+            !pipe.writer_is_alive.load(Ordering::SeqCst)
+        };
 
-impl PipeWrite {
-    pub fn new(read: &PipeRead, read_id: usize, flags: usize) -> Self {
-        PipeWrite {
-            scheme_id: read.scheme_id,
-            read_id,
-            flags: AtomicUsize::new(flags),
-            condition: read.condition.clone(),
-            vec: Some(Arc::downgrade(&read.vec)),
+        if can_remove {
+            let _ = PIPES.write().remove(&key);
         }
-    }
 
-    fn fcntl(&self, cmd: usize, arg: usize) -> Result<usize> {
-        match cmd {
-            F_GETFL => Ok(self.flags.load(Ordering::SeqCst)),
-            F_SETFL => {
-                self.flags.store(arg & ! O_ACCMODE, Ordering::SeqCst);
-                Ok(0)
-            },
-            _ => Err(Error::new(EINVAL))
-        }
+        Ok(0)
     }
 
-    fn write(&self, buf: &[u8]) -> Result<usize> {
-        if let Some(ref vec_weak) = self.vec {
-            if let Some(vec_lock) = vec_weak.upgrade() {
-                {
-                    let mut vec = vec_lock.lock();
-
-                    for &b in buf.iter() {
-                        vec.push_back(b);
-                    }
-                }
-
-                event::trigger(self.scheme_id, self.read_id, EVENT_READ);
-                self.condition.notify();
-
-                Ok(buf.len())
-            } else {
-                Err(Error::new(EPIPE))
-            }
-        } else {
-            panic!("PipeWrite dropped before write");
-        }
+    fn seek(&self, _id: usize, _pos: isize, _whence: usize) -> Result<isize> {
+        Err(Error::new(ESPIPE))
     }
 }
 
-impl Drop for PipeWrite {
-    fn drop(&mut self) {
-        drop(self.vec.take());
-        event::trigger(self.scheme_id, self.read_id, EVENT_READ);
-        self.condition.notify();
-    }
+pub struct Pipe {
+    read_flags: AtomicUsize, // fcntl read flags
+    write_flags: AtomicUsize, // fcntl write flags
+    read_condition: WaitCondition, // signals whether there are available bytes to read
+    write_condition: WaitCondition, // signals whether there is room for additional bytes
+    queue: Mutex<VecDeque<u8>>,
+    reader_is_alive: AtomicBool, // starts set, unset when reader closes
+    writer_is_alive: AtomicBool, // starts set, unset when writer closes
+    has_run_dup: AtomicBool,
 }
+
 impl crate::scheme::KernelScheme for PipeScheme {}
diff --git a/src/syscall/fs.rs b/src/syscall/fs.rs
index 792a4041..ca532a3b 100644
--- a/src/syscall/fs.rs
+++ b/src/syscall/fs.rs
@@ -91,16 +91,17 @@ pub fn open(path: &str, flags: usize) -> Result<FileHandle> {
 
 pub fn pipe2(fds: &mut [usize], flags: usize) -> Result<usize> {
     if fds.len() < 2 {
-        return Err(Error::new(EFAULT));
+        return Err(Error::new(EINVAL));
     }
 
-    let scheme_id = crate::scheme::pipe::pipe_scheme_id().ok_or(Error::new(ENODEV))?;
-    let (read_id, write_id) = crate::scheme::pipe::pipe(flags);
+    let scheme_id = crate::scheme::pipe::pipe_scheme_id();
+    let (read_id, write_id) = crate::scheme::pipe::pipe(flags)?;
 
-    let contexts = context::contexts();
-    let context_lock = contexts.current().ok_or(Error::new(ESRCH))?;
+    let context_lock = context::current()?;
     let context = context_lock.read();
 
+    //log::warn!("Context {} used deprecated pipe2.", context.name);
+
     let read_fd = context.add_file(FileDescriptor {
         description: Arc::new(RwLock::new(FileDescription {
             namespace: context.ens,
-- 
GitLab