Skip to content
Snippets Groups Projects
Verified Commit fcd8ce7e authored by Jeremy Soller's avatar Jeremy Soller
Browse files

Fix event logic for pipes

parent afa3f383
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,7 @@ use event;
use scheme::{AtomicSchemeId, ATOMIC_SCHEMEID_INIT, SchemeId};
use sync::WaitCondition;
use syscall::error::{Error, Result, EAGAIN, EBADF, EINTR, EINVAL, EPIPE, ESPIPE};
use syscall::flag::{EVENT_READ, F_GETFL, F_SETFL, O_ACCMODE, O_NONBLOCK, MODE_FIFO};
use syscall::flag::{EVENT_READ, EVENT_WRITE, F_GETFL, F_SETFL, O_ACCMODE, O_NONBLOCK, MODE_FIFO};
use syscall::scheme::Scheme;
use syscall::data::Stat;
......@@ -36,8 +36,8 @@ pub fn pipe(flags: usize) -> (usize, usize) {
let scheme_id = PIPE_SCHEME_ID.load(Ordering::SeqCst);
let read_id = PIPE_NEXT_ID.fetch_add(1, Ordering::SeqCst);
let write_id = PIPE_NEXT_ID.fetch_add(1, Ordering::SeqCst);
let read = PipeRead::new(scheme_id, read_id, flags);
let write = PipeWrite::new(&read, flags);
let read = PipeRead::new(scheme_id, write_id, flags);
let write = PipeWrite::new(&read, read_id, flags);
pipes.0.insert(read_id, Arc::new(read));
pipes.1.insert(write_id, Arc::new(write));
(read_id, write_id)
......@@ -91,7 +91,20 @@ impl Scheme for PipeScheme {
let pipes = pipes();
if let Some(pipe) = pipes.0.get(&id) {
return pipe.fevent(flags);
if flags == EVENT_READ {
// TODO: Return correct flags
if pipe.vec.lock().is_empty() {
return Ok(0);
} else {
return Ok(EVENT_READ);
}
}
}
if let Some(pipe) = pipes.1.get(&id) {
if flags == EVENT_WRITE {
return Ok(EVENT_WRITE);
}
}
Err(Error::new(EBADF))
......@@ -137,17 +150,17 @@ impl Scheme for PipeScheme {
/// Read side of a pipe
pub struct PipeRead {
scheme_id: SchemeId,
event_id: usize,
write_id: usize,
flags: AtomicUsize,
condition: Arc<WaitCondition>,
vec: Arc<Mutex<VecDeque<u8>>>
}
impl PipeRead {
pub fn new(scheme_id: SchemeId, event_id: usize, flags: usize) -> Self {
pub fn new(scheme_id: SchemeId, write_id: usize, flags: usize) -> Self {
PipeRead {
scheme_id: scheme_id,
event_id: event_id,
scheme_id,
write_id,
flags: AtomicUsize::new(flags),
condition: Arc::new(WaitCondition::new()),
vec: Arc::new(Mutex::new(VecDeque::new())),
......@@ -165,10 +178,6 @@ impl PipeRead {
}
}
fn fevent(&self, _flags: usize) -> Result<usize> {
Ok(0)
}
fn read(&self, buf: &mut [u8]) -> Result<usize> {
loop {
{
......@@ -185,6 +194,8 @@ impl PipeRead {
}
if i > 0 {
event::trigger(self.scheme_id, self.write_id, EVENT_WRITE);
return Ok(i);
}
}
......@@ -205,17 +216,17 @@ impl PipeRead {
/// Read side of a pipe
pub struct PipeWrite {
scheme_id: SchemeId,
event_id: usize,
read_id: usize,
flags: AtomicUsize,
condition: Arc<WaitCondition>,
vec: Option<Weak<Mutex<VecDeque<u8>>>>
}
impl PipeWrite {
pub fn new(read: &PipeRead, flags: usize) -> Self {
pub fn new(read: &PipeRead, read_id: usize, flags: usize) -> Self {
PipeWrite {
scheme_id: read.scheme_id,
event_id: read.event_id,
read_id,
flags: AtomicUsize::new(flags),
condition: read.condition.clone(),
vec: Some(Arc::downgrade(&read.vec)),
......@@ -244,7 +255,7 @@ impl PipeWrite {
}
}
event::trigger(self.scheme_id, self.event_id, EVENT_READ);
event::trigger(self.scheme_id, self.read_id, EVENT_READ);
self.condition.notify();
Ok(buf.len())
......@@ -260,7 +271,7 @@ impl PipeWrite {
impl Drop for PipeWrite {
fn drop(&mut self) {
drop(self.vec.take());
event::trigger(self.scheme_id, self.event_id, EVENT_READ);
event::trigger(self.scheme_id, self.read_id, EVENT_READ);
self.condition.notify();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment