From d7b7ffe63535b70568741c8bb895fe4ea2278535 Mon Sep 17 00:00:00 2001 From: Michael Murphy <mmstickman@gmail.com> Date: Tue, 2 Jan 2018 15:45:45 -0500 Subject: [PATCH] Pipeline Fixes & Tests (#665) Found some issues with pipeline execution logic, foreground job control, and background job control, and so I've fixed them and added a new integration test. The following use cases are now working: ``` echo foo | grep foo && echo found foo echo foo | grep bar || echo did not find bar echo test | grep test && echo found test | cat ``` --- examples/pipelines.ion | 6 + examples/pipelines.out | 8 ++ src/lib/shell/mod.rs | 4 +- src/lib/shell/pipe_exec/mod.rs | 210 ++++++++++++++++---------------- src/lib/sys/unix/job_control.rs | 70 +++++------ src/lib/sys/unix/mod.rs | 25 ++-- 6 files changed, 174 insertions(+), 149 deletions(-) create mode 100644 examples/pipelines.ion create mode 100644 examples/pipelines.out diff --git a/examples/pipelines.ion b/examples/pipelines.ion new file mode 100644 index 00000000..0495fb91 --- /dev/null +++ b/examples/pipelines.ion @@ -0,0 +1,6 @@ +echo test | cat | cat +echo one | cat <<< two +echo foo | grep foo && echo found foo +echo foo | grep bar || echo did not find bar +echo test | grep test && echo found test | cat + diff --git a/examples/pipelines.out b/examples/pipelines.out new file mode 100644 index 00000000..a8202cc4 --- /dev/null +++ b/examples/pipelines.out @@ -0,0 +1,8 @@ +test +one +two +foo +found foo +did not find bar +test +found test diff --git a/src/lib/shell/mod.rs b/src/lib/shell/mod.rs index 3a7522e8..c483cf76 100644 --- a/src/lib/shell/mod.rs +++ b/src/lib/shell/mod.rs @@ -38,7 +38,7 @@ use parser::Terminator; use parser::pipelines::Pipeline; use smallvec::SmallVec; use std::fs::File; -use std::io::{self, Read}; +use std::io::{self, Read, Write}; use std::iter::FromIterator; use std::ops::Deref; use std::path::Path; @@ -124,6 +124,8 @@ impl ShellBuilder { let _ = sys::signal(sys::SIGTERM, handler); extern "C" fn sigpipe_handler(signal: i32) { + let _ = io::stdout().flush(); + let _ = io::stderr().flush(); sys::fork_exit(127 + signal); } diff --git a/src/lib/shell/pipe_exec/mod.rs b/src/lib/shell/pipe_exec/mod.rs index 7300fd69..26529d11 100644 --- a/src/lib/shell/pipe_exec/mod.rs +++ b/src/lib/shell/pipe_exec/mod.rs @@ -11,7 +11,6 @@ mod fork; pub mod job_control; mod streams; -// TODO: Reintegrate this use self::command_not_found::command_not_found; use self::fork::fork_pipe; use self::job_control::{JobControl, ProcessState}; @@ -337,9 +336,8 @@ pub(crate) trait PipelineExecution { /// that a job was signaled to stop or killed. /// /// If a job is stopped, the shell will add that job to a list of background jobs and - /// continue - /// to watch the job in the background, printing notifications on status changes of that job - /// over time. + /// continue to watch the job in the background, printing notifications on status changes + /// of that job over time. fn execute_pipeline(&mut self, pipeline: &mut Pipeline) -> i32; /// Generates a vector of commands from a given `Pipeline`. @@ -430,10 +428,9 @@ impl PipelineExecution for Shell { return SUCCESS; } - let piped_commands = if let Some(c) = do_redirection(piped_commands) { - c - } else { - return COULD_NOT_EXEC; + let piped_commands = match do_redirection(piped_commands) { + Some(c) => c, + None => return COULD_NOT_EXEC }; // If the given pipeline is a background task, fork the shell. @@ -442,11 +439,7 @@ impl PipelineExecution for Shell { self, piped_commands, command_name, - if disown { - ProcessState::Empty - } else { - ProcessState::Running - }, + if disown { ProcessState::Empty } else { ProcessState::Running } ) } else { // While active, the SIGTTOU signal will be ignored. @@ -503,7 +496,6 @@ impl PipelineExecution for Shell { } fn exec_job(&mut self, job: &mut RefinedJob, _foreground: bool) -> i32 { - let short = job.short(); let long = job.long(); match *job { RefinedJob::External { @@ -513,8 +505,17 @@ impl PipelineExecution for Shell { ref stdout, ref stderr, } => { - let args: Vec<&str> = args.iter().skip(1).map(|x| x as &str).collect(); - return self.exec_external(&name, &args, stdin, stdout, stderr); + if let Ok((stdin_bk, stdout_bk, stderr_bk)) = duplicate_streams() { + let args: Vec<&str> = args.iter().skip(1).map(|x| x as &str).collect(); + let code = self.exec_external(&name, &args, stdin, stdout, stderr); + redirect_streams(stdin_bk, stdout_bk, stderr_bk); + return code; + } + eprintln!( + "ion: failed to `dup` STDOUT, STDIN, or STDERR: not running '{}'", + long + ); + COULD_NOT_EXEC } RefinedJob::Builtin { main, @@ -710,30 +711,55 @@ impl PipelineExecution for Shell { stderr: &Option<File>, ) -> i32 { let result = sys::fork_and_exec( - name, - &args, - if let Some(ref f) = *stdin { Some(f.as_raw_fd()) } else { None }, - if let Some(ref f) = *stdout { Some(f.as_raw_fd()) } else { None }, - if let Some(ref f) = *stderr { Some(f.as_raw_fd()) } else { None }, - false, - || prepare_child(false) - ); - - match result { - Ok(pid) => { - self.watch_foreground(pid as i32, "") - } - Err(ref err) if err.kind() == io::ErrorKind::NotFound => { - if !command_not_found(self, &name) { - eprintln!("ion: command not found: {}", name); - } - NO_SUCH_COMMAND - } - Err(ref err) => { - eprintln!("ion: command exec error: {}", err); - FAILURE + name, + &args, + if let Some(ref f) = *stdin { Some(f.as_raw_fd()) } else { None }, + if let Some(ref f) = *stdout { Some(f.as_raw_fd()) } else { None }, + if let Some(ref f) = *stderr { Some(f.as_raw_fd()) } else { None }, + false, + || prepare_child(true, 0) + ); + + match result { + Ok(pid) => { + let _ = sys::setpgid(pid, pid); + let _ = sys::tcsetpgrp(0, pid); + let _ = sys::wait_for_interrupt(pid); + let _ = sys::kill(pid, sys::SIGCONT); + self.watch_foreground(-(pid as i32), "") + } + Err(ref err) if err.kind() == io::ErrorKind::NotFound => { + if !command_not_found(self, &name) { + eprintln!("ion: command not found: {}", name); } + NO_SUCH_COMMAND } + Err(ref err) => { + eprintln!("ion: command exec error: {}", err); + FAILURE + } + } + } +} + +/// When the `&&` or `||` operator is utilized, commands should be executed +/// based on the previously-recorded exit status. This function will return +/// **true** to indicate that the current job should be skipped. +fn should_skip( + previous: &mut JobKind, + previous_status: i32, + current: JobKind, +) -> bool { + match *previous { + JobKind::And if previous_status != SUCCESS => { + *previous = if JobKind::Or == current { current } else { *previous }; + true + } + JobKind::Or if previous_status == SUCCESS => { + *previous = if JobKind::And == current { current } else { *previous }; + true + } + _ => false } } @@ -752,36 +778,15 @@ pub(crate) fn pipe( loop { if let Some((mut parent, mut kind)) = commands.next() { - // When an `&&` or `||` operator is utilized, execute commands based on the - // previous status. - match previous_kind { - JobKind::And => if previous_status != SUCCESS { - if let JobKind::Or = kind { - previous_kind = kind - } - continue; - }, - JobKind::Or => if previous_status == SUCCESS { - if let JobKind::And = kind { - previous_kind = kind - } - continue; - }, - _ => (), - } + if should_skip(&mut previous_kind, previous_status, kind) { continue } match kind { JobKind::Pipe(mut mode) => { // We need to remember the commands as they own the file // descriptors that are created by sys::pipe. let mut remember = Vec::new(); - - let mut pgid = 0; - let mut last_pid = 0; - let mut current_pid = 0; - - // When set to true, this command will be SIGSTOP'd before it executes. - let mut child_blocked; + let mut block_child = true; + let (mut pgid, mut last_pid, mut current_pid) = (0, 0, 0); // Append jobs until all piped jobs are running while let Some((mut child, ckind)) = commands.next() { @@ -794,6 +799,7 @@ pub(crate) fn pipe( false }; + // TODO: Refactor this part // If we need to tee both stdout and stderr, we directly connect pipes to // the relevant sources in both of them. if let RefinedJob::Tee { @@ -808,7 +814,8 @@ pub(crate) fn pipe( Some(unsafe { File::from_raw_fd(out_reader) }); parent.stdout(unsafe { File::from_raw_fd(out_writer) }); if is_external { - possible_external_stdio_pipes.get_or_insert(vec![]).push(unsafe { File::from_raw_fd(out_writer) }); + possible_external_stdio_pipes.get_or_insert(vec![]) + .push(unsafe { File::from_raw_fd(out_writer) }); } } } @@ -819,7 +826,8 @@ pub(crate) fn pipe( Some(unsafe { File::from_raw_fd(err_reader) }); parent.stderr(unsafe { File::from_raw_fd(err_writer) }); if is_external { - possible_external_stdio_pipes.get_or_insert(vec![]).push(unsafe { File::from_raw_fd(err_writer) }); + possible_external_stdio_pipes.get_or_insert(vec![]) + .push(unsafe { File::from_raw_fd(err_writer) }); } } } @@ -830,7 +838,8 @@ pub(crate) fn pipe( } Ok((reader, writer)) => { if is_external { - possible_external_stdio_pipes.get_or_insert(vec![]).push(unsafe { File::from_raw_fd(writer) }); + possible_external_stdio_pipes.get_or_insert(vec![]) + .push(unsafe { File::from_raw_fd(writer) }); } child.stdin(unsafe { File::from_raw_fd(reader) }); match mode { @@ -865,57 +874,39 @@ pub(crate) fn pipe( } } - child_blocked = match ckind { - JobKind::Pipe(_) | JobKind::Last => true, - _ => false - }; - - match spawn_proc(shell, parent, kind, child_blocked, &mut last_pid, &mut current_pid) { + match spawn_proc(shell, parent, kind, block_child, &mut last_pid, &mut current_pid, pgid) { SUCCESS => (), error_code => return error_code } - // remember.push(parent); possible_external_stdio_pipes = None; if set_process_group(&mut pgid, current_pid) && foreground && !shell.is_library { let _ = sys::tcsetpgrp(0, pgid); } - resume_prior_process(&mut last_pid, current_pid, child_blocked); + resume_prior_process(&mut last_pid, current_pid); if let JobKind::Pipe(m) = ckind { parent = child; mode = m; } else { - // We set the kind to the last child kind that was - // processed. For example, the pipeline - // `foo | bar | baz && zardoz` should have the - // previous kind set to `And` after processing the - // initial pipeline kind = ckind; - - child_blocked = match commands.peek() { - Some(&(_, JobKind::Pipe(_))) => true, - Some(&(_, JobKind::Last)) => true, - _ => false - }; - - match spawn_proc(shell, child, kind, child_blocked, &mut last_pid, &mut current_pid) { + block_child = false; + match spawn_proc(shell, child, kind, block_child, &mut last_pid, &mut current_pid, pgid) { SUCCESS => (), error_code => return error_code } - set_process_group(&mut pgid, current_pid) && foreground && !shell.is_library; - - // remember.push(child); - resume_prior_process(&mut last_pid, current_pid, child_blocked); - + resume_prior_process(&mut last_pid, current_pid); break; } } + previous_kind = kind; previous_status = shell.wait(pgid, remember); + let _ = io::stdout().flush(); + let _ = io::stderr().flush(); if previous_status == TERMINATED { if let Err(why) = sys::killpg(pgid, sys::SIGTERM) { eprintln!("ion: failed to terminate foreground jobs: {}", why); @@ -926,6 +917,8 @@ pub(crate) fn pipe( _ => { previous_status = shell.exec_job(&mut parent, foreground); previous_kind = kind; + let _ = io::stdout().flush(); + let _ = io::stderr().flush(); } } } else { @@ -939,9 +932,10 @@ fn spawn_proc( shell: &mut Shell, mut cmd: RefinedJob, kind: JobKind, - child_blocked: bool, + block_child: bool, last_pid: &mut u32, - current_pid: &mut u32 + current_pid: &mut u32, + pgid: u32 ) -> i32 { let short = cmd.short(); match cmd { @@ -954,7 +948,7 @@ fn spawn_proc( if let Some(ref f) = *stdout { Some(f.as_raw_fd()) } else { None }, if let Some(ref f) = *stderr { Some(f.as_raw_fd()) } else { None }, false, - || prepare_child(child_blocked) + || prepare_child(block_child, pgid) ); match result { @@ -976,7 +970,7 @@ fn spawn_proc( let args: Vec<&str> = args.iter().map(|x| x as &str).collect(); match unsafe { sys::fork() } { Ok(0) => { - prepare_child(child_blocked); + prepare_child(block_child, pgid); let ret = shell.exec_builtin(main, &args, stdout, stderr, stdin); close(stdout); close(stderr); @@ -984,6 +978,7 @@ fn spawn_proc( exit(ret) }, Ok(pid) => { + close(stdin); close(stdout); close(stderr); *last_pid = *current_pid; @@ -998,7 +993,7 @@ fn spawn_proc( let args: Vec<&str> = args.iter().map(|x| x as &str).collect(); match unsafe { sys::fork() } { Ok(0) => { - prepare_child(child_blocked); + prepare_child(block_child, pgid); let ret = shell.exec_function(name, &args, stdout, stderr, stdin); close(stdout); close(stderr); @@ -1006,6 +1001,7 @@ fn spawn_proc( exit(ret) }, Ok(pid) => { + close(stdin); close(stdout); close(stderr); *last_pid = *current_pid; @@ -1019,7 +1015,7 @@ fn spawn_proc( RefinedJob::Cat { ref mut sources, ref stdout, ref mut stdin } => { match unsafe { sys::fork() } { Ok(0) => { - prepare_child(child_blocked); + prepare_child(block_child, pgid); let ret = shell.exec_multi_in(sources, stdout, stdin); close(stdout); @@ -1027,6 +1023,7 @@ fn spawn_proc( exit(ret); } Ok(pid) => { + close(stdin); close(stdout); *last_pid = *current_pid; *current_pid = pid; @@ -1037,7 +1034,7 @@ fn spawn_proc( RefinedJob::Tee { ref mut items, ref stdout, ref stderr, ref stdin } => { match unsafe { sys::fork() } { Ok(0) => { - prepare_child(child_blocked); + prepare_child(block_child, pgid); let ret = shell.exec_multi_out(items, stdout, stderr, stdin, kind); close(stdout); @@ -1046,6 +1043,7 @@ fn spawn_proc( exit(ret); }, Ok(pid) => { + close(stdin); close(stdout); close(stderr); *last_pid = *current_pid; @@ -1067,27 +1065,25 @@ fn close(file: &Option<File>) { } } -fn prepare_child(child_blocked: bool) { +fn prepare_child(block_child: bool, pgid: u32) { signals::unblock(); let _ = sys::reset_signal(sys::SIGINT); let _ = sys::reset_signal(sys::SIGHUP); let _ = sys::reset_signal(sys::SIGTERM); - if child_blocked { + if block_child { let _ = sys::kill(process::id(), sys::SIGSTOP); } else { + let _ = sys::setpgid(process::id(), pgid); } } -fn resume_prior_process(last_pid: &mut u32, current_pid: u32, child_blocked: bool) { - if child_blocked { +fn resume_prior_process(last_pid: &mut u32, current_pid: u32) { + if *last_pid != 0 { // Ensure that the process is stopped before continuing. - if let Err(why) = sys::wait_for_interrupt(current_pid) { + if let Err(why) = sys::wait_for_interrupt(*last_pid) { eprintln!("ion: error waiting for sigstop: {}", why); } - } - - if *last_pid != 0 { let _ = sys::kill(*last_pid, sys::SIGCONT); } @@ -1099,4 +1095,4 @@ fn set_process_group(pgid: &mut u32, pid: u32) -> bool { if pgid_set { *pgid = pid; } let _ = sys::setpgid(pid, *pgid); pgid_set -} \ No newline at end of file +} diff --git a/src/lib/sys/unix/job_control.rs b/src/lib/sys/unix/job_control.rs index fdfc879e..2d3696d0 100644 --- a/src/lib/sys/unix/job_control.rs +++ b/src/lib/sys/unix/job_control.rs @@ -8,52 +8,51 @@ use std::thread::sleep; use std::time::Duration; use super::{errno, write_errno}; +const OPTS: i32 = WUNTRACED | WCONTINUED | WNOHANG; + pub(crate) fn watch_background( fg: Arc<ForegroundSignals>, processes: Arc<Mutex<Vec<BackgroundProcess>>>, - pid: u32, + pgid: u32, njob: usize, ) { let mut fg_was_grabbed = false; - loop { - if !fg_was_grabbed { - if fg.was_grabbed(pid) { - fg_was_grabbed = true; - } - } + let mut status; + let mut exit_status = 0; - let opts = WUNTRACED | WCONTINUED | WNOHANG; - let mut status = 0; + loop { + fg_was_grabbed = !fg_was_grabbed && fg.was_grabbed(pgid); unsafe { - let pid = waitpid(-(pid as pid_t), &mut status, opts); - match pid { - -1 => { - eprintln!("ion: ([{}] {}) errored: {}", njob, pid, errno()); + status = 0; + match waitpid(-(pgid as pid_t), &mut status, OPTS) { + -1 if errno() == ECHILD => { + if !fg_was_grabbed { + eprintln!("ion: ([{}] {}) exited with {}", njob, pgid, status); + } let mut processes = processes.lock().unwrap(); let process = &mut processes.iter_mut().nth(njob).unwrap(); process.state = ProcessState::Empty; if fg_was_grabbed { - fg.errored(); + fg.reply_with(exit_status as i8); } break; } - 0 => (), - _ if WIFEXITED(status) => { - if !fg_was_grabbed { - eprintln!("ion: ([{}] {}) exited with {}", njob, pid, status); - } + -1 => { + eprintln!("ion: ([{}] {}) errored: {}", njob, pgid, errno()); let mut processes = processes.lock().unwrap(); let process = &mut processes.iter_mut().nth(njob).unwrap(); process.state = ProcessState::Empty; if fg_was_grabbed { - fg.reply_with(WEXITSTATUS(status) as i8); + fg.errored(); } break; } - _ if WIFSTOPPED(status) => { + 0 => (), + _pid if WIFEXITED(status) => exit_status = WEXITSTATUS(status), + _pid if WIFSTOPPED(status) => { if !fg_was_grabbed { - eprintln!("ion: ([{}] {}) Stopped", njob, pid); + eprintln!("ion: ([{}] {}) Stopped", njob, pgid); } let mut processes = processes.lock().unwrap(); let process = &mut processes.iter_mut().nth(njob).unwrap(); @@ -63,9 +62,9 @@ pub(crate) fn watch_background( } process.state = ProcessState::Stopped; } - _ if WIFCONTINUED(status) => { + _pid if WIFCONTINUED(status) => { if !fg_was_grabbed { - eprintln!("ion: ([{}] {}) Running", njob, pid); + eprintln!("ion: ([{}] {}) Running", njob, pgid); } let mut processes = processes.lock().unwrap(); let process = &mut processes.iter_mut().nth(njob).unwrap(); @@ -98,13 +97,17 @@ pub(crate) fn watch_foreground(shell: &mut Shell, pid: i32, command: &str) -> i3 } } 0 => (), - _pid if WIFEXITED(status) => { - exit_status = WEXITSTATUS(status) as i32; - } - _pid if WIFSIGNALED(status) => { + _pid if WIFEXITED(status) => exit_status = WEXITSTATUS(status), + pid if WIFSIGNALED(status) => { let signal = WTERMSIG(status); - if signal == SIGPIPE { continue } - eprintln!("ion: process ended by signal {}", signal); + if signal == SIGPIPE { + continue + } else if WCOREDUMP(status) { + eprintln!("ion: process ({}) had a core dump", pid); + continue + } + + eprintln!("ion: process ({}) ended by signal {}", pid, signal); match signal { SIGINT => { let _ = kill(pid, signal as i32); @@ -116,11 +119,10 @@ pub(crate) fn watch_foreground(shell: &mut Shell, pid: i32, command: &str) -> i3 } signaled = 128 + signal as i32; } - _pid if WIFSTOPPED(status) => { - // TODO: Rework background control - shell.send_to_background(pid as u32, ProcessState::Stopped, command.into()); + pid if WIFSTOPPED(status) => { + shell.send_to_background(pid.abs() as u32, ProcessState::Stopped, command.into()); shell.break_flow = true; - break 128 + signal as i32; + break 128 + WSTOPSIG(status); } _ => (), } diff --git a/src/lib/sys/unix/mod.rs b/src/lib/sys/unix/mod.rs index f10fa882..40623c04 100644 --- a/src/lib/sys/unix/mod.rs +++ b/src/lib/sys/unix/mod.rs @@ -25,8 +25,20 @@ pub(crate) const STDOUT_FILENO: i32 = libc::STDOUT_FILENO; pub(crate) const STDERR_FILENO: i32 = libc::STDERR_FILENO; pub(crate) const STDIN_FILENO: i32 = libc::STDIN_FILENO; +// Why each platform wants to be unique in this regard is anyone's guess. + +#[cfg(target_os = "linux")] fn errno() -> i32 { unsafe { *libc::__errno_location() } } +#[cfg(any(target_os = "openbsd", target_os = "bitrig", target_os = "android"))] +fn errno() -> i32 { unsafe { *libc::__errno() } } + +#[cfg(any(target_os = "macos", target_os = "ios", target_os = "freebsd"))] +fn errno() -> i32 { unsafe { *libc::__error() } } + +#[cfg(target_os = "dragonfly")] +fn errno() -> i32 { unsafe { *libc::__dfly_error() } } + fn write_errno(msg: &str, errno: i32) { let stderr = io::stderr(); let mut stderr = stderr.lock(); @@ -44,16 +56,15 @@ pub(crate) fn is_root() -> bool { unsafe { libc::geteuid() == 0 } } pub unsafe fn fork() -> io::Result<u32> { cvt(libc::fork()).map(|pid| pid as u32) } pub fn wait_for_interrupt(pid: u32) -> io::Result<()> { - let mut status = 0; - let mut result; + let mut status; loop { - result = unsafe { waitpid(pid as i32, &mut status, WUNTRACED) }; - if result == -1 { - if errno() == EINTR { continue } - break Err(io::Error::from_raw_os_error(errno())); + status = 0; + match unsafe { waitpid(pid as i32, &mut status, WUNTRACED) } { + -1 if errno() == EINTR => continue, + -1 => break Err(io::Error::from_raw_os_error(errno())), + _ => break Ok(()) } - break Ok(()); } } -- GitLab