From 7a96e6c9be882af79dbb4d90a9b5e9969a81fd07 Mon Sep 17 00:00:00 2001
From: Jeremy Soller <jackpot51@gmail.com>
Date: Thu, 6 Oct 2016 18:46:24 -0600
Subject: [PATCH] Add pipe2

---
 lib.rs             |   1 +
 scheme/mod.rs      |   5 ++
 scheme/pipe.rs     | 173 +++++++++++++++++++++++++++++++++++++++++++++
 syscall/fs.rs      |  29 ++++++++
 syscall/mod.rs     |   3 +-
 syscall/process.rs |  65 +++++++++++------
 6 files changed, 253 insertions(+), 23 deletions(-)
 create mode 100644 scheme/pipe.rs

diff --git a/lib.rs b/lib.rs
index f77da770..3119c250 100644
--- a/lib.rs
+++ b/lib.rs
@@ -65,6 +65,7 @@
 //! In this case, it is recommended to add one page, 4096 bytes, to the buffer and retry.
 
 #![feature(alloc)]
+#![feature(arc_counts)]
 #![feature(asm)]
 #![feature(collections)]
 #![feature(const_fn)]
diff --git a/scheme/mod.rs b/scheme/mod.rs
index 7d3e25e8..d64d03f3 100644
--- a/scheme/mod.rs
+++ b/scheme/mod.rs
@@ -20,6 +20,7 @@ use self::event::EventScheme;
 use self::env::EnvScheme;
 use self::initfs::InitFsScheme;
 use self::irq::IrqScheme;
+use self::pipe::{PIPE_SCHEME_ID, PipeScheme};
 use self::root::RootScheme;
 
 /// Debug scheme
@@ -37,6 +38,9 @@ pub mod initfs;
 /// IRQ handling
 pub mod irq;
 
+/// Anonymouse pipe
+pub mod pipe;
+
 /// Root scheme
 pub mod root;
 
@@ -116,6 +120,7 @@ fn init_schemes() -> RwLock<SchemeList> {
     list.insert(Box::new(*b"env"), Arc::new(Box::new(EnvScheme::new()))).expect("failed to insert env scheme");
     list.insert(Box::new(*b"initfs"), Arc::new(Box::new(InitFsScheme::new()))).expect("failed to insert initfs scheme");
     list.insert(Box::new(*b"irq"), Arc::new(Box::new(IrqScheme))).expect("failed to insert irq scheme");
+    PIPE_SCHEME_ID.store(list.insert(Box::new(*b"pipe"), Arc::new(Box::new(PipeScheme))).expect("failed to insert pipe scheme"), Ordering::SeqCst);
     RwLock::new(list)
 }
 
diff --git a/scheme/pipe.rs b/scheme/pipe.rs
new file mode 100644
index 00000000..eee3d4fe
--- /dev/null
+++ b/scheme/pipe.rs
@@ -0,0 +1,173 @@
+use alloc::arc::{Arc, Weak};
+use collections::{BTreeMap, VecDeque};
+use core::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
+use spin::{Mutex, Once, RwLock, RwLockReadGuard, RwLockWriteGuard};
+
+use context;
+use syscall::error::{Error, Result, EBADF, EPIPE};
+use syscall::scheme::Scheme;
+
+/// Pipes list
+pub static PIPE_SCHEME_ID: AtomicUsize = ATOMIC_USIZE_INIT;
+static PIPE_NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
+static PIPES: Once<RwLock<(BTreeMap<usize, PipeRead>, BTreeMap<usize, PipeWrite>)>> = Once::new();
+
+/// Initialize pipes, called if needed
+fn init_pipes() -> RwLock<(BTreeMap<usize, PipeRead>, BTreeMap<usize, PipeWrite>)> {
+    RwLock::new((BTreeMap::new(), BTreeMap::new()))
+}
+
+/// Get the global pipes list, const
+fn pipes() -> RwLockReadGuard<'static, (BTreeMap<usize, PipeRead>, BTreeMap<usize, PipeWrite>)> {
+    PIPES.call_once(init_pipes).read()
+}
+
+/// Get the global schemes list, mutable
+fn pipes_mut() -> RwLockWriteGuard<'static, (BTreeMap<usize, PipeRead>, BTreeMap<usize, PipeWrite>)> {
+    PIPES.call_once(init_pipes).write()
+}
+
+pub fn pipe(_flags: usize) -> (usize, usize) {
+    let mut pipes = pipes_mut();
+    let read_id = PIPE_NEXT_ID.fetch_add(1, Ordering::SeqCst);
+    let read = PipeRead::new();
+    let write_id = PIPE_NEXT_ID.fetch_add(1, Ordering::SeqCst);
+    let write = PipeWrite::new(&read);
+    pipes.0.insert(read_id, read);
+    pipes.1.insert(write_id, write);
+    (read_id, write_id)
+}
+
+pub struct PipeScheme;
+
+impl Scheme for PipeScheme {
+    fn dup(&self, id: usize) -> Result<usize> {
+        let mut pipes = pipes_mut();
+
+        let read_option = pipes.0.get(&id).map(|pipe| pipe.clone());
+        if let Some(pipe) = read_option {
+            let pipe_id = PIPE_NEXT_ID.fetch_add(1, Ordering::SeqCst);
+            pipes.0.insert(pipe_id, pipe);
+            return Ok(pipe_id);
+        }
+
+        let write_option = pipes.1.get(&id).map(|pipe| pipe.clone());
+        if let Some(pipe) = write_option {
+            let pipe_id = PIPE_NEXT_ID.fetch_add(1, Ordering::SeqCst);
+            pipes.1.insert(pipe_id, pipe);
+            return Ok(pipe_id);
+        }
+
+        Err(Error::new(EBADF))
+    }
+
+    fn read(&self, id: usize, buf: &mut [u8]) -> Result<usize> {
+        let pipe_option = {
+            let pipes = pipes();
+            pipes.0.get(&id).map(|pipe| pipe.clone())
+        };
+
+        if let Some(pipe) = pipe_option {
+            pipe.read(buf)
+        } else {
+            Err(Error::new(EBADF))
+        }
+    }
+
+    fn write(&self, id: usize, buf: &[u8]) -> Result<usize> {
+        let pipe_option = {
+            let pipes = pipes();
+            pipes.1.get(&id).map(|pipe| pipe.clone())
+        };
+
+        if let Some(pipe) = pipe_option {
+            pipe.write(buf)
+        } else {
+            Err(Error::new(EBADF))
+        }
+    }
+
+    fn fsync(&self, _id: usize) -> Result<usize> {
+        Ok(0)
+    }
+
+    fn close(&self, id: usize) -> Result<usize> {
+        let mut pipes = pipes_mut();
+
+        drop(pipes.0.remove(&id));
+        drop(pipes.1.remove(&id));
+
+        Ok(0)
+    }
+}
+
+/// Read side of a pipe
+#[derive(Clone)]
+pub struct PipeRead {
+    vec: Arc<Mutex<VecDeque<u8>>>
+}
+
+impl PipeRead {
+    pub fn new() -> Self {
+        PipeRead {
+            vec: Arc::new(Mutex::new(VecDeque::new()))
+        }
+    }
+
+    fn read(&self, buf: &mut [u8]) -> Result<usize> {
+        if buf.is_empty() || (Arc::weak_count(&self.vec) == 0 && self.vec.lock().is_empty()) {
+            Ok(0)
+        } else {
+            /*loop {
+                {
+                    if let Some(byte) = self.vec.lock().pop_front() {
+                        buf[0] = byte;
+                        break;
+                    }
+                }
+                unsafe { context::switch(); }
+            }*/
+
+            let mut i = 0;
+
+            while i < buf.len() {
+                match self.vec.lock().pop_front() {
+                    Some(b) => {
+                        buf[i] = b;
+                        i += 1;
+                    },
+                    None => break
+                }
+            }
+
+            Ok(i)
+        }
+    }
+}
+
+/// Read side of a pipe
+#[derive(Clone)]
+pub struct PipeWrite {
+    vec: Weak<Mutex<VecDeque<u8>>>,
+}
+
+impl PipeWrite {
+    pub fn new(read: &PipeRead) -> Self {
+        PipeWrite {
+            vec: Arc::downgrade(&read.vec),
+        }
+    }
+
+    fn write(&self, buf: &[u8]) -> Result<usize> {
+        match self.vec.upgrade() {
+            Some(vec) => {
+                for &b in buf.iter() {
+                    vec.lock().push_back(b);
+                }
+
+                Ok(buf.len())
+            },
+            None => Err(Error::new(EPIPE))
+        }
+    }
+}
diff --git a/syscall/fs.rs b/syscall/fs.rs
index f527c36c..fb06884e 100644
--- a/syscall/fs.rs
+++ b/syscall/fs.rs
@@ -1,4 +1,5 @@
 //! Filesystem syscalls
+use core::sync::atomic::Ordering;
 
 use context;
 use scheme;
@@ -101,6 +102,34 @@ pub fn open(path: &[u8], flags: usize) -> Result<usize> {
     }).ok_or(Error::new(EMFILE))
 }
 
