Verified Commit 9454cdfb authored by jD91mZM2's avatar jD91mZM2
Browse files

Code cleanup & bugfixes related to connections dropped early

parent 31a2b95f
use std::{
fs::File,
io::{self, prelude::*}
};
fn main() -> io::Result<()> {
let mut client = File::open("chan:hello")?;
io::copy(&mut client, &mut io::stdout())?;
Ok(())
}
......@@ -9,14 +9,22 @@ use std::{
fn from_syscall_error(error: syscall::Error) -> io::Error {
io::Error::from_raw_os_error(error.errno as i32)
}
fn nonblock(file: &File) -> io::Result<()> {
syscall::fcntl(file.as_raw_fd(), syscall::F_SETFL, syscall::O_NONBLOCK)
.map(|_| ())
.map_err(from_syscall_error)
}
fn dup(file: &File, buf: &str) -> io::Result<File> {
let stream = syscall::dup(file.as_raw_fd(), buf.as_bytes()).map_err(from_syscall_error)?;
Ok(unsafe { File::from_raw_fd(stream) })
}
fn main() -> io::Result<()> {
let server = File::create("chan:hello_world")?;
println!("Testing events...");
syscall::fcntl(server.as_raw_fd(), syscall::F_SETFL, syscall::O_NONBLOCK)
.map_err(from_syscall_error)?;
nonblock(&server)?;
let mut event_file = File::open("event:")?;
let mut time_file = File::open(format!("time:{}", syscall::CLOCK_MONOTONIC))?;
......@@ -65,8 +73,7 @@ fn main() -> io::Result<()> {
assert_eq!(event.flags, syscall::EVENT_WRITE);
println!("-> Accept event");
let stream = syscall::dup(server.as_raw_fd(), b"listen").map_err(from_syscall_error)?;
let mut stream = unsafe { File::from_raw_fd(stream) };
let mut stream = dup(&server, "listen")?;
event_file.read(&mut event)?;
assert_eq!(event.data, TOKEN_CLIENT);
......
......@@ -11,6 +11,15 @@ use std::{
fn from_syscall_error(error: syscall::Error) -> io::Error {
io::Error::from_raw_os_error(error.errno as i32)
}
fn nonblock(file: &File) -> io::Result<()> {
syscall::fcntl(file.as_raw_fd(), syscall::F_SETFL, syscall::O_NONBLOCK)
.map(|_| ())
.map_err(from_syscall_error)
}
fn dup(file: &File, buf: &str) -> io::Result<File> {
let stream = syscall::dup(file.as_raw_fd(), buf.as_bytes()).map_err(from_syscall_error)?;
Ok(unsafe { File::from_raw_fd(stream) })
}
fn main() -> io::Result<()> {
let mut buf = [0; 5];
......@@ -20,8 +29,7 @@ fn main() -> io::Result<()> {
// First client not accepted yet
assert_eq!(File::open("chan:hello_world").unwrap_err().kind(), io::ErrorKind::ConnectionRefused);
let stream = syscall::dup(server.as_raw_fd(), b"listen").map_err(from_syscall_error)?;
let mut stream = unsafe { File::from_raw_fd(stream) };
let mut stream = dup(&server, "listen")?;
println!("Testing basic I/O...");
......@@ -40,11 +48,9 @@ fn main() -> io::Result<()> {
assert_eq!(stream.read(&mut buf)?, 0);
}
println!("Testing alternative connect method...");
let client = syscall::dup(server.as_raw_fd(), b"connect").map_err(from_syscall_error)?;
let mut client = unsafe { File::from_raw_fd(client) };
let stream = syscall::dup(server.as_raw_fd(), b"listen").map_err(from_syscall_error)?;
let mut stream = unsafe { File::from_raw_fd(stream) };
let mut client = dup(&server, "connect")?;
let mut stream = dup(&server, "listen")?;
println!("Testing blocking I/O...");
......@@ -67,24 +73,33 @@ fn main() -> io::Result<()> {
println!("Testing non-blocking I/O...");
syscall::fcntl(client.as_raw_fd(), syscall::F_SETFL, syscall::O_NONBLOCK)
.map_err(from_syscall_error)?;
syscall::fcntl(server.as_raw_fd(), syscall::F_SETFL, syscall::O_NONBLOCK)
.map_err(from_syscall_error)?;
nonblock(&client)?;
nonblock(&server)?;
assert_eq!(client.read(&mut buf).unwrap_err().kind(), io::ErrorKind::WouldBlock);
println!("-> Read would block");
match syscall::dup(server.as_raw_fd(), b"listen") {
Ok(stream) => {
unsafe { File::from_raw_fd(stream); }
panic!("this is supposed to fail");
},
Err(err) => {
let err = from_syscall_error(err);
assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
}
}
assert_eq!(dup(&server, "listen").unwrap_err().kind(), io::ErrorKind::WouldBlock);
println!("-> Accept would block");
drop(client);
{
let mut client = File::open("chan:hello_world")?;
nonblock(&client)?;
assert_eq!(client.write(b"a").unwrap_err().kind(), io::ErrorKind::WouldBlock);
println!("-> Write before accept would block");
}
assert_eq!(dup(&server, "listen").unwrap_err().kind(), io::ErrorKind::ConnectionReset);
println!("-> Server can't accept dropped client");
let mut client = dup(&server, "connect")?;
nonblock(&client)?;
assert_eq!(client.write(b"a").unwrap_err().kind(), io::ErrorKind::WouldBlock);
println!("-> Write before accept would block (alternative connection method)");
println!("Everything tested!");
Ok(())
}
......@@ -9,16 +9,17 @@ use std::{
fn from_syscall_error(error: syscall::Error) -> io::Error {
io::Error::from_raw_os_error(error.errno as i32)
}
fn dup(file: &File, buf: &str) -> io::Result<File> {
let stream = syscall::dup(file.as_raw_fd(), buf.as_bytes()).map_err(from_syscall_error)?;
Ok(unsafe { File::from_raw_fd(stream) })
}
fn main() -> io::Result<()> {
let mut buf = [0; 5];
let server = File::create("chan:")?;
let client = syscall::dup(server.as_raw_fd(), b"connect").map_err(from_syscall_error)?;
let mut client = unsafe { File::from_raw_fd(client) };
let stream = syscall::dup(server.as_raw_fd(), b"listen").map_err(from_syscall_error)?;
let mut stream = unsafe { File::from_raw_fd(stream) };
let mut client = dup(&server, "connect")?;
let mut stream = dup(&server, "listen")?;
println!("Testing basic I/O...");
......
......@@ -7,11 +7,25 @@ use std::{
};
use syscall::{flag::*, error::*, Error, SchemeBlockMut, Result};
#[derive(Default)]
#[derive(Debug, Default)]
pub struct Client {
buffer: Vec<u8>
}
#[derive(Debug, Default)]
pub struct Listener {
path: Option<String>
}
#[derive(Clone, Copy, Eq, PartialEq)]
#[derive(Debug)]
pub enum Extra {
Client(Client),
Listener(Listener)
}
impl Default for Extra {
fn default() -> Self {
Extra::Client(Client::default())
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Connection {
Waiting,
Open(usize),
......@@ -23,16 +37,15 @@ impl Default for Connection {
}
}
#[derive(Default)]
#[derive(Debug, Default)]
pub struct Handle {
flags: usize,
fevent: usize,
notified_read: bool,
notified_write: bool,
listener: Option<Listener>,
remote: Connection,
buffer: Vec<u8>
extra: Extra
}
impl Handle {
pub fn accept(&mut self) -> Self {
......@@ -43,7 +56,11 @@ impl Handle {
}
}
pub fn is_listener(&self) -> bool {
self.listener.is_some()
if let Extra::Listener(_) = self.extra {
true
} else {
false
}
}
}
......@@ -56,30 +73,33 @@ pub struct IpcScheme {
impl IpcScheme {
pub fn post_fevents(&mut self, file: &mut File) -> io::Result<()> {
for (id, handle) in &mut self.handles {
if handle.is_listener() {
if let Connection::Open(_) = handle.remote {
// Send writable because that's what smolnetd does for TcpListener
if !handle.notified_write {
handle.notified_write = true;
post_fevent(file, *id, EVENT_WRITE)?;
match handle.extra {
Extra::Listener(_) => {
if let Connection::Open(_) = handle.remote {
// Send writable because that's what smolnetd does for TcpListener
if !handle.notified_write {
handle.notified_write = true;
post_fevent(file, *id, EVENT_WRITE)?;
}
} else {
handle.notified_write = false;
}
} else {
handle.notified_write = false;
}
} else {
if let Connection::Open(_) = handle.remote {
if !handle.notified_write {
handle.notified_write = true;
post_fevent(file, *id, EVENT_WRITE)?;
},
Extra::Client(ref mut client) => {
if let Connection::Open(_) = handle.remote {
if !handle.notified_write {
handle.notified_write = true;
post_fevent(file, *id, EVENT_WRITE)?;
}
}
}
if !handle.buffer.is_empty() || handle.remote == Connection::Closed {
if !handle.notified_read {
handle.notified_read = true;
post_fevent(file, *id, EVENT_READ)?;
if !client.buffer.is_empty() || handle.remote == Connection::Closed {
if !handle.notified_read {
handle.notified_read = true;
post_fevent(file, *id, EVENT_READ)?;
}
} else {
handle.notified_read = false;
}
} else {
handle.notified_read = false;
}
}
}
......@@ -103,7 +123,7 @@ impl SchemeBlockMut for IpcScheme {
self.listeners.insert(String::from(path), id);
listener.path = Some(String::from(path));
}
new.listener = Some(listener);
new.extra = Extra::Listener(listener);
} else {
let listener = self.listeners.get(path).ok_or(Error::new(ENOENT))?;
let handle = self.handles.get_mut(&listener).expect("orphan listener left over");
......@@ -125,13 +145,18 @@ impl SchemeBlockMut for IpcScheme {
};
if let Connection::Open(remote) = remote {
let new_id = self.next_id;
let mut clone = self.handles.get_mut(&id).map(Handle::accept).unwrap();
{
// This might fail if the remote side closed early
let mut remote = self.handles.get_mut(&remote).ok_or(Error::new(ECONNRESET))?;
remote.remote = Connection::Open(new_id);
}
self.handles.insert(new_id, clone);
self.next_id += 1;
let mut remote = self.handles.get_mut(&remote).unwrap();
remote.remote = Connection::Open(new_id);
Ok(Some(new_id))
} else if flags & O_NONBLOCK == O_NONBLOCK {
Err(Error::new(EAGAIN))
......@@ -141,9 +166,8 @@ impl SchemeBlockMut for IpcScheme {
},
b"connect" => {
let mut new = Handle::default();
new.remote = Connection::Open(id);
let new_id = self.next_id;
match self.handles.get_mut(&id) {
Some(ref mut handle) if handle.is_listener() => {
if handle.remote != Connection::Waiting {
......@@ -153,6 +177,7 @@ impl SchemeBlockMut for IpcScheme {
},
_ => return Err(Error::new(EBADF))
}
self.handles.insert(new_id, new);
self.next_id += 1;
Ok(Some(new_id))
......@@ -187,8 +212,13 @@ impl SchemeBlockMut for IpcScheme {
};
if let Connection::Open(remote) = remote {
let mut remote = self.handles.get_mut(&remote).unwrap();
remote.buffer.extend(buf);
Ok(Some(buf.len()))
match remote.extra {
Extra::Client(ref mut client) => {
client.buffer.extend(buf);
Ok(Some(buf.len()))
},
Extra::Listener(_) => panic!("somehow, a client was connected to a listener directly")
}
} else if remote == Connection::Waiting && flags & O_NONBLOCK == O_NONBLOCK {
Err(Error::new(EAGAIN))
} else if remote == Connection::Waiting {
......@@ -205,12 +235,15 @@ impl SchemeBlockMut for IpcScheme {
fn read(&mut self, id: usize, buf: &mut [u8]) -> Result<Option<usize>> {
let handle = self.handles.get_mut(&id).ok_or(Error::new(EBADF))?;
if handle.is_listener() {
Err(Error::new(EBADF))
} else if !handle.buffer.is_empty() {
let len = buf.len().min(handle.buffer.len());
buf[..len].copy_from_slice(&handle.buffer[..len]);
handle.buffer.drain(..len);
let client = match handle.extra {
Extra::Client(ref mut client) => client,
Extra::Listener(_) => return Err(Error::new(EBADF))
};
if !client.buffer.is_empty() {
let len = buf.len().min(client.buffer.len());
buf[..len].copy_from_slice(&client.buffer[..len]);
client.buffer.drain(..len);
Ok(Some(len))
} else if handle.remote == Connection::Closed {
// Remote dropped, send EOF
......@@ -224,13 +257,19 @@ impl SchemeBlockMut for IpcScheme {
fn close(&mut self, id: usize) -> Result<Option<usize>> {
let handle = self.handles.remove(&id).ok_or(Error::new(EBADF))?;
if let Connection::Open(remote) = handle.remote {
let mut remote = self.handles.get_mut(&remote).unwrap();
remote.remote = Connection::Closed;
}
if let Some(listener) = handle.listener {
if let Some(path) = listener.path {
self.listeners.remove(&path);
match handle.extra {
Extra::Client(_) => {
if let Connection::Open(remote) = handle.remote {
let mut remote = self.handles.get_mut(&remote).unwrap();
remote.remote = Connection::Closed;
}
},
Extra::Listener(listener) => {
// Clients never register server's remote
if let Some(path) = listener.path {
self.listeners.remove(&path);
}
}
}
Ok(Some(0))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment