oysh/crates/oyster_runtime/src/pipeline.rs

206 lines
6 KiB
Rust
Raw Normal View History

2022-09-15 20:16:37 +00:00
use std::{
borrow::Cow,
ffi::{OsStr, OsString},
fs::File,
io,
os::unix::{io::FromRawFd, process::CommandExt},
2022-09-15 20:16:37 +00:00
process::{Child, Command, Stdio},
slice::Iter,
};
use nix::{
errno::Errno,
2022-09-15 20:16:37 +00:00
fcntl::OFlag,
libc,
sys::{
signal::{self, SaFlags, SigAction, SigHandler},
signalfd::SigSet,
wait::{self, WaitPidFlag, WaitStatus},
},
2022-09-15 20:16:37 +00:00
unistd::{self, Pid},
};
use oyster_parser::ast::{self, Redirect};
use crate::{RuntimeError, Shell, Status};
/// A command that's ready to be run.
struct PreparedCommand<'a> {
cmd: Cow<'a, str>,
args: WordBuilder<'a>,
redirect: Redirect,
stdin: Option<File>,
stdout: Option<File>,
stderr: Option<File>,
}
impl<'a> PreparedCommand<'a> {
/// Run this command with the given context.
fn spawn(self, pgid: &mut Pid) -> Result<Child, RuntimeError> {
let args = self.args.map(|w| match w {
Cow::Borrowed(s) => Cow::Borrowed(OsStr::new(s)),
Cow::Owned(s) => Cow::Owned(OsString::from(s)),
});
let mut cmd = Command::new(self.cmd.as_ref());
cmd.args(args);
cmd.stdin(self.stdin.map_or(Stdio::inherit(), Stdio::from));
cmd.stdout(self.stdout.map_or(Stdio::inherit(), Stdio::from));
cmd.stderr(self.stderr.map_or(Stdio::inherit(), Stdio::from));
cmd.process_group(pgid.as_raw());
{
let pgid = *pgid;
unsafe {
cmd.pre_exec(move || {
if pgid == Pid::from_raw(0) {
let _ = unistd::tcsetpgrp(libc::STDIN_FILENO, unistd::getpid());
}
let default =
SigAction::new(SigHandler::SigDfl, SaFlags::empty(), SigSet::empty());
let _ = signal::sigaction(signal::Signal::SIGTSTP, &default);
let _ = signal::sigaction(signal::Signal::SIGTTOU, &default);
Ok(())
});
}
}
2022-09-15 20:16:37 +00:00
let child = match cmd.spawn() {
Ok(child) => child,
Err(err) => {
return Err(match err.kind() {
io::ErrorKind::NotFound => RuntimeError::CommandNotFound(self.cmd.into_owned()),
io::ErrorKind::PermissionDenied => {
RuntimeError::PermissionDenied(self.cmd.into_owned())
}
_ => RuntimeError::SpawnFailed(err),
})
}
};
let child_id = Pid::from_raw(child.id() as i32);
if *pgid == Pid::from_raw(0) {
*pgid = child_id;
};
// prevent race conditions
let _ = unistd::setpgid(child_id, *pgid);
Ok(child)
}
}
impl<'a> From<&'a ast::Command<'a>> for PreparedCommand<'a> {
fn from(command: &'a ast::Command<'a>) -> Self {
let mut words = WordBuilder(command.0.iter());
let cmd = words.next().expect("words need to have >1 parts");
let args = words;
let redirect = command.1;
PreparedCommand {
cmd,
args,
redirect,
stdin: None,
stdout: None,
stderr: None,
}
}
}
impl Shell {
pub(crate) fn run_pipeline(
&mut self,
pipeline: &ast::Pipeline,
) -> Result<Status, RuntimeError> {
let mut pgid = Pid::from_raw(0);
let status = (|| {
let mut cmds = pipeline.0.iter().map(PreparedCommand::from);
2022-09-15 20:16:37 +00:00
let mut last_cmd = cmds.next().expect("pipelines need to have >1 commands");
for mut cmd in cmds {
let (output, input) = create_pipe()?;
cmd.stdin = Some(output);
2022-09-15 20:16:37 +00:00
match last_cmd.redirect {
Redirect::None => (),
Redirect::Stdout => last_cmd.stdout = Some(input),
}
2022-09-15 20:16:37 +00:00
last_cmd.spawn(&mut pgid)?;
2022-09-15 20:16:37 +00:00
last_cmd = cmd;
}
2022-09-15 20:16:37 +00:00
last_cmd.spawn(&mut pgid)?;
2022-09-15 20:16:37 +00:00
wait_pgid(pgid)
})();
2022-09-15 20:16:37 +00:00
if status.is_err() && pgid != Pid::from_raw(0) {
let _ = signal::killpg(pgid, signal::Signal::SIGTERM);
let _ = signal::killpg(pgid, signal::Signal::SIGCONT);
}
let _ = unistd::tcsetpgrp(libc::STDIN_FILENO, unistd::getpgid(None).unwrap());
status
}
}
/// Wait for all processes of the specified job to terminate.
fn wait_pgid(pgid: Pid) -> Result<Status, RuntimeError> {
let pgid = Pid::from_raw(-pgid.as_raw());
let mut last_status = Status::SUCCESS;
loop {
match wait::waitpid(pgid, Some(WaitPidFlag::WUNTRACED)) {
Ok(WaitStatus::Exited(_, code)) => last_status = Status(code),
Ok(WaitStatus::Signaled(_, signal, _)) => {
last_status = Status(Status::SIG_BASE.0 + signal as i32)
}
Ok(WaitStatus::Stopped(_, _)) => todo!("put job into background job list"),
Ok(_) => (),
Err(err) => match err {
Errno::ECHILD => break Ok(last_status), // no more children
_ => break Err(RuntimeError::WaidPid(err)),
},
}
2022-09-15 20:16:37 +00:00
}
}
/// Create a new unix pipe.
fn create_pipe() -> Result<(File, File), RuntimeError> {
let (output, input) =
unistd::pipe2(OFlag::O_CLOEXEC).map_err(RuntimeError::PipeCreationFailed)?;
Ok(unsafe { (File::from_raw_fd(output), File::from_raw_fd(input)) })
}
/// Build words from WordParts.
struct WordBuilder<'a>(Iter<'a, ast::Word<'a>>);
impl<'a> Iterator for WordBuilder<'a> {
type Item = Cow<'a, str>;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|word| {
let mut words = word.0.iter();
let mut s = match words.next().expect("words need to have >1 parts") {
ast::WordPart::Text(text) => Cow::from(*text),
};
for part in words {
match part {
ast::WordPart::Text(text) => s.to_mut().push_str(text),
}
}
s
})
}
}