+pub fn pipe2(fds: &mut [usize], flags: usize) -> Result<usize> {
+    if fds.len() >= 2 {
+        let scheme_id = ::scheme::pipe::PIPE_SCHEME_ID.load(Ordering::SeqCst);
+        let (read_id, write_id) = ::scheme::pipe::pipe(flags);
+
+        let contexts = context::contexts();
+        let context_lock = contexts.current().ok_or(Error::new(ESRCH))?;
+        let context = context_lock.read();
+
+        let read_fd = context.add_file(::context::file::File {
+            scheme: scheme_id,
+            number: read_id
+        }).ok_or(Error::new(EMFILE))?;
+
+        let write_fd = context.add_file(::context::file::File {
+            scheme: scheme_id,
+            number: write_id
+        }).ok_or(Error::new(EMFILE))?;
+
+        fds[0] = read_fd;
+        fds[1] = write_fd;
+
+        Ok(0)
+    } else {
+        Err(Error::new(EFAULT))
+    }
+}
+
 /// mkdir syscall
 pub fn mkdir(path: &[u8], mode: u16) -> Result<usize> {
     let (path_canon, uid, gid) = {
diff --git a/syscall/mod.rs b/syscall/mod.rs
index fc357a6b..9599a443 100644
--- a/syscall/mod.rs
+++ b/syscall/mod.rs
@@ -59,6 +59,7 @@ pub extern fn syscall(a: usize, b: usize, c: usize, d: usize, e: usize, f: usize
                 SYS_GETEGID => getegid(),
                 SYS_SETUID => setuid(b as u32),
                 SYS_SETGID => setgid(b as u32),
+                SYS_PIPE2 => pipe2(validate_slice_mut(b as *mut usize, 2)?, c),
                 SYS_PHYSALLOC => physalloc(b),
                 SYS_PHYSFREE => physfree(b, c),
                 SYS_PHYSMAP => physmap(b, c, d),
@@ -72,7 +73,7 @@ pub extern fn syscall(a: usize, b: usize, c: usize, d: usize, e: usize, f: usize
     let result = inner(a, b, c, d, e, f, stack);
 
     if let Err(ref err) = result {
-        println!("{}, {}, {}, {}: {}", a, b, c, d, err);
+        println!("{}, {}, {}, {}: {}", a & 0xFFFF, b, c, d, err);
     }
 
     Error::mux(result)
diff --git a/syscall/process.rs b/syscall/process.rs
index e00d3146..61d0b03e 100644
--- a/syscall/process.rs
+++ b/syscall/process.rs
@@ -3,6 +3,7 @@ use alloc::arc::Arc;
 use alloc::boxed::Box;
 use collections::{BTreeMap, Vec};
 use core::mem;
+use core::ops::DerefMut;
 use core::str;
 use spin::Mutex;
 
@@ -397,31 +398,51 @@ pub fn clone(flags: usize, stack_base: usize) -> Result<usize> {
 
 pub fn exit(status: usize) -> ! {
     {
-        let contexts = context::contexts();
-        let (vfork, ppid) = {
-            let context_lock = contexts.current().expect("tried to exit without context");
-            let mut context = context_lock.write();
-            context.image.clear();
-            drop(context.heap.take());
-            drop(context.stack.take());
-            context.grants = Arc::new(Mutex::new(Vec::new()));
-            context.files = Arc::new(Mutex::new(Vec::new()));
-            context.status = context::Status::Exited(status);
-
-            let vfork = context.vfork;
-            context.vfork = false;
-            (vfork, context.ppid)
-        };
-        if vfork {
-            if let Some(context_lock) = contexts.get(ppid) {
+        let mut close_files = Vec::new();
+        {
+            let contexts = context::contexts();
+            let (vfork, ppid) = {
+                let context_lock = contexts.current().expect("tried to exit without context");
                 let mut context = context_lock.write();
-                if context.status == context::Status::Blocked {
-                    context.status = context::Status::Runnable;
+                context.image.clear();
+                drop(context.heap.take());
+                drop(context.stack.take());
+                context.grants = Arc::new(Mutex::new(Vec::new()));
+                if Arc::strong_count(&context.files) == 1 {
+                    mem::swap(context.files.lock().deref_mut(), &mut close_files);
+                }
+                context.files = Arc::new(Mutex::new(Vec::new()));
+                context.status = context::Status::Exited(status);
+
+                let vfork = context.vfork;
+                context.vfork = false;
+                (vfork, context.ppid)
+            };
+            if vfork {
+                if let Some(context_lock) = contexts.get(ppid) {
+                    let mut context = context_lock.write();
+                    if context.status == context::Status::Blocked {
+                        context.status = context::Status::Runnable;
+                    } else {
+                        println!("{} not blocked for exit vfork unblock", ppid);
+                    }
                 } else {
-                    println!("{} not blocked for exit vfork unblock", ppid);
+                    println!("{} not found for exit vfork unblock", ppid);
+                }
+            }
+        }
+
+        for (fd, file_option) in close_files.drain(..).enumerate() {
+            if let Some(file) = file_option {
+                context::event::unregister(fd, file.scheme, file.number);
+
+                let scheme_option = {
+                    let schemes = scheme::schemes();
+                    schemes.get(file.scheme).map(|scheme| scheme.clone())
+                };
+                if let Some(scheme) = scheme_option {
+                    let _ = scheme.close(file.number);
                 }
-            } else {
-                println!("{} not found for exit vfork unblock", ppid);
             }
         }
     }
-- 
GitLab