From 5514bef912dcbf35b0cfc75d88d7422617110aa2 Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy <mmstickman@gmail.com> Date: Fri, 27 Oct 2017 11:02:26 -0400 Subject: [PATCH] Simply & Improve Stream Dup/Redir Logic Rather than returning raw fds, we will return Files --- src/shell/job.rs | 42 ++++---- src/shell/pipe_exec/mod.rs | 187 +++++++++++++++------------------ src/shell/pipe_exec/streams.rs | 35 ++++++ 3 files changed, 145 insertions(+), 119 deletions(-) create mode 100644 src/shell/pipe_exec/streams.rs diff --git a/src/shell/job.rs b/src/shell/job.rs index 0b88fcef..b0916b0f 100644 --- a/src/shell/job.rs +++ b/src/shell/job.rs @@ -85,8 +85,8 @@ pub(crate) enum RefinedJob { /// Represents redirection into stdin from more than one source Cat { sources: Vec<File>, - stdin: Option<File>, - stdout: Option<File>, + stdin: Option<File>, + stdout: Option<File>, }, Tee { /// 0 for stdout, 1 for stderr @@ -105,28 +105,32 @@ pub struct TeeItem { } impl TeeItem { - /// Writes out to all destinations of a Tee. Takes an extra `RedirectFrom` argument in order to - /// handle piping. `RedirectFrom` paradoxically indicates where we are piping **to**. It should + /// Writes out to all destinations of a Tee. Takes an extra `RedirectFrom` argument in + /// order to + /// handle piping. `RedirectFrom` paradoxically indicates where we are piping **to**. It + /// should /// never be `RedirectFrom`::Both` pub(crate) fn write_to_all(&mut self, extra: Option<RedirectFrom>) -> ::std::io::Result<()> { - use ::std::io::{self, Write, Read}; - use ::std::os::unix::io::*; - fn write_out<R>(source: &mut R, sinks: &mut [File]) - -> io::Result<()> - where - R: Read + use std::io::{self, Read, Write}; + use std::os::unix::io::*; + fn write_out<R>(source: &mut R, sinks: &mut [File]) -> io::Result<()> + where R: Read { let mut buf = [0; 4096]; loop { // TODO: Figure out how to not block on this read let len = source.read(&mut buf)?; - if len == 0 { return Ok(()); } + if len == 0 { + return Ok(()); + } for file in sinks.iter_mut() { let mut total = 0; loop { let wrote = file.write(&buf[total..len])?; total += wrote; - if total == len { break; } + if total == len { + break; + } } } } @@ -134,14 +138,16 @@ impl TeeItem { let stdout = io::stdout(); let stderr = io::stderr(); match extra { - None => {}, + None => {} Some(RedirectFrom::Stdout) => unsafe { self.sinks.push(File::from_raw_fd(stdout.as_raw_fd())) }, Some(RedirectFrom::Stderr) => unsafe { self.sinks.push(File::from_raw_fd(stderr.as_raw_fd())) }, - Some(RedirectFrom::Both) => panic!("logic error! extra should never be RedirectFrom::Both"), + Some(RedirectFrom::Both) => { + panic!("logic error! extra should never be RedirectFrom::Both") + } }; if let Some(ref mut file) = self.source { write_out(file, &mut self.sinks) @@ -201,8 +207,8 @@ impl RefinedJob { pub(crate) fn tee(tee_out: Option<TeeItem>, tee_err: Option<TeeItem>) -> Self { RefinedJob::Tee { - items: (tee_out, tee_err), - stdin: None, + items: (tee_out, tee_err), + stdin: None, stdout: None, stderr: None, } @@ -266,9 +272,7 @@ impl RefinedJob { format!("{}", args.join(" ")) } // TODO: Figure out real printing - RefinedJob::Cat { .. } | RefinedJob::Tee { .. } => { - "".into() - } + RefinedJob::Cat { .. } | RefinedJob::Tee { .. } => "".into(), } } } diff --git a/src/shell/pipe_exec/mod.rs b/src/shell/pipe_exec/mod.rs index 53d2d012..3c5b9d64 100644 --- a/src/shell/pipe_exec/mod.rs +++ b/src/shell/pipe_exec/mod.rs @@ -7,9 +7,11 @@ pub mod foreground; mod fork; pub mod job_control; +mod streams; use self::fork::{create_process_group, fork_pipe}; use self::job_control::JobControl; +use self::streams::{duplicate_streams, redir, redirect_streams}; use super::{JobKind, Shell}; use super::flags::*; use super::flow_control::FunctionError; @@ -28,12 +30,7 @@ use sys; type RefinedItem = (RefinedJob, JobKind, Vec<Redirection>, Vec<Input>); -/// Use dup2 to replace `old` with `new` using `old`s file descriptor ID -fn redir(old: RawFd, new: RawFd) { - if let Err(e) = sys::dup2(old, new) { - eprintln!("ion: could not duplicate {} to {}: {}", old, new, e); - } -} + /// Create an OS pipe and write the contents of a byte slice to one end /// such that reading from this pipe will produce the byte slice. Return @@ -82,8 +79,7 @@ fn is_implicit_cd(argument: &str) -> bool { /// Insert the multiple redirects as pipelines if necessary. Handle both input and output /// redirection if necessary. -fn do_redirection(piped_commands: Vec<RefinedItem>) - -> Option<Vec<(RefinedJob, JobKind)>> { +fn do_redirection(piped_commands: Vec<RefinedItem>) -> Option<Vec<(RefinedJob, JobKind)>> { macro_rules! get_infile { ($input:expr) => { match $input { @@ -117,7 +113,7 @@ fn do_redirection(piped_commands: Vec<RefinedItem>) JobKind::Pipe(RedirectFrom::Both) => { stdout_count += 1; stderr_count += 1; - }, + } JobKind::Pipe(RedirectFrom::Stdout) => stdout_count += 1, JobKind::Pipe(RedirectFrom::Stderr) => stderr_count += 1, _ => {} @@ -231,12 +227,11 @@ fn do_redirection(piped_commands: Vec<RefinedItem>) let mut prev_kind = JobKind::And; for (mut job, kind, outputs, mut inputs) in piped_commands { match (inputs.len(), prev_kind) { - (0, _) => {}, + (0, _) => {} (1, JobKind::Pipe(_)) => { let sources = vec![get_infile!(inputs[0])?]; - new_commands.push((RefinedJob::cat(sources), - JobKind::Pipe(RedirectFrom::Stdout))); - }, + new_commands.push((RefinedJob::cat(sources), JobKind::Pipe(RedirectFrom::Stdout))); + } (1, _) => job.stdin(get_infile!(inputs[0])?), _ => { let mut sources = Vec::new(); @@ -247,9 +242,8 @@ fn do_redirection(piped_commands: Vec<RefinedItem>) return None; }); } - new_commands.push((RefinedJob::cat(sources), - JobKind::Pipe(RedirectFrom::Stdout))); - }, + new_commands.push((RefinedJob::cat(sources), JobKind::Pipe(RedirectFrom::Stdout))); + } } prev_kind = kind; if outputs.is_empty() { @@ -268,8 +262,14 @@ fn do_redirection(piped_commands: Vec<RefinedItem>) (true, false) => set_one_tee!(new_commands, outputs, job, kind, Stdout, Stderr), // tee both (true, true) => { - let mut tee_out = TeeItem { sinks: Vec::new(), source: None }; - let mut tee_err = TeeItem { sinks: Vec::new(), source: None }; + let mut tee_out = TeeItem { + sinks: Vec::new(), + source: None, + }; + let mut tee_err = TeeItem { + sinks: Vec::new(), + source: None, + }; for output in outputs { match if output.append { OpenOptions::new().create(true).write(true).append(true).open(&output.file) @@ -283,15 +283,17 @@ fn do_redirection(piped_commands: Vec<RefinedItem>) Ok(f_copy) => { tee_out.sinks.push(f); tee_err.sinks.push(f_copy); - }, + } Err(e) => { eprintln!( - "ion: failed to redirect both stdout and stderr to file '{:?}': {}", + "ion: failed to redirect both stdout and stderr to file \ + '{:?}': {}", f, - e); + e + ); return None; } - } + }, }, Err(e) => { eprintln!("ion: failed to redirect output into {}: {}", output.file, e); @@ -332,8 +334,7 @@ pub(crate) trait PipelineExecution { /// /// Each generated command will either be a builtin or external command, and will be /// associated will be marked as an `&&`, `||`, `|`, or final job. - fn generate_commands(&self, pipeline: &mut Pipeline) - -> Result<Vec<RefinedItem>, i32>; + fn generate_commands(&self, pipeline: &mut Pipeline) -> Result<Vec<RefinedItem>, i32>; /// Waits for all of the children within a pipe to finish exuecting, returning the /// exit status of the last process in the queue. @@ -372,19 +373,21 @@ pub(crate) trait PipelineExecution { ) -> i32; /// For cat jobs - fn exec_multi_in(&mut self, - sources: &mut [File], - stdout: &Option<File>, - stdin: &mut Option<File>, + fn exec_multi_in( + &mut self, + sources: &mut [File], + stdout: &Option<File>, + stdin: &mut Option<File>, ) -> i32; /// For tee jobs - fn exec_multi_out(&mut self, - items: &mut (Option<TeeItem>, Option<TeeItem>), - stdout: &Option<File>, - stderr: &Option<File>, - stdin: &Option<File>, - kind: JobKind + fn exec_multi_out( + &mut self, + items: &mut (Option<TeeItem>, Option<TeeItem>), + stdout: &Option<File>, + stderr: &Option<File>, + stdin: &Option<File>, + kind: JobKind, ) -> i32; } @@ -430,13 +433,14 @@ impl<'a> PipelineExecution for Shell<'a> { } } - fn generate_commands( - &self, - pipeline: &mut Pipeline, - ) -> Result<Vec<RefinedItem>, i32> { + fn generate_commands(&self, pipeline: &mut Pipeline) -> Result<Vec<RefinedItem>, i32> { let mut results = Vec::new(); for item in pipeline.items.drain(..) { - let PipeItem { mut job, outputs, inputs } = item; + let PipeItem { + mut job, + outputs, + inputs, + } = item; let refined = { if is_implicit_cd(&job.args[0]) { RefinedJob::builtin( @@ -516,19 +520,11 @@ impl<'a> PipelineExecution for Shell<'a> { ref stdout, ref stderr, } => { - if let Ok(stdout_bk) = sys::dup(sys::STDOUT_FILENO) { - if let Ok(stderr_bk) = sys::dup(sys::STDERR_FILENO) { - if let Ok(stdin_bk) = sys::dup(sys::STDIN_FILENO) { - let args: Vec<&str> = args.iter().map(|x| x as &str).collect(); - let code = self.exec_builtin(name, &args, stdout, stderr, stdin); - redir(stdout_bk, sys::STDOUT_FILENO); - redir(stderr_bk, sys::STDERR_FILENO); - redir(stdin_bk, sys::STDIN_FILENO); - return code; - } - let _ = sys::close(stderr_bk); - } - let _ = sys::close(stdout_bk); + if let Ok((stdin_bk, stdout_bk, stderr_bk)) = duplicate_streams() { + let args: Vec<&str> = args.iter().map(|x| x as &str).collect(); + let code = self.exec_builtin(name, &args, stdout, stderr, stdin); + redirect_streams(stdin_bk, stdout_bk, stderr_bk); + return code; } eprintln!("ion: failed to `dup` STDOUT, STDIN, or STDERR: not running '{}'", long); COULD_NOT_EXEC @@ -540,19 +536,11 @@ impl<'a> PipelineExecution for Shell<'a> { ref stdout, ref stderr, } => { - if let Ok(stdout_bk) = sys::dup(sys::STDOUT_FILENO) { - if let Ok(stderr_bk) = sys::dup(sys::STDERR_FILENO) { - if let Ok(stdin_bk) = sys::dup(sys::STDIN_FILENO) { - let args: Vec<&str> = args.iter().map(|x| x as &str).collect(); - let code = self.exec_function(name, &args, stdout, stderr, stdin); - redir(stdout_bk, sys::STDOUT_FILENO); - redir(stderr_bk, sys::STDERR_FILENO); - redir(stdin_bk, sys::STDIN_FILENO); - return code; - } - let _ = sys::close(stderr_bk); - } - let _ = sys::close(stdout_bk); + if let Ok((stdin_bk, stdout_bk, stderr_bk)) = duplicate_streams() { + let args: Vec<&str> = args.iter().map(|x| x as &str).collect(); + let code = self.exec_function(name, &args, stdout, stderr, stdin); + redirect_streams(stdin_bk, stdout_bk, stderr_bk); + return code; } eprintln!("ion: failed to `dup` STDOUT, STDIN, or STDERR: not running '{}'", long); COULD_NOT_EXEC @@ -633,43 +621,41 @@ impl<'a> PipelineExecution for Shell<'a> { redir(file.as_raw_fd(), sys::STDOUT_FILENO) } - fn read_and_write<R: io::Read>(src: &mut R, stdout: &mut io::StdoutLock) - -> io::Result<()> { + fn read_and_write<R: io::Read>(src: &mut R, stdout: &mut io::StdoutLock) -> io::Result<()> { let mut buf = [0; 4096]; loop { let len = src.read(&mut buf)?; - if len == 0 { return Ok(()) }; + if len == 0 { + return Ok(()); + }; let mut total = 0; loop { let wrote = stdout.write(&buf[total..len])?; total += wrote; - if total == len { break; } + if total == len { + break; + } } } }; let stdout = io::stdout(); let mut stdout = stdout.lock(); for file in stdin.iter_mut().chain(sources) { - match read_and_write(file, &mut stdout) { - Ok(_) => {} - Err(e) => { - eprintln!( - "ion: error in multiple input redirect process: {:?}", - e - ); - return FAILURE; - } + if let Err(why) = read_and_write(file, &mut stdout) { + eprintln!("ion: error in multiple input redirect process: {:?}", why); + return FAILURE; } } SUCCESS } - fn exec_multi_out(&mut self, - items: &mut (Option<TeeItem>, Option<TeeItem>), - stdout: &Option<File>, - stderr: &Option<File>, - stdin: &Option<File>, - kind: JobKind + fn exec_multi_out( + &mut self, + items: &mut (Option<TeeItem>, Option<TeeItem>), + stdout: &Option<File>, + stderr: &Option<File>, + stdin: &Option<File>, + kind: JobKind, ) -> i32 { if let Some(ref file) = *stdin { redir(file.as_raw_fd(), sys::STDIN_FILENO); @@ -686,19 +672,19 @@ impl<'a> PipelineExecution for Shell<'a> { JobKind::Pipe(RedirectFrom::Stderr) => tee_out.write_to_all(None), JobKind::Pipe(_) => tee_out.write_to_all(Some(RedirectFrom::Stdout)), _ => tee_out.write_to_all(None), - } + }, &mut (None, Some(ref mut tee_err)) => match kind { JobKind::Pipe(RedirectFrom::Stdout) => tee_err.write_to_all(None), JobKind::Pipe(_) => tee_err.write_to_all(Some(RedirectFrom::Stderr)), _ => tee_err.write_to_all(None), - } + }, &mut (Some(ref mut tee_out), Some(ref mut tee_err)) => { - // TODO Make it work with pipes - if let Err(e) = tee_out.write_to_all(None) { - Err(e) - } else { + // TODO Make it work with pipes + if let Err(e) = tee_out.write_to_all(None) { + Err(e) + } else { tee_err.write_to_all(None) - } + } } }; if let Err(e) = res { @@ -965,22 +951,21 @@ pub(crate) fn pipe( // If parent is a RefindJob::External, then we need to keep track of the // output pipes, so we can properly close them after the job has been // spawned. - let is_external = if let RefinedJob::External(..) = parent { - true - } else { - false - }; + let is_external = + if let RefinedJob::External(..) = parent { true } else { false }; // If we need to tee both stdout and stderr, we directly connect pipes to // the relevant sources in both of them. if let RefinedJob::Tee { items: (Some(ref mut tee_out), Some(ref mut tee_err)), .. - } = child { + } = child + { match sys::pipe2(sys::O_CLOEXEC) { Err(e) => eprintln!("ion: failed to create pipe: {:?}", e), Ok((out_reader, out_writer)) => { - (*tee_out).source = Some(unsafe { File::from_raw_fd(out_reader) }); + (*tee_out).source = + Some(unsafe { File::from_raw_fd(out_reader) }); parent.stdout(unsafe { File::from_raw_fd(out_writer) }); if is_external { ext_stdio.get_or_insert(vec![]).push(out_writer); @@ -990,7 +975,8 @@ pub(crate) fn pipe( match sys::pipe2(sys::O_CLOEXEC) { Err(e) => eprintln!("ion: failed to create pipe: {:?}", e), Ok((err_reader, err_writer)) => { - (*tee_err).source = Some(unsafe { File::from_raw_fd(err_reader) }); + (*tee_err).source = + Some(unsafe { File::from_raw_fd(err_reader) }); parent.stderr(unsafe { File::from_raw_fd(err_writer) }); if is_external { ext_stdio.get_or_insert(vec![]).push(err_writer); @@ -1019,7 +1005,8 @@ pub(crate) fn pipe( match temp.try_clone() { Err(e) => { eprintln!( - "ion: failed to redirect stdout and stderr: {}", + "ion: failed to redirect stdout and \ + stderr: {}", e ); } diff --git a/src/shell/pipe_exec/streams.rs b/src/shell/pipe_exec/streams.rs new file mode 100644 index 00000000..4bb99a2b --- /dev/null +++ b/src/shell/pipe_exec/streams.rs @@ -0,0 +1,35 @@ +use std::fs::File; +use std::io; +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use sys; + +/// Use dup2 to replace `old` with `new` using `old`s file descriptor ID +pub(crate) fn redir(old: RawFd, new: RawFd) { + if let Err(e) = sys::dup2(old, new) { + eprintln!("ion: could not duplicate {} to {}: {}", old, new, e); + } +} + +/// Duplicates STDIN, STDOUT, and STDERR; in that order; and returns them as `File`s. +/// Why, you ask? A simple safety mechanism to ensure that the duplicated FDs are closed +/// when dropped. +pub(crate) fn duplicate_streams() -> io::Result<(File, File, File)> { + // Duplicates STDIN and converts it into a `File`. + sys::dup(sys::STDIN_FILENO).map(|fd| unsafe { File::from_raw_fd(fd) }) + // Do the same for stdout, and then meld the result with stdin + .and_then(|stdin| sys::dup(sys::STDOUT_FILENO) + .map(|fd| unsafe { File::from_raw_fd(fd) }) + .map(|stdout| (stdin, stdout)) + ) + // And then meld stderr alongside stdin and stdout + .and_then(|(stdin, stdout)| sys::dup(sys::STDERR_FILENO) + .map(|fd| unsafe { File::from_raw_fd(fd) }) + .map(|stderr| (stdin, stdout, stderr)) + ) +} + +pub(crate) fn redirect_streams(inp: File, out: File, err: File) { + redir(inp.as_raw_fd(), sys::STDIN_FILENO); + redir(out.as_raw_fd(), sys::STDOUT_FILENO); + redir(err.as_raw_fd(), sys::STDERR_FILENO); +} -- GitLab