summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2021-02-05 20:15:11 +0100
committerMatthias Schiffer <mschiffer@universe-factory.net>2021-02-05 20:15:11 +0100
commite78e29eae4a0cf2e3f46c6a117e1fe86219efe96 (patch)
tree29b4b4c2bbd47329d2c11b5ca7fbc7629401d662 /src
parentda9fa7d1ad81528c60607f488f84155a3ecc3ee6 (diff)
downloadrebel-e78e29eae4a0cf2e3f46c6a117e1fe86219efe96.tar
rebel-e78e29eae4a0cf2e3f46c6a117e1fe86219efe96.zip
IPC setup
Diffstat (limited to 'src')
-rw-r--r--src/executor.rs7
-rw-r--r--src/main.rs42
-rw-r--r--src/prepared_command.rs100
-rw-r--r--src/runner.rs2
-rw-r--r--src/runner/runc.rs88
-rw-r--r--src/runner/runc/init.rs47
-rw-r--r--src/runner/runc/run.rs18
-rw-r--r--src/types.rs4
-rw-r--r--src/unshare.rs31
-rw-r--r--src/util.rs20
10 files changed, 185 insertions, 174 deletions
diff --git a/src/executor.rs b/src/executor.rs
index 814aa79..1340f83 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -50,7 +50,7 @@ impl<'a> Executor<'a> {
fn run_one(&mut self, runner: &impl runner::Runner) -> runner::Result<()> {
let task = self.tasks_runnable.pop().expect("No runnable tasks left");
- runner.run(&task)?;
+ runner.run(self.tasks, &task)?;
let rdeps = self.rdeps.get(&task);
@@ -69,10 +69,9 @@ impl<'a> Executor<'a> {
Ok(())
}
- pub fn run(&mut self) -> runner::Result<()> {
- let runner = runner::runc::RuncRunner::new(self.tasks);
+ pub fn run(&mut self, runner: &impl runner::Runner) -> runner::Result<()> {
while !self.tasks_runnable.is_empty() {
- self.run_one(&runner)?;
+ self.run_one(runner)?;
}
assert!(self.tasks_blocked.is_empty(), "No runnable tasks left");
diff --git a/src/main.rs b/src/main.rs
index 8d4787d..d3ac7c8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,4 @@
mod executor;
-mod prepared_command;
mod recipe;
mod resolve;
mod runner;
@@ -7,43 +6,13 @@ mod types;
mod unshare;
mod util;
-use nix::{
- mount::{self, MsFlags},
- unistd,
-};
-use std::{io::Result, path::Path};
+use std::path::Path;
use types::*;
-use util::ToIOResult;
-
-fn mount_buildtmp() -> Result<()> {
- mount::mount::<_, _, _, str>(
- Some("buildtmp"),
- "build/tmp",
- Some("tmpfs"),
- MsFlags::empty(),
- None,
- )
- .to_io_result()
-}
-
-fn exec_shell() -> Result<std::convert::Infallible> {
- let bin_sh = std::ffi::CString::new("/bin/sh").unwrap();
- unistd::execv(&bin_sh, &[&bin_sh]).to_io_result()
-}
-
-fn execute(mut exc: executor::Executor) -> Result<()> {
- unshare::unshare()?;
- mount_buildtmp()?;
-
- exc.run()?;
-
- exec_shell()?;
-
- Ok(())
-}
fn main() {
+ let runner = unsafe { runner::runc::RuncRunner::new() }.unwrap();
+
let recipes = recipe::read_recipes(Path::new("examples")).unwrap();
let mut tasks: TaskMap = TaskMap::default();
@@ -64,9 +33,8 @@ fn main() {
std::process::exit(1);
}
let taskset = rsv.to_taskset();
- let exc = executor::Executor::new(&tasks, taskset);
-
- if let Err(error) = execute(exc) {
+ let mut exc = executor::Executor::new(&tasks, taskset);
+ if let Err(error) = exc.run(&runner) {
eprintln!("{}", error);
std::process::exit(1);
}
diff --git a/src/prepared_command.rs b/src/prepared_command.rs
deleted file mode 100644
index 8297095..0000000
--- a/src/prepared_command.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-use std::{
- ffi::{CString, OsStr},
- io::{Error, ErrorKind, Result},
- iter,
- os::unix::{ffi::*, io::RawFd},
- ptr,
-};
-
-use libc::{c_char, c_void};
-use nix::{fcntl::OFlag, sys::wait, unistd};
-
-use crate::util::ToIOResult;
-
-#[derive(Clone, Debug)]
-pub struct PreparedCommandBuilder {
- program: CString,
- args: Vec<CString>,
-}
-
-#[derive(Debug)]
-pub struct PreparedCommand {
- child: unistd::Pid,
- pipew: RawFd,
-}
-
-impl Drop for PreparedCommand {
- fn drop(&mut self) {
- assert!(unistd::close(self.pipew).is_ok());
- }
-}
-
-fn os2c<S: AsRef<OsStr>>(s: S) -> CString {
- CString::new(s.as_ref().as_bytes()).unwrap()
-}
-
-impl PreparedCommandBuilder {
- pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Self {
- self.args.push(os2c(arg));
- self
- }
-
- pub fn prepare(&mut self) -> Result<PreparedCommand> {
- let exe_p = self.program.as_ptr();
-
- let argv: Vec<*const c_char> = iter::once(exe_p)
- .chain(self.args.iter().map(|arg| arg.as_ptr()))
- .chain(iter::once(ptr::null()))
- .collect();
-
- let argv_p = argv.as_ptr();
-
- let (piper, pipew) = unistd::pipe2(OFlag::O_CLOEXEC).to_io_result()?;
-
- unsafe {
- match unistd::fork().to_io_result()? {
- unistd::ForkResult::Parent { child } => {
- let cmd = PreparedCommand { child, pipew };
- assert!(unistd::close(piper).is_ok());
- return Ok(cmd);
- }
- unistd::ForkResult::Child => {}
- }
-
- // Child process - only async-signal-safe calls allowed
-
- if libc::close(pipew) != 0 {
- libc::_exit(126);
- }
-
- // Wait for run trigger
- let mut buf = [0u8; 1];
- if libc::read(piper, buf.as_mut_ptr() as *mut c_void, buf.len()) != 1 {
- // PreparedCommand was dropped, or controlling process exited
- libc::_exit(126);
- }
-
- libc::execvp(exe_p, argv_p);
-
- // exec failed
- libc::_exit(127);
- }
- }
-}
-
-impl PreparedCommand {
- pub fn new<S: AsRef<OsStr>>(program: S) -> PreparedCommandBuilder {
- PreparedCommandBuilder {
- program: os2c(program),
- args: Vec::new(),
- }
- }
-
- pub fn run(self) -> Result<wait::WaitStatus> {
- if unistd::write(self.pipew, &[0]).to_io_result()? != 1 {
- return Err(Error::new(ErrorKind::Other, "command trigger write failed"));
- }
-
- wait::waitpid(Some(self.child), None).to_io_result()
- }
-}
diff --git a/src/runner.rs b/src/runner.rs
index a290aeb..810586c 100644
--- a/src/runner.rs
+++ b/src/runner.rs
@@ -7,5 +7,5 @@ use crate::types::*;
pub type Result<T> = io::Result<T>;
pub trait Runner {
- fn run(&self, task: &TaskRef) -> Result<()>;
+ fn run(&self, tasks: &TaskMap, task: &TaskRef) -> Result<()>;
}
diff --git a/src/runner/runc.rs b/src/runner/runc.rs
index b323d5d..663dc7a 100644
--- a/src/runner/runc.rs
+++ b/src/runner/runc.rs
@@ -1,21 +1,87 @@
-use super::*;
-use crate::types::TaskMap;
+mod init;
+mod run;
-pub struct RuncRunner<'a> {
- tasks: &'a TaskMap,
+use std::{io, process};
+
+use ipc_channel::ipc;
+use nix::unistd;
+use serde::{Deserialize, Serialize};
+
+use crate::types::*;
+use crate::unshare;
+use crate::util::CheckDisconnect;
+
+#[derive(Debug, Deserialize, Serialize)]
+struct Request(TaskRef, Task, ipc::IpcSender<run::Error>);
+
+fn runner(init_error_sender: ipc::IpcSender<init::Error>, channel: ipc::IpcReceiver<Request>) -> ! {
+ if let Err(error) = init::runc_initialize() {
+ init_error_sender.send(error).expect("IPC send() failed");
+ process::exit(1);
+ }
+
+ drop(init_error_sender);
+
+ while let Ok(request) = channel.recv() {
+ let Request(task, task_def, reply_sender) = request;
+ if let Err(error) = run::handle_task(task, task_def) {
+ reply_sender.send(error).expect("IPC send() failed");
+ }
+ }
+
+ process::exit(0);
+}
+
+pub struct RuncRunner {
+ channel: ipc::IpcSender<Request>,
}
-impl<'a> RuncRunner<'a> {
- pub fn new(tasks: &'a TaskMap) -> Self {
- RuncRunner { tasks }
+impl RuncRunner {
+ /// Creates a new Runc runner
+ ///
+ /// Unsafe: Do not call in multithreaded processes
+ pub unsafe fn new() -> io::Result<Self> {
+ let (tx, rx) = ipc::channel().expect("IPC channel creation failed");
+ let (init_error_tx, init_error_rx) = ipc::channel().expect("IPC channel creation failed");
+
+ let pid = match unistd::fork().expect("fork()") {
+ unistd::ForkResult::Parent { child } => {
+ drop(rx);
+ drop(init_error_tx);
+ child
+ }
+ unistd::ForkResult::Child => {
+ drop(tx);
+ drop(init_error_rx);
+ runner(init_error_tx, rx);
+ /* Not reached */
+ }
+ };
+
+ init_error_rx
+ .recv()
+ .check_disconnect()
+ .expect("IPC recv() error")?;
+
+ unshare::idmap(pid)?;
+
+ Ok(RuncRunner { channel: tx })
}
}
-impl<'a> Runner for RuncRunner<'a> {
- fn run(&self, task: &TaskRef) -> Result<()> {
- let task_def = self.tasks.get(task).expect("Invalid TaskRef");
+impl super::Runner for RuncRunner {
+ fn run(&self, tasks: &TaskMap, task: &TaskRef) -> super::Result<()> {
+ let task_def = tasks.get(task).expect("Invalid TaskRef");
+ let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed");
+
+ self.channel
+ .send(Request(task.clone(), task_def.clone(), reply_tx))
+ .expect("RuncRunner task submission failed");
- println!("{}:\n\t{:?}", task, task_def);
+ reply_rx
+ .recv()
+ .check_disconnect()
+ .expect("IPC recv() error")?;
Ok(())
}
diff --git a/src/runner/runc/init.rs b/src/runner/runc/init.rs
new file mode 100644
index 0000000..02b89e4
--- /dev/null
+++ b/src/runner/runc/init.rs
@@ -0,0 +1,47 @@
+use std::io;
+
+use nix::{
+ mount::{self, MsFlags},
+ sched::{self, CloneFlags},
+};
+use serde::{Deserialize, Serialize};
+
+fn mount_buildtmp() -> nix::Result<()> {
+ mount::mount::<_, _, _, str>(
+ Some("buildtmp"),
+ "build/tmp",
+ Some("tmpfs"),
+ MsFlags::empty(),
+ None,
+ )
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub enum Error {
+ Code(i32),
+ String(String),
+}
+
+impl From<nix::Error> for Error {
+ fn from(error: nix::Error) -> Self {
+ match error {
+ nix::Error::Sys(code) => Error::Code(code as i32),
+ _ => Error::String(error.to_string()),
+ }
+ }
+}
+
+impl From<Error> for io::Error {
+ fn from(error: Error) -> Self {
+ match error {
+ Error::Code(code) => io::Error::from_raw_os_error(code),
+ Error::String(string) => io::Error::new(io::ErrorKind::Other, string),
+ }
+ }
+}
+
+pub fn runc_initialize() -> Result<(), Error> {
+ sched::unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)?;
+ mount_buildtmp()?;
+ Ok(())
+}
diff --git a/src/runner/runc/run.rs b/src/runner/runc/run.rs
new file mode 100644
index 0000000..2d13330
--- /dev/null
+++ b/src/runner/runc/run.rs
@@ -0,0 +1,18 @@
+use std::io;
+
+use serde::{Deserialize, Serialize};
+
+use crate::types::*;
+#[derive(Debug, Deserialize, Serialize)]
+pub struct Error;
+
+impl From<Error> for io::Error {
+ fn from(_: Error) -> Self {
+ io::Error::new(io::ErrorKind::Other, "Failed to run task")
+ }
+}
+
+pub fn handle_task(task: TaskRef, task_def: Task) -> Result<(), Error> {
+ println!("{}:\n\t{:?}", task, task_def);
+ Ok(())
+}
diff --git a/src/types.rs b/src/types.rs
index 1773504..331b49a 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -1,9 +1,9 @@
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub type TaskRef = String;
-#[derive(Clone, Debug, Deserialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Task {
#[serde(default)]
pub depends: Vec<TaskRef>,
diff --git a/src/unshare.rs b/src/unshare.rs
index a8ec1c1..da16d24 100644
--- a/src/unshare.rs
+++ b/src/unshare.rs
@@ -4,15 +4,13 @@ use std::{
io::{self, BufRead, Result},
os::unix::ffi::*,
path::Path,
+ process,
};
-use nix::{
- sched::{self, CloneFlags},
- unistd,
-};
+use nix::unistd;
-use crate::prepared_command::PreparedCommand;
-use crate::util::{Checkable, ToIOResult};
+// use crate::prepared_command::PreparedCommand;
+use crate::util::Checkable;
#[derive(Debug, Eq, Ord, PartialEq, PartialOrd)]
struct SubIDRange {
@@ -98,27 +96,22 @@ fn get_gid_map() -> Result<Vec<SubIDMap>> {
Ok(generate_idmap(gid, gid_ranges))
}
-fn prepare_idmap_cmd(cmd: &str, pid: &str, map: &Vec<SubIDMap>) -> Result<PreparedCommand> {
- let mut builder = PreparedCommand::new(cmd);
- builder.arg(&pid);
+fn run_idmap_cmd(cmd: &str, pid: &str, map: &Vec<SubIDMap>) -> Result<()> {
+ let mut builder = process::Command::new(cmd);
+ builder.arg(pid);
for uids in map {
builder.arg(uids.lower.to_string());
builder.arg(uids.upper.to_string());
builder.arg(uids.count.to_string());
}
- builder.prepare()
+ builder.status().and_then(|status| status.check())
}
-pub fn unshare() -> Result<()> {
- let pid = unistd::getpid().to_string();
-
- let newuidmap = prepare_idmap_cmd("newuidmap", pid.as_str(), &get_uid_map()?)?;
- let newgidmap = prepare_idmap_cmd("newgidmap", pid.as_str(), &get_gid_map()?)?;
-
- sched::unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS).to_io_result()?;
+pub fn idmap(pid: unistd::Pid) -> Result<()> {
+ let pid_string = pid.to_string();
- newuidmap.run()?.check()?;
- newgidmap.run()?.check()?;
+ run_idmap_cmd("newuidmap", pid_string.as_str(), &get_uid_map()?)?;
+ run_idmap_cmd("newgidmap", pid_string.as_str(), &get_gid_map()?)?;
Ok(())
}
diff --git a/src/util.rs b/src/util.rs
index 4e996e8..0418c51 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -1,8 +1,10 @@
use std::{
io::{Error, ErrorKind, Result},
process::ExitStatus,
+ result,
};
+use ipc_channel::ipc;
use nix::sys::wait;
pub trait ToIOResult<T> {
@@ -46,3 +48,21 @@ impl Checkable for wait::WaitStatus {
}
}
}
+
+pub trait CheckDisconnect {
+ type Output;
+
+ fn check_disconnect(self) -> Result<Self::Output>;
+}
+
+impl<T> CheckDisconnect for result::Result<T, ipc::IpcError> {
+ type Output = result::Result<(), T>;
+
+ fn check_disconnect(self) -> Result<Self::Output> {
+ match self {
+ Ok(v) => Ok(Err(v)),
+ Err(ipc::IpcError::Disconnected) => Ok(Ok(())),
+ Err(error) => Err(Error::new(ErrorKind::Other, format!("{:?}", error))),
+ }
+ }
+}