diff options
author | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-02-05 20:15:11 +0100 |
---|---|---|
committer | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-02-05 20:15:11 +0100 |
commit | e78e29eae4a0cf2e3f46c6a117e1fe86219efe96 (patch) | |
tree | 29b4b4c2bbd47329d2c11b5ca7fbc7629401d662 /src | |
parent | da9fa7d1ad81528c60607f488f84155a3ecc3ee6 (diff) | |
download | rebel-e78e29eae4a0cf2e3f46c6a117e1fe86219efe96.tar rebel-e78e29eae4a0cf2e3f46c6a117e1fe86219efe96.zip |
IPC setup
Diffstat (limited to 'src')
-rw-r--r-- | src/executor.rs | 7 | ||||
-rw-r--r-- | src/main.rs | 42 | ||||
-rw-r--r-- | src/prepared_command.rs | 100 | ||||
-rw-r--r-- | src/runner.rs | 2 | ||||
-rw-r--r-- | src/runner/runc.rs | 88 | ||||
-rw-r--r-- | src/runner/runc/init.rs | 47 | ||||
-rw-r--r-- | src/runner/runc/run.rs | 18 | ||||
-rw-r--r-- | src/types.rs | 4 | ||||
-rw-r--r-- | src/unshare.rs | 31 | ||||
-rw-r--r-- | src/util.rs | 20 |
10 files changed, 185 insertions, 174 deletions
diff --git a/src/executor.rs b/src/executor.rs index 814aa79..1340f83 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -50,7 +50,7 @@ impl<'a> Executor<'a> { fn run_one(&mut self, runner: &impl runner::Runner) -> runner::Result<()> { let task = self.tasks_runnable.pop().expect("No runnable tasks left"); - runner.run(&task)?; + runner.run(self.tasks, &task)?; let rdeps = self.rdeps.get(&task); @@ -69,10 +69,9 @@ impl<'a> Executor<'a> { Ok(()) } - pub fn run(&mut self) -> runner::Result<()> { - let runner = runner::runc::RuncRunner::new(self.tasks); + pub fn run(&mut self, runner: &impl runner::Runner) -> runner::Result<()> { while !self.tasks_runnable.is_empty() { - self.run_one(&runner)?; + self.run_one(runner)?; } assert!(self.tasks_blocked.is_empty(), "No runnable tasks left"); diff --git a/src/main.rs b/src/main.rs index 8d4787d..d3ac7c8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,4 @@ mod executor; -mod prepared_command; mod recipe; mod resolve; mod runner; @@ -7,43 +6,13 @@ mod types; mod unshare; mod util; -use nix::{ - mount::{self, MsFlags}, - unistd, -}; -use std::{io::Result, path::Path}; +use std::path::Path; use types::*; -use util::ToIOResult; - -fn mount_buildtmp() -> Result<()> { - mount::mount::<_, _, _, str>( - Some("buildtmp"), - "build/tmp", - Some("tmpfs"), - MsFlags::empty(), - None, - ) - .to_io_result() -} - -fn exec_shell() -> Result<std::convert::Infallible> { - let bin_sh = std::ffi::CString::new("/bin/sh").unwrap(); - unistd::execv(&bin_sh, &[&bin_sh]).to_io_result() -} - -fn execute(mut exc: executor::Executor) -> Result<()> { - unshare::unshare()?; - mount_buildtmp()?; - - exc.run()?; - - exec_shell()?; - - Ok(()) -} fn main() { + let runner = unsafe { runner::runc::RuncRunner::new() }.unwrap(); + let recipes = recipe::read_recipes(Path::new("examples")).unwrap(); let mut tasks: TaskMap = TaskMap::default(); @@ -64,9 +33,8 @@ fn main() { std::process::exit(1); } let taskset = rsv.to_taskset(); - let exc = executor::Executor::new(&tasks, taskset); - - if let Err(error) = execute(exc) { + let mut exc = executor::Executor::new(&tasks, taskset); + if let Err(error) = exc.run(&runner) { eprintln!("{}", error); std::process::exit(1); } diff --git a/src/prepared_command.rs b/src/prepared_command.rs deleted file mode 100644 index 8297095..0000000 --- a/src/prepared_command.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::{ - ffi::{CString, OsStr}, - io::{Error, ErrorKind, Result}, - iter, - os::unix::{ffi::*, io::RawFd}, - ptr, -}; - -use libc::{c_char, c_void}; -use nix::{fcntl::OFlag, sys::wait, unistd}; - -use crate::util::ToIOResult; - -#[derive(Clone, Debug)] -pub struct PreparedCommandBuilder { - program: CString, - args: Vec<CString>, -} - -#[derive(Debug)] -pub struct PreparedCommand { - child: unistd::Pid, - pipew: RawFd, -} - -impl Drop for PreparedCommand { - fn drop(&mut self) { - assert!(unistd::close(self.pipew).is_ok()); - } -} - -fn os2c<S: AsRef<OsStr>>(s: S) -> CString { - CString::new(s.as_ref().as_bytes()).unwrap() -} - -impl PreparedCommandBuilder { - pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Self { - self.args.push(os2c(arg)); - self - } - - pub fn prepare(&mut self) -> Result<PreparedCommand> { - let exe_p = self.program.as_ptr(); - - let argv: Vec<*const c_char> = iter::once(exe_p) - .chain(self.args.iter().map(|arg| arg.as_ptr())) - .chain(iter::once(ptr::null())) - .collect(); - - let argv_p = argv.as_ptr(); - - let (piper, pipew) = unistd::pipe2(OFlag::O_CLOEXEC).to_io_result()?; - - unsafe { - match unistd::fork().to_io_result()? { - unistd::ForkResult::Parent { child } => { - let cmd = PreparedCommand { child, pipew }; - assert!(unistd::close(piper).is_ok()); - return Ok(cmd); - } - unistd::ForkResult::Child => {} - } - - // Child process - only async-signal-safe calls allowed - - if libc::close(pipew) != 0 { - libc::_exit(126); - } - - // Wait for run trigger - let mut buf = [0u8; 1]; - if libc::read(piper, buf.as_mut_ptr() as *mut c_void, buf.len()) != 1 { - // PreparedCommand was dropped, or controlling process exited - libc::_exit(126); - } - - libc::execvp(exe_p, argv_p); - - // exec failed - libc::_exit(127); - } - } -} - -impl PreparedCommand { - pub fn new<S: AsRef<OsStr>>(program: S) -> PreparedCommandBuilder { - PreparedCommandBuilder { - program: os2c(program), - args: Vec::new(), - } - } - - pub fn run(self) -> Result<wait::WaitStatus> { - if unistd::write(self.pipew, &[0]).to_io_result()? != 1 { - return Err(Error::new(ErrorKind::Other, "command trigger write failed")); - } - - wait::waitpid(Some(self.child), None).to_io_result() - } -} diff --git a/src/runner.rs b/src/runner.rs index a290aeb..810586c 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -7,5 +7,5 @@ use crate::types::*; pub type Result<T> = io::Result<T>; pub trait Runner { - fn run(&self, task: &TaskRef) -> Result<()>; + fn run(&self, tasks: &TaskMap, task: &TaskRef) -> Result<()>; } diff --git a/src/runner/runc.rs b/src/runner/runc.rs index b323d5d..663dc7a 100644 --- a/src/runner/runc.rs +++ b/src/runner/runc.rs @@ -1,21 +1,87 @@ -use super::*; -use crate::types::TaskMap; +mod init; +mod run; -pub struct RuncRunner<'a> { - tasks: &'a TaskMap, +use std::{io, process}; + +use ipc_channel::ipc; +use nix::unistd; +use serde::{Deserialize, Serialize}; + +use crate::types::*; +use crate::unshare; +use crate::util::CheckDisconnect; + +#[derive(Debug, Deserialize, Serialize)] +struct Request(TaskRef, Task, ipc::IpcSender<run::Error>); + +fn runner(init_error_sender: ipc::IpcSender<init::Error>, channel: ipc::IpcReceiver<Request>) -> ! { + if let Err(error) = init::runc_initialize() { + init_error_sender.send(error).expect("IPC send() failed"); + process::exit(1); + } + + drop(init_error_sender); + + while let Ok(request) = channel.recv() { + let Request(task, task_def, reply_sender) = request; + if let Err(error) = run::handle_task(task, task_def) { + reply_sender.send(error).expect("IPC send() failed"); + } + } + + process::exit(0); +} + +pub struct RuncRunner { + channel: ipc::IpcSender<Request>, } -impl<'a> RuncRunner<'a> { - pub fn new(tasks: &'a TaskMap) -> Self { - RuncRunner { tasks } +impl RuncRunner { + /// Creates a new Runc runner + /// + /// Unsafe: Do not call in multithreaded processes + pub unsafe fn new() -> io::Result<Self> { + let (tx, rx) = ipc::channel().expect("IPC channel creation failed"); + let (init_error_tx, init_error_rx) = ipc::channel().expect("IPC channel creation failed"); + + let pid = match unistd::fork().expect("fork()") { + unistd::ForkResult::Parent { child } => { + drop(rx); + drop(init_error_tx); + child + } + unistd::ForkResult::Child => { + drop(tx); + drop(init_error_rx); + runner(init_error_tx, rx); + /* Not reached */ + } + }; + + init_error_rx + .recv() + .check_disconnect() + .expect("IPC recv() error")?; + + unshare::idmap(pid)?; + + Ok(RuncRunner { channel: tx }) } } -impl<'a> Runner for RuncRunner<'a> { - fn run(&self, task: &TaskRef) -> Result<()> { - let task_def = self.tasks.get(task).expect("Invalid TaskRef"); +impl super::Runner for RuncRunner { + fn run(&self, tasks: &TaskMap, task: &TaskRef) -> super::Result<()> { + let task_def = tasks.get(task).expect("Invalid TaskRef"); + let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed"); + + self.channel + .send(Request(task.clone(), task_def.clone(), reply_tx)) + .expect("RuncRunner task submission failed"); - println!("{}:\n\t{:?}", task, task_def); + reply_rx + .recv() + .check_disconnect() + .expect("IPC recv() error")?; Ok(()) } diff --git a/src/runner/runc/init.rs b/src/runner/runc/init.rs new file mode 100644 index 0000000..02b89e4 --- /dev/null +++ b/src/runner/runc/init.rs @@ -0,0 +1,47 @@ +use std::io; + +use nix::{ + mount::{self, MsFlags}, + sched::{self, CloneFlags}, +}; +use serde::{Deserialize, Serialize}; + +fn mount_buildtmp() -> nix::Result<()> { + mount::mount::<_, _, _, str>( + Some("buildtmp"), + "build/tmp", + Some("tmpfs"), + MsFlags::empty(), + None, + ) +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum Error { + Code(i32), + String(String), +} + +impl From<nix::Error> for Error { + fn from(error: nix::Error) -> Self { + match error { + nix::Error::Sys(code) => Error::Code(code as i32), + _ => Error::String(error.to_string()), + } + } +} + +impl From<Error> for io::Error { + fn from(error: Error) -> Self { + match error { + Error::Code(code) => io::Error::from_raw_os_error(code), + Error::String(string) => io::Error::new(io::ErrorKind::Other, string), + } + } +} + +pub fn runc_initialize() -> Result<(), Error> { + sched::unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)?; + mount_buildtmp()?; + Ok(()) +} diff --git a/src/runner/runc/run.rs b/src/runner/runc/run.rs new file mode 100644 index 0000000..2d13330 --- /dev/null +++ b/src/runner/runc/run.rs @@ -0,0 +1,18 @@ +use std::io; + +use serde::{Deserialize, Serialize}; + +use crate::types::*; +#[derive(Debug, Deserialize, Serialize)] +pub struct Error; + +impl From<Error> for io::Error { + fn from(_: Error) -> Self { + io::Error::new(io::ErrorKind::Other, "Failed to run task") + } +} + +pub fn handle_task(task: TaskRef, task_def: Task) -> Result<(), Error> { + println!("{}:\n\t{:?}", task, task_def); + Ok(()) +} diff --git a/src/types.rs b/src/types.rs index 1773504..331b49a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,9 +1,9 @@ -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::collections::HashMap; pub type TaskRef = String; -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct Task { #[serde(default)] pub depends: Vec<TaskRef>, diff --git a/src/unshare.rs b/src/unshare.rs index a8ec1c1..da16d24 100644 --- a/src/unshare.rs +++ b/src/unshare.rs @@ -4,15 +4,13 @@ use std::{ io::{self, BufRead, Result}, os::unix::ffi::*, path::Path, + process, }; -use nix::{ - sched::{self, CloneFlags}, - unistd, -}; +use nix::unistd; -use crate::prepared_command::PreparedCommand; -use crate::util::{Checkable, ToIOResult}; +// use crate::prepared_command::PreparedCommand; +use crate::util::Checkable; #[derive(Debug, Eq, Ord, PartialEq, PartialOrd)] struct SubIDRange { @@ -98,27 +96,22 @@ fn get_gid_map() -> Result<Vec<SubIDMap>> { Ok(generate_idmap(gid, gid_ranges)) } -fn prepare_idmap_cmd(cmd: &str, pid: &str, map: &Vec<SubIDMap>) -> Result<PreparedCommand> { - let mut builder = PreparedCommand::new(cmd); - builder.arg(&pid); +fn run_idmap_cmd(cmd: &str, pid: &str, map: &Vec<SubIDMap>) -> Result<()> { + let mut builder = process::Command::new(cmd); + builder.arg(pid); for uids in map { builder.arg(uids.lower.to_string()); builder.arg(uids.upper.to_string()); builder.arg(uids.count.to_string()); } - builder.prepare() + builder.status().and_then(|status| status.check()) } -pub fn unshare() -> Result<()> { - let pid = unistd::getpid().to_string(); - - let newuidmap = prepare_idmap_cmd("newuidmap", pid.as_str(), &get_uid_map()?)?; - let newgidmap = prepare_idmap_cmd("newgidmap", pid.as_str(), &get_gid_map()?)?; - - sched::unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS).to_io_result()?; +pub fn idmap(pid: unistd::Pid) -> Result<()> { + let pid_string = pid.to_string(); - newuidmap.run()?.check()?; - newgidmap.run()?.check()?; + run_idmap_cmd("newuidmap", pid_string.as_str(), &get_uid_map()?)?; + run_idmap_cmd("newgidmap", pid_string.as_str(), &get_gid_map()?)?; Ok(()) } diff --git a/src/util.rs b/src/util.rs index 4e996e8..0418c51 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,8 +1,10 @@ use std::{ io::{Error, ErrorKind, Result}, process::ExitStatus, + result, }; +use ipc_channel::ipc; use nix::sys::wait; pub trait ToIOResult<T> { @@ -46,3 +48,21 @@ impl Checkable for wait::WaitStatus { } } } + +pub trait CheckDisconnect { + type Output; + + fn check_disconnect(self) -> Result<Self::Output>; +} + +impl<T> CheckDisconnect for result::Result<T, ipc::IpcError> { + type Output = result::Result<(), T>; + + fn check_disconnect(self) -> Result<Self::Output> { + match self { + Ok(v) => Ok(Err(v)), + Err(ipc::IpcError::Disconnected) => Ok(Ok(())), + Err(error) => Err(Error::new(ErrorKind::Other, format!("{:?}", error))), + } + } +} |