Commit dad2a4aa authored by Michael Aaron Murphy's avatar Michael Aaron Murphy

Async History File Writes

Fixes a severe bottleneck in the Ion shell where adding a new command
to the history causes a one second hang before the prompt can be displayed.
parent adab01da
......@@ -6,6 +6,11 @@ use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::ops::Index;
use std::ops::IndexMut;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::thread::{sleep, spawn, JoinHandle};
use std::time::Duration;
const DEFAULT_MAX_SIZE: usize = 1000;
......@@ -14,8 +19,6 @@ pub struct History {
// TODO: this should eventually be private
/// Vector of buffers to store history in
pub buffers: VecDeque<Buffer>,
// TODO: Do we need this here? Ion can take care of this.
// pub previous_status: i32,
/// Store a filename to save history into; if None don't save history
file_name: Option<String>,
/// Maximal number of buffers stored in the memory
......@@ -23,20 +26,59 @@ pub struct History {
max_size: usize,
/// Maximal number of lines stored in the file
// TODO: just make this public?
max_file_size: usize,
max_file_size: Arc<AtomicUsize>,
/// Handle to the background thread managing writes to the history file
bg_handle: Option<JoinHandle<()>>,
/// Signals the background thread to stop when dropping the struct
bg_stop: Arc<AtomicBool>,
/// Sends commands to write to the history file
sender: Sender<(Buffer, String)>,
// TODO set from environment variable?
pub append_duplicate_entries: bool,
}
impl History {
/// It's important to execute this function before exiting your program, as it will
/// ensure that all history data has been written to the disk.
pub fn commit_history(&mut self) {
// Signal the background thread to stop
self.bg_stop.store(true, Ordering::Relaxed);
// Wait for the background thread to stop
if let Some(handle) = self.bg_handle.take() {
let _ = handle.join();
}
}
/// Create new History structure.
pub fn new() -> History {
let max_file_size = Arc::new(AtomicUsize::new(DEFAULT_MAX_SIZE));
let bg_stop = Arc::new(AtomicBool::new(false));
let (sender, receiver) = channel();
let stop_signal = bg_stop.clone();
let max_size = max_file_size.clone();
History {
buffers: VecDeque::with_capacity(DEFAULT_MAX_SIZE),
file_name: None,
sender: sender,
bg_handle: Some(spawn(move || {
while !stop_signal.load(Ordering::Relaxed) {
if let Ok((command, filepath)) = receiver.try_recv() {
let max_file_size = max_size.load(Ordering::Relaxed);
let _ = write_to_disk(max_file_size, &command, &filepath);
}
sleep(Duration::from_millis(1));
}
if let Ok((command, filepath)) = receiver.try_recv() {
let max_file_size = max_size.load(Ordering::Relaxed);
let _ = write_to_disk(max_file_size, &command, &filepath);
}
})),
bg_stop: bg_stop,
max_size: DEFAULT_MAX_SIZE,
max_file_size: DEFAULT_MAX_SIZE,
max_file_size: max_file_size,
append_duplicate_entries: false,
}
}
......@@ -50,10 +92,11 @@ impl History {
/// size has been met. If writing to the disk is enabled, this function will be used for
/// logging history to the designated history file.
pub fn push(&mut self, new_item: Buffer) -> io::Result<()> {
let mut ret = Ok(());
self.file_name
.as_ref()
.map(|name| { ret = self.write_to_disk(&new_item, name); });
.map(|name| {
let _ = self.sender.send((new_item.clone(), name.to_owned()));
});
// buffers[0] is the oldest entry
// the new entry goes to the end
......@@ -66,7 +109,7 @@ impl History {
while self.buffers.len() > self.max_size {
self.buffers.pop_front();
}
ret
Ok(())
}
/// Go through the history and try to find a buffer which starts the same as the new buffer
......@@ -107,7 +150,7 @@ impl History {
/// Set maximal number of entries in history file
pub fn set_max_file_size(&mut self, size: usize) {
self.max_file_size = size;
self.max_file_size.store(size, Ordering::Relaxed);
}
/// Load history from given file name
......@@ -131,74 +174,6 @@ impl History {
}
Ok(())
}
/// Perform write operation. If the history file does not exist, it will be created.
/// This function is not part of the public interface.
/// XXX: include more information in the file (like fish does)
fn write_to_disk(&self, new_item: &Buffer, file_name: &String) -> io::Result<()> {
let ret = match OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(file_name) {
Ok(mut file) => {
// Determine the number of commands stored and the file length.
let (file_length, commands_stored) = {
let mut commands_stored = 0;
let mut file_length = 0;
let file = File::open(file_name).unwrap();
for byte in file.bytes() {
if byte.unwrap_or(b' ') == b'\n' {
commands_stored += 1;
}
file_length += 1;
}
(file_length, commands_stored)
};
// If the max history file size has been reached, truncate the file so that only
// N amount of commands are listed. To truncate the file, the seek point will be
// discovered by counting the number of bytes until N newlines have been found and
// then the file will be seeked to that point, copying all data after and rewriting
// the file with the first N lines removed.
if commands_stored >= self.max_file_size {
let seek_point = {
let commands_to_delete = commands_stored - self.max_file_size + 1;
let mut matched = 0;
let mut bytes = 0;
let file = File::open(file_name).unwrap();
for byte in file.bytes() {
if byte.unwrap_or(b' ') == b'\n' {
matched += 1;
}
bytes += 1;
if matched == commands_to_delete {
break;
}
}
bytes as u64
};
try!(file.seek(SeekFrom::Start(seek_point)));
let mut buffer: Vec<u8> = Vec::with_capacity(file_length - seek_point as usize);
try!(file.read_to_end(&mut buffer));
try!(file.set_len(0));
try!(io::copy(&mut buffer.as_slice(), &mut file));
}
// Seek to end for appending
try!(file.seek(SeekFrom::End(0)));
// Write the command to the history file.
try!(file.write_all(String::from(new_item.clone()).as_bytes()));
try!(file.write(b"\n"));
Ok(())
}
Err(message) => Err(message),
};
ret
}
}
impl Index<usize> for History {
......@@ -214,3 +189,71 @@ impl IndexMut<usize> for History {
&mut self.buffers[index]
}
}
/// Perform write operation. If the history file does not exist, it will be created.
/// This function is not part of the public interface.
/// XXX: include more information in the file (like fish does)
fn write_to_disk(max_file_size: usize, new_item: &Buffer, file_name: &str) -> io::Result<()> {
let ret = match OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(file_name) {
Ok(mut file) => {
// Determine the number of commands stored and the file length.
let (file_length, commands_stored) = {
let mut commands_stored = 0;
let mut file_length = 0;
let file = File::open(file_name).unwrap();
for byte in file.bytes() {
if byte.unwrap_or(b' ') == b'\n' {
commands_stored += 1;
}
file_length += 1;
}
(file_length, commands_stored)
};
// If the max history file size has been reached, truncate the file so that only
// N amount of commands are listed. To truncate the file, the seek point will be
// discovered by counting the number of bytes until N newlines have been found and
// then the file will be seeked to that point, copying all data after and rewriting
// the file with the first N lines removed.
if commands_stored >= max_file_size {
let seek_point = {
let commands_to_delete = commands_stored - max_file_size + 1;
let mut matched = 0;
let mut bytes = 0;
let file = File::open(file_name).unwrap();
for byte in file.bytes() {
if byte.unwrap_or(b' ') == b'\n' {
matched += 1;
}
bytes += 1;
if matched == commands_to_delete {
break;
}
}
bytes as u64
};
try!(file.seek(SeekFrom::Start(seek_point)));
let mut buffer: Vec<u8> = Vec::with_capacity(file_length - seek_point as usize);
try!(file.read_to_end(&mut buffer));
try!(file.set_len(0));
try!(io::copy(&mut buffer.as_slice(), &mut file));
}
// Seek to end for appending
try!(file.seek(SeekFrom::End(0)));
// Write the command to the history file.
try!(file.write_all(String::from(new_item.clone()).as_bytes()));
try!(file.write(b"\n"));
Ok(())
}
Err(message) => Err(message),
};
ret
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment