Commit 370b11fb authored by Jeremy Soller's avatar Jeremy Soller

Cleanup of wait queue

parent dca4ea50
......@@ -120,7 +120,7 @@ impl Smolnetd {
pub fn on_network_scheme_event(&mut self) -> Result<Option<()>> {
if self.read_frames()? > 0 {
self.poll().map(Some)?;
self.poll()?;
}
Ok(None)
}
......
......@@ -104,8 +104,6 @@ struct WaitHandle {
type WaitQueue = Vec<WaitHandle>;
type WaitQueueMap = BTreeMap<SocketHandle, WaitQueue>;
pub type DupResult<T> = Option<(
SchemeFile<T>,
Option<(SocketHandle, <T as SchemeSocket>::DataT)>,
......@@ -163,7 +161,7 @@ where
files: BTreeMap<usize, SchemeFile<SocketT>>,
socket_set: Rc<RefCell<SocketSet>>,
scheme_file: File,
wait_queue_map: WaitQueueMap,
wait_queue: WaitQueue,
scheme_data: SocketT::SchemeDataT,
_phantom_socket: PhantomData<SocketT>,
}
......@@ -180,7 +178,7 @@ where
socket_set,
scheme_data: SocketT::new_scheme_data(),
scheme_file,
wait_queue_map: BTreeMap::new(),
wait_queue: Vec::new(),
_phantom_socket: PhantomData,
}
}
......@@ -195,13 +193,32 @@ where
packet.a = a;
self.scheme_file.write_all(&packet)?;
} else {
self.handle_block(packet)?;
match self.handle_block(&mut packet) {
Ok(timeout) => {
self.wait_queue.push(WaitHandle {
until: timeout,
packet: packet,
});
},
Err(err) => {
packet.a = (-err.errno) as usize;
self.scheme_file.write_all(&packet)?;
return Err(Error::from_syscall_error(
err,
"Can't handle blocked socket",
));
}
}
}
}
Ok(None)
}
pub fn notify_sockets(&mut self) -> Result<()> {
let mut cur_time = TimeSpec::default();
syscall::clock_gettime(syscall::CLOCK_MONOTONIC, &mut cur_time)
.map_err(|e| Error::from_syscall_error(e, "Can't get time"))?;
// Notify non-blocking sockets
for (&fd, ref mut file) in &mut self.files {
if let &mut SchemeFile::Socket(SocketFile {
......@@ -236,112 +253,43 @@ where
}
// Wake up blocking queue
self.wake_up_queues()?;
Ok(())
}
fn wake_up_queues(&mut self) -> Result<()> {
let mut cur_time = TimeSpec::default();
syscall::clock_gettime(syscall::CLOCK_MONOTONIC, &mut cur_time)
.map_err(|e| Error::from_syscall_error(e, "Can't get time"))?;
let socket_handles: Vec<_> = self.wait_queue_map.keys().cloned().collect();
for socket_handle in socket_handles {
self.wake_up_wait_queue(socket_handle, cur_time)?;
}
Ok(())
}
fn wake_up_wait_queue(
&mut self,
socket_handle: SocketHandle,
cur_time: syscall::TimeSpec,
) -> Result<()> {
let mut input_queue = if let Some(wait_queue) = self.wait_queue_map.get_mut(&socket_handle)
{
::std::mem::replace(wait_queue, vec![])
} else {
vec![]
};
let mut to_retain = vec![];
for wait_handle in input_queue.drain(..) {
let mut packet = wait_handle.packet;
if let Some(a) = self.handle(&mut packet) {
let mut i = 0;
while i < self.wait_queue.len() {
let mut packet = self.wait_queue[i].packet;
if let Some(a) = self.handle(&packet) {
self.wait_queue.remove(i);
packet.a = a;
self.scheme_file.write_all(&packet)?;
} else {
match wait_handle.until {
match self.wait_queue[i].until {
Some(until)
if (until.tv_sec < cur_time.tv_sec
|| (until.tv_sec == cur_time.tv_sec
&& until.tv_nsec < cur_time.tv_nsec)) =>
{
self.wait_queue.remove(i);
packet.a = (-syscall::ETIMEDOUT) as usize;
self.scheme_file.write_all(&packet)?;
}
},
_ => {
to_retain.push(wait_handle);
i += 1;
}
}
}
}
if let Some(wait_queue) = self.wait_queue_map.get_mut(&socket_handle) {
wait_queue.extend(to_retain);
}
Ok(())
}
fn move_file_to_new_socket_handle(
wait_queue_map: &mut WaitQueueMap,
fd: usize,
from: SocketHandle,
to: SocketHandle,
) -> SyscallResult<()> {
let mut to_move = vec![];
if let Some(wait_queue) = wait_queue_map.get_mut(&from) {
to_move = wait_queue
.drain_filter(|wh| wh.packet.b == fd)
.collect::<Vec<_>>();
}
wait_queue_map
.entry(to)
.or_insert_with(|| vec![])
.extend(to_move);
Ok(())
}
fn handle_block(&mut self, mut packet: SyscallPacket) -> Result<()> {
let syscall_result = self.try_handle_block(&mut packet);
if let Err(syscall_error) = syscall_result {
packet.a = (-syscall_error.errno) as usize;
self.scheme_file.write_all(&packet)?;
Err(Error::from_syscall_error(
syscall_error,
"Can't handle blocked socket",
))
} else {
Ok(())
}
}
fn try_handle_block(&mut self, packet: &mut SyscallPacket) -> SyscallResult<()> {
fn handle_block(&mut self, packet: &mut SyscallPacket) -> SyscallResult<Option<TimeSpec>> {
let fd = packet.b;
let (socket_handle, read_timeout, write_timeout) = {
let (read_timeout, write_timeout) = {
let file = self.files
.get(&fd)
.ok_or_else(|| SyscallError::new(syscall::EBADF))?;
if let SchemeFile::Socket(ref scheme_file) = *file {
Ok((
scheme_file.socket_handle,
scheme_file.read_timeout,
scheme_file.write_timeout,
))
......@@ -362,16 +310,7 @@ where
*timeout = add_time(timeout, &cur_time)
}
let wait_queue = self.wait_queue_map
.entry(socket_handle)
.or_insert_with(|| vec![]);
wait_queue.push(WaitHandle {
until: timeout,
packet: *packet,
});
Ok(())
Ok(timeout)
}
fn get_setting(
......@@ -538,21 +477,14 @@ where
let socket = socket_set.get::<SocketT>(socket_handle);
socket.close_file(&scheme_file, &mut self.scheme_data)?;
}
let remove_wq =
if let Some(ref mut wait_queue) = self.wait_queue_map.get_mut(&socket_handle) {
wait_queue.retain(
|&WaitHandle {
packet: SyscallPacket { a, .. },
..
}| a != fd,
);
wait_queue.is_empty()
} else {
false
};
if remove_wq {
self.wait_queue_map.remove(&socket_handle);
}
self.wait_queue.retain(
|&WaitHandle {
packet: SyscallPacket { a, .. },
..
}| a != fd,
);
socket_set.release(socket_handle);
//TODO: removing sockets in release should make prune unnecessary
socket_set.prune();
......@@ -560,6 +492,7 @@ where
}
fn write(&mut self, fd: usize, buf: &[u8]) -> SyscallResult<Option<usize>> {
println!("write({}, {:p}, {})", fd, buf.as_ptr(), buf.len());
let (fd, setting) = {
let file = self.files
.get_mut(&fd)
......@@ -580,6 +513,7 @@ where
}
fn read(&mut self, fd: usize, buf: &mut [u8]) -> SyscallResult<Option<usize>> {
println!("read({}, {:p}, {})", fd, buf.as_ptr(), buf.len());
let (fd, setting) = {
let file = self.files
.get_mut(&fd)
......@@ -653,12 +587,6 @@ where
if let Some((socket_handle, data)) = update_with {
if let SchemeFile::Socket(ref mut file) = *file {
Self::move_file_to_new_socket_handle(
&mut self.wait_queue_map,
fd,
file.socket_handle,
socket_handle,
)?;
file.socket_handle = socket_handle;
file.data = data;
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment