Diffstat (limited to 'crates/rebel-runner/src')
-rw-r--r--  crates/rebel-runner/src/init.rs             68
-rw-r--r--  crates/rebel-runner/src/jobserver.rs        79
-rw-r--r--  crates/rebel-runner/src/lib.rs             217
-rw-r--r--  crates/rebel-runner/src/ns.rs               84
-rw-r--r--  crates/rebel-runner/src/paths.rs           118
-rw-r--r--  crates/rebel-runner/src/tar.rs             105
-rw-r--r--  crates/rebel-runner/src/task.rs            638
-rw-r--r--  crates/rebel-runner/src/util/checkable.rs   37
-rw-r--r--  crates/rebel-runner/src/util/cjson.rs       37
-rw-r--r--  crates/rebel-runner/src/util/clone.rs       59
-rw-r--r--  crates/rebel-runner/src/util/fs.rs         127
-rw-r--r--  crates/rebel-runner/src/util/mod.rs          7
-rw-r--r--  crates/rebel-runner/src/util/stack.rs       25
-rw-r--r--  crates/rebel-runner/src/util/steal.rs       40
-rw-r--r--  crates/rebel-runner/src/util/unix.rs        86
15 files changed, 1727 insertions, 0 deletions
diff --git a/crates/rebel-runner/src/init.rs b/crates/rebel-runner/src/init.rs
new file mode 100644
index 0000000..0172a01
--- /dev/null
+++ b/crates/rebel-runner/src/init.rs
@@ -0,0 +1,68 @@
+use nix::mount::{self, MsFlags};
+
+use rebel_common::error::*;
+
+use crate::{paths, util::fs};
+
+fn prepare_dev(path: &str) -> Result<()> {
+ fs::mkdir(path)?;
+ mount::mount::<_, _, str, str>(Some(path), path, None, MsFlags::MS_BIND, None)
+ .context("Failed to bind mount container /dev")?;
+
+ for dir in ["pts", "shm"] {
+ fs::mkdir(paths::join(&[path, dir]))?;
+ }
+
+ for (link, target) in [
+ ("fd", "/proc/self/fd"),
+ ("stdin", "/proc/self/fd/0"),
+ ("stdout", "/proc/self/fd/1"),
+ ("stderr", "/proc/self/fd/2"),
+ ("ptmx", "pts/ptmx"),
+ ] {
+ let path = paths::join(&[path, link]);
+ std::os::unix::fs::symlink(target, &path)
+ .with_context(|| format!("Failed to create link {}", path))?;
+ }
+
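+ // Device nodes are bind mounted from the host rather than created with
+ // mknod(), which an unprivileged user namespace is not permitted to do.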
+ for dev in ["null", "zero", "full", "random", "urandom", "tty"] {
+ let source = paths::join(&["/dev", dev]);
+ let target = paths::join(&[path, dev]);
+ fs::create(&target)?;
+ mount::mount::<str, str, str, str>(Some(&source), &target, None, MsFlags::MS_BIND, None)
+ .with_context(|| format!("Failed to bind mount {}", source))?;
+ }
+
+ mount::mount::<str, _, str, str>(
+ None,
+ path,
+ None,
+ MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
+ None,
+ )
+ .context("Failed to mount container /dev read-only")?;
+
+ Ok(())
+}
+
+pub fn init_runner() -> Result<()> {
+ fs::mkdir(paths::LAYER_STATE_DIR)?;
+ fs::mkdir(paths::OUTPUT_STATE_DIR)?;
+
+ fs::ensure_removed(paths::TMP_DIR)?;
+ fs::mkdir(paths::TMP_DIR)?;
+ mount::mount::<_, _, str, str>(
+ Some(paths::TMP_DIR),
+ paths::TMP_DIR,
+ None,
+ MsFlags::MS_BIND,
+ None,
+ )
+ .context("Failed to bind mount build tmpdir")?;
+ mount::mount::<str, _, str, str>(None, paths::TMP_DIR, None, MsFlags::MS_PRIVATE, None)
+ .context("Failed to set MS_PRIVATE for build tmpdir")?;
+
+ prepare_dev(paths::DEV_DIR)?;
+
+ Ok(())
+}
diff --git a/crates/rebel-runner/src/jobserver.rs b/crates/rebel-runner/src/jobserver.rs
new file mode 100644
index 0000000..7c3f2f7
--- /dev/null
+++ b/crates/rebel-runner/src/jobserver.rs
@@ -0,0 +1,79 @@
+use std::{
+ os::fd::{AsFd, AsRawFd, OwnedFd},
+ slice,
+};
+
+use nix::{errno::Errno, fcntl::OFlag, poll, unistd};
+
+use rebel_common::error::*;
+
+use super::util::unix;
+
+#[derive(Debug)]
+pub struct Jobserver {
+ tokens: usize,
+ r: OwnedFd,
+ w: OwnedFd,
+}
+
+impl Jobserver {
+ pub fn new(tokens: usize) -> Result<Jobserver> {
+ let (r, w) = unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK).context("pipe()")?;
+
+ for _ in 0..tokens {
+ if unistd::write(w.as_fd(), b"+").is_err() {
+ break;
+ }
+ }
+ unix::set_blocking(&w, true)?;
+
+ Ok(Jobserver { tokens, r, w })
+ }
+
+ pub fn wait(&mut self) -> u8 {
+ loop {
+ poll::poll(
+ &mut [poll::PollFd::new(self.r.as_fd(), poll::PollFlags::POLLIN)],
+ poll::PollTimeout::NONE,
+ )
+ .expect("poll()");
+
+ let mut token = 0;
+ match unistd::read(self.r.as_raw_fd(), slice::from_mut(&mut token)) {
+ Ok(n) => {
+ assert!(n == 1);
+ return token;
+ }
+ Err(Errno::EAGAIN) => {
+ // Token was sniped by another task
+ continue;
+ }
+ error @ Err(_) => {
+ error.expect("read()");
+ }
+ }
+ }
+ }
+
+ pub fn post(&mut self, token: u8) {
+ let n = unistd::write(self.w.as_fd(), slice::from_ref(&token)).expect("write()");
+ assert!(n == 1);
+ }
+
+ pub fn set_cloexec(&mut self, cloexec: bool) -> Result<()> {
+ unix::set_cloexec(&self.r, cloexec)?;
+ unix::set_cloexec(&self.w, cloexec)?;
+ Ok(())
+ }
+
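+ // Renders the jobserver as GNU make command-line options, e.g.
+ // " -j8 --jobserver-auth=3,4" (fd numbers here are illustrative).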
+ pub fn to_makeflags(&self) -> String {
+ format!(
+ " -j{} --jobserver-auth={},{}",
+ self.tokens,
+ self.r.as_raw_fd(),
+ self.w.as_raw_fd(),
+ )
+ }
+}
+
+// FIXME Log lost tokens on drop
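+//
+// A minimal usage sketch (hypothetical, not part of this crate): claim a
+// token around each parallel job and pass the pipe on to child processes
+// via MAKEFLAGS:
+//
+//     let mut jobserver = Jobserver::new(8)?;
+//     let token = jobserver.wait(); // blocks until a token is free
+//     // ... run one job, e.g. with MAKEFLAGS set to jobserver.to_makeflags() ...
+//     jobserver.post(token);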
diff --git a/crates/rebel-runner/src/lib.rs b/crates/rebel-runner/src/lib.rs
new file mode 100644
index 0000000..7dde05d
--- /dev/null
+++ b/crates/rebel-runner/src/lib.rs
@@ -0,0 +1,217 @@
+mod init;
+mod jobserver;
+mod ns;
+mod paths;
+mod tar;
+mod task;
+mod util;
+
+use std::{
+ collections::HashSet,
+ fs::File,
+ net,
+ os::unix::{net::UnixStream, prelude::*},
+ process, slice,
+};
+
+use capctl::prctl;
+use nix::{
+ errno::Errno,
+ fcntl::Flock,
+ poll,
+ sched::CloneFlags,
+ sys::{
+ signal,
+ signalfd::{SfdFlags, SignalFd},
+ stat, wait,
+ },
+ unistd::{self, Gid, Pid, Uid},
+};
+use uds::UnixSeqpacketConn;
+
+use rebel_common::{error::*, types::*};
+
+use jobserver::Jobserver;
+use util::{checkable::Checkable, clone, steal::Steal, unix};
+
+#[derive(Debug, Clone)]
+pub struct Options {
+ pub jobs: Option<usize>,
+}
+
+#[derive(Debug)]
+struct RunnerContext {
+ socket: Steal<UnixSeqpacketConn>,
+ jobserver: Jobserver,
+ tasks: HashSet<Pid>,
+}
+
+fn handle_sigchld(ctx: &mut RunnerContext) -> Result<()> {
+ loop {
+ let status = match wait::waitpid(Pid::from_raw(-1), Some(wait::WaitPidFlag::WNOHANG)) {
+ Ok(wait::WaitStatus::StillAlive) | Err(Errno::ECHILD) => return Ok(()),
+ res => res.expect("waitpid()"),
+ };
+ let pid = status.pid().unwrap();
+ if ctx.tasks.remove(&pid) {
+ status.check()?;
+ }
+ }
+}
+
+fn handle_request(ctx: &mut RunnerContext, request_socket: UnixStream) {
+ let run = || {
+ ctx.socket.steal();
+
+ let task: Task =
+ bincode::deserialize_from(&request_socket).expect("Failed to decode task description");
+
+ prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)");
+
+ let result = task::handle(task, &mut ctx.jobserver);
+ bincode::serialize_into(&request_socket, &result).expect("Failed to send task result");
+ drop(request_socket);
+ };
+
+ let pid = unsafe { clone::spawn(None, run) }.expect("fork()");
+ assert!(ctx.tasks.insert(pid));
+}
+
+fn handle_socket(ctx: &mut RunnerContext) -> bool {
+ let mut fd = 0;
+
+ match ctx
+ .socket
+ .recv_fds(&mut [0], slice::from_mut(&mut fd))
+ .expect("recv_fds()")
+ {
+ (1, _, n_fd) => {
+ assert!(n_fd == 1);
+ }
+ _ => return false,
+ }
+
+ let request_socket = unsafe { UnixStream::from_raw_fd(fd) };
+ handle_request(ctx, request_socket);
+ true
+}
+
+fn borrow_socket_fd(socket: &UnixSeqpacketConn) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(socket.as_raw_fd()) }
+}
+
+fn runner(
+ uid: Uid,
+ gid: Gid,
+ socket: UnixSeqpacketConn,
+ _lockfile: Flock<File>,
+ options: &Options,
+) -> ! {
+ unistd::setsid().expect("setsid()");
+ ns::mount_proc();
+ ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid);
+
+ stat::umask(stat::Mode::from_bits_truncate(0o022));
+
+ init::init_runner().unwrap();
+
+ let jobs = options
+ .jobs
+ .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs"));
+ let jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe");
+ let mut ctx = RunnerContext {
+ socket: socket.into(),
+ jobserver,
+ tasks: HashSet::new(),
+ };
+
+ let mut signals = signal::SigSet::empty();
+ signals.add(signal::Signal::SIGCHLD);
+ signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None)
+ .expect("pthread_sigmask()");
+ let mut signal_fd = SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC)
+ .expect("Failed to create signal file descriptor");
+
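+ // Main loop: multiplex child exits (signalfd) and incoming task
+ // submissions (seqpacket socket) with a single poll().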
+ loop {
+ let socket_fd = borrow_socket_fd(&ctx.socket);
+ let mut pollfds = [
+ poll::PollFd::new(signal_fd.as_fd(), poll::PollFlags::POLLIN),
+ poll::PollFd::new(socket_fd.as_fd(), poll::PollFlags::POLLIN),
+ ];
+ poll::poll(&mut pollfds, poll::PollTimeout::NONE).expect("poll()");
+
+ let signal_events = pollfds[0]
+ .revents()
+ .expect("Unknown events in poll() return");
+ let socket_events = pollfds[1]
+ .revents()
+ .expect("Unknown events in poll() return");
+
+ if signal_events.contains(poll::PollFlags::POLLIN) {
+ let _signal = signal_fd.read_signal().expect("read_signal()").unwrap();
+ handle_sigchld(&mut ctx).expect("Task process exited abnormally");
+ } else if signal_events.intersects(!poll::PollFlags::POLLIN) {
+ panic!("Unexpected error status for signal file descriptor");
+ }
+
+ if socket_events.contains(poll::PollFlags::POLLIN) {
+ if !handle_socket(&mut ctx) {
+ break;
+ }
+ } else if socket_events.intersects(!poll::PollFlags::POLLIN) {
+ panic!("Unexpected error status for socket file descriptor");
+ }
+ }
+
+ process::exit(0);
+}
+
+pub struct Runner {
+ socket: UnixSeqpacketConn,
+}
+
+impl Runner {
+ /// Creates a new container runner
+ ///
+ /// # Safety
+ ///
+ /// Do not call in multithreaded processes.
+ pub unsafe fn new(options: &Options) -> Result<Self> {
+ let lockfile = unix::lock(paths::LOCKFILE, true, false)
+ .context("Failed to get lock on build directory, is another instance running?")?;
+
+ let uid = unistd::geteuid();
+ let gid = unistd::getegid();
+
+ let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()");
+
+ match clone::clone(
+ CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID,
+ )
+ .expect("clone()")
+ {
+ unistd::ForkResult::Parent { .. } => Ok(Runner { socket: local }),
+ unistd::ForkResult::Child => {
+ drop(local);
+ runner(uid, gid, remote, lockfile, options);
+ }
+ }
+ }
+
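+ /// Submits a task: sends one end of a fresh socketpair to the runner
+ /// process, then serializes the task description onto it. The returned
+ /// stream yields the result when read with [`Runner::result`].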
+ pub fn spawn(&self, task: &Task) -> UnixStream {
+ let (local, remote) = UnixStream::pair().expect("socketpair()");
+
+ self.socket
+ .send_fds(&[0], &[remote.as_raw_fd()])
+ .expect("send()");
+
+ bincode::serialize_into(&local, task).expect("Task submission failed");
+ local.shutdown(net::Shutdown::Write).expect("shutdown()");
+
+ local
+ }
+
+ pub fn result(socket: &UnixStream) -> Result<TaskOutput> {
+ bincode::deserialize_from(socket).expect("Failed to read task result")
+ }
+}
diff --git a/crates/rebel-runner/src/ns.rs b/crates/rebel-runner/src/ns.rs
new file mode 100644
index 0000000..986aa80
--- /dev/null
+++ b/crates/rebel-runner/src/ns.rs
@@ -0,0 +1,84 @@
+use nix::{
+ mount::{self, MntFlags, MsFlags},
+ sched::CloneFlags,
+ unistd::{self, Gid, Pid, Uid},
+};
+
+use rebel_common::error::*;
+
+use super::util::clone;
+
+pub fn mount_proc() {
+ mount::mount::<_, _, _, str>(Some("proc"), "/proc", Some("proc"), MsFlags::empty(), None)
+ .expect("Failed to mount /proc");
+}
+
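+// Writes single-entry uid_map/gid_map files; each map line has the form
+// "<inner-id> <outer-id> <count>".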
+pub fn setup_userns(inner_uid: Uid, inner_gid: Gid, outer_uid: Uid, outer_gid: Gid) {
+ std::fs::write("/proc/self/setgroups", "deny").expect("Failed to write /proc/self/setgroups");
+ std::fs::write(
+ "/proc/self/uid_map",
+ format!("{} {} 1", inner_uid, outer_uid),
+ )
+ .expect("Failed to write /proc/self/uid_map");
+ std::fs::write(
+ "/proc/self/gid_map",
+ format!("{} {} 1", inner_gid, outer_gid),
+ )
+ .expect("Failed to write /proc/self/gid_map");
+}
+
+pub unsafe fn spawn<F>(flags: CloneFlags, f: F) -> nix::Result<Pid>
+where
+ F: FnOnce(),
+{
+ assert!(flags.contains(CloneFlags::CLONE_NEWNS) || !flags.contains(CloneFlags::CLONE_NEWPID));
+
+ clone::spawn(Some(flags), || {
+ if flags.contains(CloneFlags::CLONE_NEWPID) {
+ mount_proc();
+ }
+ f()
+ })
+}
+
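+// pivot_root(".", ".") stacks the new root over the old one, so the old
+// root can simply be detached lazily afterwards (see pivot_root(2)).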
+pub fn pivot_root(path: &str) {
+ (|| -> Result<()> {
+ unistd::chdir(path).context("chdir()")?;
+ mount::mount::<_, _, str, str>(Some("/proc"), "proc", None, MsFlags::MS_BIND, None)
+ .context("Failed to bind mount /proc")?;
+ unistd::pivot_root(".", ".").context("pivot_root()")?;
+ mount::umount2(".", MntFlags::MNT_DETACH).context("umount2()")?;
+ unistd::chdir("/").context("chdir(\"/\")")?;
+ Ok(())
+ })()
+ .expect("Failed to pivot root");
+}
+
+pub fn container_mounts() -> Result<()> {
+ mount::mount(
+ Some("tmp"),
+ "/tmp",
+ Some("tmpfs"),
+ MsFlags::MS_NODEV | MsFlags::MS_NOSUID,
+ Some("mode=1777,size=1048576k"),
+ )
+ .context("Failed to mount /tmp")?;
+ mount::mount(
+ Some("devpts"),
+ "/dev/pts",
+ Some("devpts"),
+ MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC,
+ Some("newinstance,ptmxmode=0666,mode=0620"),
+ )
+ .context("Failed to mount /dev/pts")?;
+ mount::mount(
+ Some("shm"),
+ "/dev/shm",
+ Some("tmpfs"),
+ MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC | MsFlags::MS_NODEV,
+ Some("mode=1777,size=65536k"),
+ )
+ .context("Failed to mount /dev/shm")?;
+
+ Ok(())
+}
diff --git a/crates/rebel-runner/src/paths.rs b/crates/rebel-runner/src/paths.rs
new file mode 100644
index 0000000..84f9c4d
--- /dev/null
+++ b/crates/rebel-runner/src/paths.rs
@@ -0,0 +1,118 @@
+//! Build directory structure used throughout rebel
+//!
+//! # Current structure
+//!
+//! ```text
+//! build/
+//! ├── build.lock
+//! ├── downloads/
+//! │   └── ...
+//! ├── state/
+//! │   ├── output/
+//! │   │   ├── <input hash>.tar.tmp   # during packing
+//! │   │   ├── <archive hash>.tar     # files are renamed when packing is finished
+//! │   │   └── ...
+//! │   ├── layer/
+//! │   │   ├── <layer hash>/          # overlayfs layer dir of finished tasks
+//! │   │   └── ...
+//! │   └── task/
+//! │       ├── <input hash>/
+//! │       │   ├── layer/             # overlayfs layer dir (moved to layer/ after build)
+//! │       │   ├── work/              # overlayfs work dir (discarded after build)
+//! │       │   ├── task.json.tmp      # during write
+//! │       │   ├── task.json          # after write
+//! │       │   ├── task.log           # stdout/stderr output of the task
+//! │       │   └── task.lock          # task lockfile
+//! │       └── ...
+//! └── tmp/                           # temporary files (cleaned on start)
+//!     ├── dev/                       # container /dev
+//!     ├── depends/                   # unpacked dependencies
+//!     └── task/
+//!         └── <input hash>/
+//!             ├── build/             # mount point for /build directory
+//!             │   ├── downloads/     # downloaded sources
+//!             │   ├── task/          # internal runner files
+//!             │   └── work/          # build overlay mountpoint
+//!             └── rootfs/            # rootfs overlay mountpoint
+//! ```
+
+use rebel_common::string_hash::*;
+
+pub const DOWNLOADS_DIR: &str = "build/downloads";
+pub const PIN_DIR: &str = "build/pinned";
+
+pub const TMP_DIR: &str = "build/tmp";
+pub const DEV_DIR: &str = "build/tmp/dev";
+pub const DEPENDS_TMP_DIR: &str = "build/tmp/depends";
+pub const TASK_TMP_DIR: &str = "build/tmp/task";
+
+pub const TASK_TMP_ROOTFS_SUBDIR: &str = "rootfs";
+
+pub const LOCKFILE: &str = "build/build.lock";
+pub const OUTPUT_STATE_DIR: &str = "build/state/output";
+pub const TASK_STATE_DIR: &str = "build/state/task";
+pub const LAYER_STATE_DIR: &str = "build/state/layer";
+
+pub const TASK_STATE_LAYER_SUBDIR: &str = "layer";
+pub const TASK_STATE_WORK_SUBDIR: &str = "work";
+
+pub const TASK_BUILDDIR: &str = "build";
+pub const TASK_TASKDIR: &str = "build/task";
+
+pub const TASK_RUN: &str = "run";
+
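+/// Joins path components with '/', e.g. join(&[TMP_DIR, "dev"]) == "build/tmp/dev".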
+pub fn join(paths: &[&str]) -> String {
+ paths.join("/")
+}
+
+pub fn task_tmp_dir(hash: &InputHash) -> String {
+ join(&[TASK_TMP_DIR, &hash.to_string()])
+}
+
+pub fn task_state_dir(hash: &InputHash) -> String {
+ join(&[TASK_STATE_DIR, &hash.to_string()])
+}
+
+pub fn task_cache_tmp_filename(hash: &InputHash) -> String {
+ join(&[TASK_STATE_DIR, &hash.to_string(), "task.json.tmp"])
+}
+
+pub fn task_cache_filename(hash: &InputHash) -> String {
+ join(&[TASK_STATE_DIR, &hash.to_string(), "task.json"])
+}
+
+pub fn task_log_filename(hash: &InputHash) -> String {
+ join(&[TASK_STATE_DIR, &hash.to_string(), "task.log"])
+}
+
+pub fn task_lock_filename(hash: &InputHash) -> String {
+ join(&[TASK_STATE_DIR, &hash.to_string(), "task.lock"])
+}
+
+pub fn layer_dir(hash: &LayerHash) -> String {
+ join(&[LAYER_STATE_DIR, &hash.to_string()])
+}
+
+pub fn depend_tmp_dir(hash: &ArchiveHash) -> String {
+ join(&[DEPENDS_TMP_DIR, &format!("{}.tmp", hash)])
+}
+
+pub fn depend_dir(hash: &ArchiveHash) -> String {
+ join(&[DEPENDS_TMP_DIR, &hash.to_string()])
+}
+
+pub fn depend_lock_filename(hash: &ArchiveHash) -> String {
+ join(&[DEPENDS_TMP_DIR, &format!("{}.lock", hash)])
+}
+
+pub fn archive_tmp_filename(hash: &InputHash) -> String {
+ join(&[OUTPUT_STATE_DIR, &format!("{}.tar.tmp", hash)])
+}
+
+pub fn archive_filename(hash: &ArchiveHash) -> String {
+ join(&[OUTPUT_STATE_DIR, &format!("{}.tar", hash)])
+}
+
+pub fn pinned_archive_filename(name: &str) -> String {
+ join(&[PIN_DIR, &format!("{}.tar", name)])
+}
diff --git a/crates/rebel-runner/src/tar.rs b/crates/rebel-runner/src/tar.rs
new file mode 100644
index 0000000..891c603
--- /dev/null
+++ b/crates/rebel-runner/src/tar.rs
@@ -0,0 +1,105 @@
+use std::{
+ io::{self, Read, Write},
+ os::unix::prelude::CommandExt,
+ path::Path,
+ process::{self, Command, Stdio},
+};
+
+use nix::{
+ mount::{self, MsFlags},
+ sched::CloneFlags,
+ sys::wait,
+};
+
+use rebel_common::{error::*, string_hash::ArchiveHash};
+
+use super::{
+ ns,
+ util::{checkable::Checkable, fs},
+};
+use crate::paths;
+
+pub fn pack<W: Write, P: AsRef<Path>>(
+ rootfs_hash: &ArchiveHash,
+ archive: &mut W,
+ source: P,
+) -> Result<()> {
+ let rootfs = paths::depend_dir(rootfs_hash);
+ let _rootfs_mount = fs::mount(&rootfs, &rootfs, None, MsFlags::MS_BIND, None)
+ .with_context(|| format!("Failed to bind mount rootfs to {:?}", rootfs))?;
+ mount::mount::<str, str, str, str>(
+ None,
+ &rootfs,
+ None,
+ MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
+ None,
+ )
+ .context("Failed to mount container rootfs read-only")?;
+
+ let (mut piper, pipew) = fs::pipe()?;
+
+ let exec_tar = || -> Result<()> {
+ // We are in our own mount namespace, so mounting into the shared rootfs is fine
+ let dev_target = paths::join(&[&rootfs, "dev"]);
+ mount::mount::<_, _, str, str>(
+ Some(paths::DEV_DIR),
+ dev_target.as_str(),
+ None,
+ MsFlags::MS_BIND | MsFlags::MS_REC,
+ None,
+ )?;
+ let mount_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);
+ mount::mount::<_, _, str, str>(
+ Some(source.as_ref()),
+ mount_target.as_str(),
+ None,
+ MsFlags::MS_BIND,
+ None,
+ )?;
+
+ ns::pivot_root(&rootfs);
+
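+ // Sorted names and fixed owner/mtime make the archive reproducible:
+ // identical inputs produce bit-identical (and thus identically hashed)
+ // output.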
+ let err = Command::new("tar")
+ .args([
+ "-c",
+ "--sort=name",
+ "--numeric-owner",
+ "--owner=0",
+ "--group=0",
+ "--mtime=@0",
+ ".",
+ ])
+ .stdin(Stdio::null())
+ .stdout(pipew)
+ .current_dir(paths::TASK_BUILDDIR)
+ .env_clear()
+ .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
+ .exec();
+ eprintln!("{}", err);
+ process::exit(127);
+ };
+
+ let pid = unsafe { ns::spawn(CloneFlags::CLONE_NEWNS, || exec_tar().unwrap()) }
+ .context("Failed to run tar")?;
+
+ let result = io::copy(&mut piper, archive).context("Failed to write TAR archive");
+
+ wait::waitpid(pid, None)?
+ .check()
+ .context("tar did not exit successfully")?;
+
+ result?;
+ Ok(())
+}
+
+pub fn unpack<R: Read, P: AsRef<Path>>(archive: R, dest: P) -> Result<()> {
+ fs::mkdir(&dest)?;
+
+ let mut ar = tar::Archive::new(archive);
+ ar.set_preserve_permissions(true);
+ ar.set_preserve_mtime(true);
+ ar.set_unpack_xattrs(true);
+ ar.set_overwrite(false);
+
+ ar.unpack(dest).context("Failed to unpack TAR archive")
+}
diff --git a/crates/rebel-runner/src/task.rs b/crates/rebel-runner/src/task.rs
new file mode 100644
index 0000000..5bb253a
--- /dev/null
+++ b/crates/rebel-runner/src/task.rs
@@ -0,0 +1,638 @@
+use std::{
+ collections::{BTreeMap, HashMap, HashSet},
+ io::{self, BufWriter},
+ os::unix::prelude::CommandExt,
+ path::{Path, PathBuf},
+ process::{self, Command, Stdio},
+ time::Instant,
+};
+
+use capctl::prctl;
+use nix::{
+ mount::{self, MsFlags},
+ sched::{unshare, CloneFlags},
+ sys::{resource, time::TimeVal, wait},
+ unistd::{self, Gid, Uid},
+};
+use serde::Serialize;
+use tee_readwrite::{TeeReader, TeeWriter};
+
+use rebel_common::{error::*, string_hash::*, types::*};
+use walkdir::WalkDir;
+
+use super::{
+ jobserver::Jobserver,
+ ns, tar,
+ util::{checkable::Checkable, cjson, fs},
+};
+use crate::{
+ paths,
+ util::{stack::Stack, unix},
+};
+
+const BUILD_UID: Uid = Uid::from_raw(1000);
+const BUILD_GID: Gid = Gid::from_raw(1000);
+
+type InputHasher = blake3::Hasher;
+type DependencyHasher = blake3::Hasher;
+type LayerHasher = blake3::Hasher;
+type ArchiveHasher = blake3::Hasher;
+
+type DependMap = BTreeMap<String, Vec<ArchiveHash>>;
+
+fn dependency_hash(dep: &Dependency) -> DependencyHash {
+ DependencyHash(StringHash(
+ cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
+ ))
+}
+
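+// The input hash covers everything that can influence a task's output; two
+// tasks with equal input hashes are treated as interchangeable by the cache.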
+fn input_hash(task: &Task) -> InputHash {
+ #[derive(Debug, Serialize)]
+ struct HashInput<'a> {
+ pub command: &'a str,
+ pub workdir: &'a str,
+ pub rootfs: &'a ArchiveHash,
+ pub ancestors: &'a [LayerHash],
+ pub depends: HashMap<DependencyHash, &'a Dependency>,
+ pub outputs: &'a HashMap<String, String>,
+ }
+ let input = HashInput {
+ command: &task.command,
+ workdir: &task.workdir,
+ rootfs: &task.rootfs,
+ ancestors: &task.ancestors,
+ depends: task
+ .depends
+ .iter()
+ .map(|dep| (dependency_hash(dep), dep))
+ .collect(),
+ outputs: &task.outputs,
+ };
+
+ InputHash(StringHash(
+ cjson::digest::<InputHasher, _>(&input).unwrap().into(),
+ ))
+}
+
+fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
+ // Remove metadata first to ensure task invalidation
+ fs::ensure_removed(paths::task_cache_filename(input_hash))?;
+
+ let task_state_dir = paths::task_state_dir(input_hash);
+
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+ fs::ensure_removed(&task_layer_dir)?;
+ fs::mkdir(&task_layer_dir)?;
+ fs::fixup_permissions(&task_layer_dir)?;
+
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+ let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
+ fs::mkdir(&taskdir)?;
+ let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
+ std::fs::write(&runfile, &task.command)
+ .with_context(|| format!("Failed to write {}", runfile))?;
+
+ let mount_target = paths::join(&[&task_tmp_dir, &task.workdir]);
+ let mount = if task.ancestors.is_empty() {
+ fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
+ .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
+ } else {
+ let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+ fs::ensure_removed(&task_work_dir)?;
+ fs::mkdir(&task_work_dir)?;
+ fs::fixup_permissions(&task_work_dir)?;
+
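+ // overlayfs lists the top-most lowerdir first, so the ancestor layers
+ // (presumably ordered oldest to newest) are reversed here.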
+ let lower = task
+ .ancestors
+ .iter()
+ .rev()
+ .map(paths::layer_dir)
+ .collect::<Vec<_>>()
+ .join(":");
+ let options = format!(
+ "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
+ lower = lower,
+ upper = task_layer_dir,
+ work = task_work_dir
+ );
+ fs::mount(
+ "overlay",
+ &mount_target,
+ Some("overlay"),
+ MsFlags::empty(),
+ Some(&options),
+ )
+ .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
+ };
+
+ Ok(mount)
+}
+
+fn get_contents<P1: AsRef<Path>, P2: AsRef<Path>>(
+ path: P1,
+ prefix: P2,
+) -> Result<(HashSet<PathBuf>, HashSet<PathBuf>)> {
+ let mut dirs = HashSet::new();
+ let mut files = HashSet::new();
+
+ let root: PathBuf = Path::new("/").join(prefix.as_ref());
+
+ for result in WalkDir::new(path.as_ref()).min_depth(1).into_iter() {
+ let entry = result
+ .with_context(|| format!("Failed to list contents of directory {:?}", path.as_ref()))?;
+ let is_dir = entry.file_type().is_dir();
+ let entry_path = root.join(entry.into_path().strip_prefix(path.as_ref()).unwrap());
+ if is_dir {
+ dirs.insert(entry_path);
+ } else {
+ files.insert(entry_path);
+ }
+ }
+
+ dirs.insert(root);
+
+ Ok((dirs, files))
+}
+
+fn check_conflicts(
+ dirs1: &HashSet<PathBuf>,
+ files1: &HashSet<PathBuf>,
+ dirs2: &HashSet<PathBuf>,
+ files2: &HashSet<PathBuf>,
+) -> Result<()> {
+ let mut conflicts = Vec::new();
+
+ conflicts.extend(files1.intersection(files2));
+ conflicts.extend(dirs1.intersection(files2));
+ conflicts.extend(files1.intersection(dirs2));
+
+ if !conflicts.is_empty() {
+ let mut conflict_strings: Box<[_]> = conflicts
+ .into_iter()
+ .map(|path| path.to_string_lossy().to_string())
+ .collect();
+ conflict_strings.sort();
+ return Err(Error::new(format!(
+ "Found the following file conflicts in dependencies:\n{}",
+ conflict_strings.join("\n")
+ )));
+ }
+
+ Ok(())
+}
+
+fn init_task_rootfs(input_hash: &InputHash, task: &Task) -> Result<Stack<fs::Mount>> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+ let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+ let rootfs = paths::depend_dir(&task.rootfs);
+
+ let depends = unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;
+
+ let mut mounts = Stack::new();
+
+ mounts.push(
+ fs::mount(rootfs, &mount_target, None, MsFlags::MS_BIND, None)
+ .with_context(|| format!("Failed to bind mount rootfs to {:?}", mount_target))?,
+ );
+ mount::mount::<str, str, str, str>(
+ None,
+ &mount_target,
+ None,
+ MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
+ None,
+ )
+ .context("Failed to mount container rootfs read-only")?;
+
+ let (mut dirs, mut files) = get_contents(&mount_target, "")?;
+
+ for (path, dep_hashes) in depends {
+ assert!(!dep_hashes.is_empty());
+
+ if !path.is_empty() && !path.starts_with('/') {
+ return Err(Error::new(format!(
+ "Dependency path {:?} must be absolute",
+ path
+ )));
+ }
+
+ let dep_target = mount_target.clone() + &path;
+ let dep_paths: Box<[_]> = dep_hashes.iter().map(paths::depend_dir).collect();
+
+ for dep in dep_paths.iter() {
+ let (dep_dirs, dep_files) = get_contents(dep, &path)?;
+ check_conflicts(&dirs, &files, &dep_dirs, &dep_files)?;
+ dirs.extend(dep_dirs);
+ files.extend(dep_files);
+ }
+
+ let options = format!(
+ "xino=off,index=off,metacopy=off,lowerdir={lower}:{base}",
+ lower = dep_paths.join(":"),
+ base = dep_target,
+ );
+
+ mounts.push(
+ fs::mount(
+ "overlay",
+ dep_target.as_str(),
+ Some("overlay"),
+ MsFlags::MS_RDONLY,
+ Some(&options),
+ )
+ .with_context(|| format!("Failed to mount overlay to {:?}", dep_target))?,
+ );
+ }
+
+ Ok(mounts)
+}
+
+fn cleanup_task(input_hash: &InputHash) -> Result<()> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+ fs::ensure_removed(task_tmp_dir).context("Failed to remove task tmp dir")?;
+
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+ fs::ensure_removed(task_work_dir).context("Failed to remove overlayfs workdir")?;
+
+ Ok(())
+}
+
+fn get_archive_filename(task: &Task, hash: &ArchiveHash) -> String {
+ if let Some(pinned_name) = task.pins.get(hash) {
+ paths::pinned_archive_filename(pinned_name)
+ } else {
+ paths::archive_filename(hash)
+ }
+}
+
+fn unpack_dependency(task: &Task, hash: &ArchiveHash) -> Result<()> {
+ let _lock = unix::lock(paths::depend_lock_filename(hash), true, true);
+
+ let filename = get_archive_filename(task, hash);
+
+ let dest = paths::depend_dir(hash);
+ if Path::new(&dest).is_dir() {
+ return Ok(());
+ }
+
+ (|| -> Result<()> {
+ let tmp_dest = paths::depend_tmp_dir(hash);
+ fs::ensure_removed(&tmp_dest)?;
+
+ let file = fs::open(&filename)?;
+ let hasher = ArchiveHasher::new();
+ let buffered_hasher = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
+ let mut reader = TeeReader::new(file, buffered_hasher, false);
+
+ tar::unpack(&mut reader, &tmp_dest)?;
+
+ // Read file to end to get the correct hash
+ io::copy(&mut reader, &mut io::sink())?;
+
+ let (_, buffered_hasher) = reader.into_inner();
+ let hasher = buffered_hasher.into_inner()?;
+
+ let actual_hash = ArchiveHash(StringHash(hasher.finalize().into()));
+
+ if &actual_hash != hash {
+ return Err(Error::new(format!(
+ "Incorrect file hash for {:?} (expected: {}, actual: {})",
+ filename, hash, actual_hash,
+ )));
+ }
+
+ fs::rename(&tmp_dest, &dest)?;
+
+ Ok(())
+ })()
+ .with_context(|| format!("Failed to unpack {:?}", filename))
+}
+
+fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<DependMap> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+ unpack_dependency(task, &task.rootfs)?;
+
+ let mut ret = DependMap::new();
+
+ for dep in &task.depends {
+ match dep {
+ Dependency::Fetch {
+ name, target_dir, ..
+ } => {
+ let path = paths::join(&[&task_tmp_dir, target_dir]);
+ fs::mkdir(&path)?;
+ fs::copy(
+ paths::join(&[paths::DOWNLOADS_DIR, name]),
+ paths::join(&[&path, name]),
+ )?;
+ }
+ Dependency::Task { output, path } => {
+ unpack_dependency(task, output)?;
+ ret.entry(path.clone()).or_default().push(*output);
+ }
+ }
+ }
+
+ Ok(ret)
+}
+
+fn collect_output(input_hash: &InputHash, task: &Task, path: &str) -> Result<Option<ArchiveHash>> {
+ let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]);
+ if !Path::new(&source).is_dir() {
+ return Ok(None);
+ }
+
+ let filename = paths::archive_tmp_filename(input_hash);
+
+ let hash = (|| -> Result<ArchiveHash> {
+ let file = fs::create(&filename)?;
+ let hasher = ArchiveHasher::new();
+ let writer = TeeWriter::new(file, hasher);
+ let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);
+
+ super::tar::pack(&task.rootfs, &mut buffered_writer, &source)?;
+
+ let writer = buffered_writer.into_inner()?;
+ let (file, hasher) = writer.into_inner();
+ file.sync_all()?;
+ drop(file);
+
+ Ok(ArchiveHash(StringHash(hasher.finalize().into())))
+ })()
+ .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;
+
+ fs::rename(filename, paths::archive_filename(&hash))?;
+
+ Ok(Some(hash))
+}
+
+fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
+ let mut ret = HashMap::new();
+
+ for (name, path) in &task.outputs {
+ if let Some(hash) = collect_output(input_hash, task, path)? {
+ ret.insert(name.clone(), hash);
+ }
+ }
+
+ Ok(ret)
+}
+
+fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> {
+ let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
+ let _rootfs_mounts =
+ init_task_rootfs(input_hash, task).context("Failed to initialize task rootfs")?;
+
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+ let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+
+ let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
+ let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);
+
+ let log_filename = paths::task_log_filename(input_hash);
+
+ let mut exec_cmd = || -> Result<()> {
+ let log = fs::create(&log_filename)?;
+
+ let log_stdout = log
+ .try_clone()
+ .context("Failed to duplicate log file descriptor")?;
+ let log_stderr = log
+ .try_clone()
+ .context("Failed to duplicate log file descriptor")?;
+
+ mount::mount::<_, _, str, str>(
+ Some(paths::DEV_DIR),
+ paths::join(&[&rootfs, "dev"]).as_str(),
+ None,
+ MsFlags::MS_BIND | MsFlags::MS_REC,
+ None,
+ )
+ .expect("Failed to bind mount /dev directory");
+ mount::mount::<_, _, str, str>(
+ Some(builddir_source.as_str()),
+ builddir_target.as_str(),
+ None,
+ MsFlags::MS_BIND | MsFlags::MS_REC,
+ None,
+ )
+ .expect("Failed to bind mount build directory");
+
+ ns::pivot_root(&rootfs);
+ mount::mount::<str, _, str, str>(
+ None,
+ "/",
+ None,
+ MsFlags::MS_PRIVATE | MsFlags::MS_REC,
+ None,
+ )
+ .context("Failed to set MS_PRIVATE for container root")?;
+ ns::container_mounts().context("Failed to set up container mounts")?;
+
+ unistd::sethostname("rebel-builder").context("Failed to set hostname")?;
+
+ prctl::set_no_new_privs().context("set_no_new_privs()")?;
+
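+ // A nested user namespace demotes the in-container root to an
+ // unprivileged build user before the task command is executed.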
+ unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
+ .context("Failed to create user namespace")?;
+ ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));
+
+ jobserver
+ .set_cloexec(false)
+ .context("Failed to unset O_CLOEXEC on jobserver pipe")?;
+
+ let err = Command::new("sh")
+ .args(["-ex", &paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])])
+ .stdin(Stdio::null())
+ .stdout(log_stdout)
+ .stderr(log_stderr)
+ .env_clear()
+ .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
+ .env("HOME", "/build")
+ .env("MAKEFLAGS", jobserver.to_makeflags())
+ .exec();
+ eprintln!("{}", err);
+ process::exit(127);
+ };
+
+ let pid = unsafe {
+ ns::spawn(
+ CloneFlags::CLONE_NEWNS
+ | CloneFlags::CLONE_NEWPID
+ | CloneFlags::CLONE_NEWIPC
+ | CloneFlags::CLONE_NEWNET
+ | CloneFlags::CLONE_NEWUTS,
+ || exec_cmd().unwrap(),
+ )
+ }
+ .context("Failed to run task container")?;
+
+ let status = wait::waitpid(pid, None)?;
+
+ if let Err(err) = status.check() {
+ return Err(Error::new(format!(
+ "Task failed: {}\nOutput: {}",
+ err, log_filename
+ )));
+ }
+
+ Ok(())
+}
+
+fn hash_layer(input_hash: &InputHash, task: &Task) -> Result<Option<LayerHash>> {
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+ (|| -> Result<Option<LayerHash>> {
+ if fs::is_dir_empty(&task_layer_dir)? {
+ return Ok(None);
+ }
+
+ let hasher = LayerHasher::new();
+ let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
+
+ tar::pack(&task.rootfs, &mut buffered_writer, &task_layer_dir)?;
+
+ let hasher = buffered_writer.into_inner()?;
+ Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
+ })()
+ .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
+}
+
+fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+ if let Some(hash) = hash {
+ let layer_dir = paths::layer_dir(hash);
+
+ let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
+ Ok(_) => return Ok(()),
+ Err(err) => err,
+ };
+
+ if !matches!(
+ err.raw_os_error(),
+ Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
+ ) {
+ return Err(err).with_context(|| {
+ format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
+ });
+ }
+ }
+
+ fs::ensure_removed(&task_layer_dir)
+}
+
+fn run_and_hash_task(
+ input_hash: &InputHash,
+ task: &Task,
+ jobserver: &mut Jobserver,
+) -> Result<TaskOutput> {
+ run_task(input_hash, task, jobserver)?;
+
+ let outputs = collect_outputs(input_hash, task)?;
+
+ let layer = hash_layer(input_hash, task)?;
+ move_layer(input_hash, &layer)?;
+
+ Ok(TaskOutput {
+ input_hash: Some(*input_hash),
+ layer,
+ outputs,
+ })
+}
+
+fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
+ let filename = paths::task_cache_filename(input_hash);
+ let file = fs::open(&filename)?;
+
+ serde_json::from_reader(file)
+ .with_context(|| format!("Failed to read task cache data from {}", filename))
+}
+
+fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
+ fs::mkdir(paths::task_state_dir(input_hash))?;
+
+ let tmp_filename = paths::task_cache_tmp_filename(input_hash);
+ let filename = paths::task_cache_filename(input_hash);
+
+ cjson::to_file(&tmp_filename, output)
+ .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
+
+ fs::rename(tmp_filename, filename)?;
+
+ Ok(())
+}
+
+trait AsSecsF32 {
+ fn as_secs_f32(&self) -> f32;
+}
+
+impl AsSecsF32 for TimeVal {
+ fn as_secs_f32(&self) -> f32 {
+ self.tv_sec() as f32 + 1e-6 * (self.tv_usec() as f32)
+ }
+}
+
+fn get_usage(total: f32) -> String {
+ let usage = resource::getrusage(resource::UsageWho::RUSAGE_CHILDREN).expect("getrusage()");
+
+ let user = usage.user_time().as_secs_f32();
+ let system = usage.system_time().as_secs_f32();
+ let cpu = (100.0 * (user + system) / total).round();
+
+ format!("{user:.2}s user {system:.2}s system {cpu:.0}% cpu")
+}
+
+pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result<TaskOutput> {
+ let input_hash = input_hash(&task);
+
+ let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true)
+ .context("Failed to get task lock")?;
+
+ let cached_output = load_cached(&input_hash);
+ if !task.force_run {
+ if let Ok(task_output) = cached_output {
+ return Ok(task_output);
+ }
+ }
+
+ let token = jobserver.wait();
+
+ let start_time = Instant::now();
+ println!("Starting task {} ({})", task.label, input_hash);
+
+ let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
+ let cleanup_ret = cleanup_task(&input_hash);
+
+ jobserver.post(token);
+
+ let task_output = task_ret?;
+ cleanup_ret.context("Failed to clean up after task")?;
+
+ save_cached(&input_hash, &task_output)?;
+
+ let duration = Instant::now().duration_since(start_time).as_secs_f32();
+ let usage = get_usage(duration);
+ println!(
+ "Finished task {} ({}) in {:.2}s ({})",
+ task.label, input_hash, duration, usage,
+ );
+
+ if let Ok(cached_output) = cached_output {
+ if cached_output.outputs != task_output.outputs {
+ println!(
+ "WARNING: Output hashes for task {} do not match cached result\n Cached output: {}\n New output: {}",
+ task.label,
+ cjson::to_string(&cached_output.outputs).unwrap(),
+ cjson::to_string(&task_output.outputs).unwrap(),
+ );
+ }
+ }
+
+ Ok(task_output)
+}
diff --git a/crates/rebel-runner/src/util/checkable.rs b/crates/rebel-runner/src/util/checkable.rs
new file mode 100644
index 0000000..8528d29
--- /dev/null
+++ b/crates/rebel-runner/src/util/checkable.rs
@@ -0,0 +1,37 @@
+use std::{
+ io::{Error, ErrorKind, Result},
+ process::ExitStatus,
+};
+
+use nix::sys::wait;
+
+pub trait Checkable {
+ fn check(&self) -> Result<()>;
+}
+
+impl Checkable for ExitStatus {
+ fn check(&self) -> Result<()> {
+ if self.success() {
+ Ok(())
+ } else {
+ Err(Error::new(
+ ErrorKind::Other,
+ format!("Process exited with {}", self),
+ ))
+ }
+ }
+}
+
+impl Checkable for wait::WaitStatus {
+ fn check(&self) -> Result<()> {
+ let message = match self {
+ wait::WaitStatus::Exited(_, 0) => return Ok(()),
+ wait::WaitStatus::Exited(_, code) => format!("Process exited with exit code: {}", code),
+ wait::WaitStatus::Signaled(_, signal, _) => {
+ format!("Process exited with signal: {}", signal)
+ }
+ _ => format!("Process in unexpected status: {:?}", self),
+ };
+ Err(Error::new(ErrorKind::Other, message))
+ }
+}
diff --git a/crates/rebel-runner/src/util/cjson.rs b/crates/rebel-runner/src/util/cjson.rs
new file mode 100644
index 0000000..e3840ce
--- /dev/null
+++ b/crates/rebel-runner/src/util/cjson.rs
@@ -0,0 +1,37 @@
+use std::{
+ fs::File,
+ io::{self, Write},
+ path::Path,
+};
+
+use digest::{self, Digest};
+use olpc_cjson::CanonicalFormatter;
+use serde::Serialize;
+use serde_json::error::Result;
+
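+// Canonical JSON (olpc-cjson) produces a byte-stable serialization, so
+// hashes computed over it are reproducible across runs and machines.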
+pub fn new_serializer<W: Write>(writer: W) -> serde_json::Serializer<W, CanonicalFormatter> {
+ serde_json::Serializer::with_formatter(writer, CanonicalFormatter::new())
+}
+
+pub fn to_writer<W: Write, T: ?Sized + Serialize>(writer: W, value: &T) -> Result<()> {
+ let mut ser = new_serializer(writer);
+ value.serialize(&mut ser)
+}
+
+pub fn to_file<P: AsRef<Path>, T: ?Sized + Serialize>(path: P, value: &T) -> io::Result<()> {
+ let file = File::create(path)?;
+ to_writer(&file, value)?;
+ file.sync_all()
+}
+
+pub fn to_string<T: ?Sized + Serialize>(value: &T) -> Result<String> {
+ let mut ret = Vec::new();
+ to_writer(&mut ret, value)?;
+ Ok(String::from_utf8(ret).unwrap())
+}
+
+pub fn digest<D: Digest + Write, T: ?Sized + Serialize>(value: &T) -> Result<digest::Output<D>> {
+ let mut digest = <D as Digest>::new();
+ to_writer(&mut digest, value)?;
+ Ok(digest.finalize())
+}
diff --git a/crates/rebel-runner/src/util/clone.rs b/crates/rebel-runner/src/util/clone.rs
new file mode 100644
index 0000000..51a31c3
--- /dev/null
+++ b/crates/rebel-runner/src/util/clone.rs
@@ -0,0 +1,59 @@
+use std::{mem, process};
+
+use nix::{
+ errno, sched,
+ unistd::{self, Pid},
+};
+
+#[repr(C)]
+#[derive(Debug, Default)]
+struct CloneArgs {
+ flags: u64,
+ pidfd: u64,
+ child_tid: u64,
+ parent_tid: u64,
+ exit_signal: u64,
+ stack: u64,
+ stack_size: u64,
+ tls: u64,
+}
+
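+// Raw clone3() wrapper that behaves like fork() but atomically applies
+// namespace flags to the child; nix offers no stack-less clone, hence the
+// direct syscall.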
+pub unsafe fn clone(flags: sched::CloneFlags) -> nix::Result<unistd::ForkResult> {
+ let mut args = CloneArgs {
+ flags: flags.bits() as u64,
+ exit_signal: libc::SIGCHLD as u64,
+ ..CloneArgs::default()
+ };
+ let size = mem::size_of_val(&args) as libc::size_t;
+
+ let pid = libc::syscall(libc::SYS_clone3, &mut args, size);
+
+ #[allow(clippy::comparison_chain)]
+ if pid < 0 {
+ Err(errno::Errno::last())
+ } else if pid == 0 {
+ Ok(unistd::ForkResult::Child)
+ } else {
+ Ok(unistd::ForkResult::Parent {
+ child: Pid::from_raw(pid as libc::pid_t),
+ })
+ }
+}
+
+pub unsafe fn spawn<F>(flags: Option<sched::CloneFlags>, f: F) -> nix::Result<Pid>
+where
+ F: FnOnce(),
+{
+ let res = if let Some(flags) = flags {
+ clone(flags)
+ } else {
+ unistd::fork()
+ };
+ match res? {
+ unistd::ForkResult::Parent { child } => Ok(child),
+ unistd::ForkResult::Child => {
+ f();
+ process::exit(0)
+ }
+ }
+}
diff --git a/crates/rebel-runner/src/util/fs.rs b/crates/rebel-runner/src/util/fs.rs
new file mode 100644
index 0000000..9e33eb7
--- /dev/null
+++ b/crates/rebel-runner/src/util/fs.rs
@@ -0,0 +1,127 @@
+use std::{
+ fs::{self, File},
+ io,
+ os::unix::prelude::*,
+ path::{Path, PathBuf},
+};
+
+use nix::{
+ fcntl::OFlag,
+ mount::{self, MsFlags},
+ unistd,
+};
+
+use rebel_common::error::*;
+
+pub fn open<P: AsRef<Path>>(path: P) -> Result<fs::File> {
+ fs::File::open(path.as_ref())
+ .with_context(|| format!("Failed to open file {:?} for reading", path.as_ref()))
+}
+
+pub fn create<P: AsRef<Path>>(path: P) -> Result<fs::File> {
+ fs::File::create(path.as_ref())
+ .with_context(|| format!("Failed to open file {:?} for writing", path.as_ref()))
+}
+
+pub fn rename<P1: AsRef<Path>, P2: AsRef<Path>>(from: P1, to: P2) -> Result<()> {
+ fs::rename(from.as_ref(), to.as_ref())
+ .with_context(|| format!("Failed to rename {:?} to {:?}", from.as_ref(), to.as_ref()))
+}
+
+// Unlike fs::copy, this doesn't preserve the source's permission bits
+pub fn copy<P1: AsRef<Path>, P2: AsRef<Path>>(from: P1, to: P2) -> Result<()> {
+ (|| -> Result<()> {
+ let mut src = open(from.as_ref())?;
+ let mut dest = create(to.as_ref())?;
+ io::copy(&mut src, &mut dest)?;
+ dest.sync_all()?;
+ Ok(())
+ })()
+ .with_context(|| format!("Failed to copy {:?} to {:?}", from.as_ref(), to.as_ref()))
+}
+
+pub fn mkdir<P: AsRef<Path>>(path: P) -> Result<()> {
+ let mut builder = fs::DirBuilder::new();
+ builder.recursive(true);
+ builder
+ .create(path.as_ref())
+ .with_context(|| format!("Failed to create directory {:?}", path.as_ref()))
+}
+
+pub fn ensure_removed<P: AsRef<Path>>(path: P) -> Result<()> {
+ let result = if path.as_ref().is_dir() {
+ fs::remove_dir_all(path.as_ref())
+ } else {
+ fs::remove_file(path.as_ref())
+ };
+ result
+ .or_else(|err| match err.kind() {
+ io::ErrorKind::NotFound => Ok(()),
+ _ => Err(err),
+ })
+ .with_context(|| format!("Failed to delete {:?}", path.as_ref()))
+}
+
+pub fn is_dir_empty<P: AsRef<Path>>(path: P) -> Result<bool> {
+ Ok(fs::read_dir(path)?.next().is_none())
+}
+
+/// Fixes up ownership and permission weirdness caused by set-group-ID
+/// directories or the bsdgroups mount option
+pub fn fixup_permissions<P: AsRef<Path>>(path: P) -> Result<()> {
+ let path = path.as_ref();
+ let gid = unistd::getegid();
+
+ let metadata = path
+ .metadata()
+ .with_context(|| format!("Failed to get metadata of {:?}", path))?;
+
+ if metadata.gid() != gid.as_raw() {
+ unistd::chown(path, None, Some(gid))
+ .with_context(|| format!("Failed to set group of {:?}", path))?;
+ }
+
+ let mut perms = metadata.permissions();
+ let mode = perms.mode();
+ if (mode & 0o777) != mode {
+ perms.set_mode(mode & 0o777);
+ std::fs::set_permissions(path, perms)
+ .with_context(|| format!("Failed to set mode of {:?}", path))?;
+ }
+
+ Ok(())
+}
+
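+/// RAII guard for a mount point: the path is unmounted when the value is
+/// dropped.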
+#[must_use]
+pub struct Mount(PathBuf);
+
+impl Drop for Mount {
+ fn drop(&mut self) {
+ mount::umount(&self.0)
+ .with_context(|| format!("Failed to unmount {:?}", self.0))
+ .unwrap();
+ }
+}
+
+pub fn mount<P1: AsRef<Path>, P2: AsRef<Path>>(
+ source: P1,
+ target: P2,
+ fstype: Option<&str>,
+ flags: MsFlags,
+ data: Option<&str>,
+) -> Result<Mount> {
+ mkdir(target.as_ref()).with_context(|| format!("Failed to create {:?}", target.as_ref()))?;
+
+ let canon_target = target
+ .as_ref()
+ .canonicalize()
+ .with_context(|| format!("Failed to get absolute path for {:?}", target.as_ref()))?;
+ mount::mount(Some(source.as_ref()), &canon_target, fstype, flags, data)
+ .with_context(|| format!("Failed to mount {:?}", canon_target))?;
+ Ok(Mount(canon_target))
+}
+
+pub fn pipe() -> Result<(File, File)> {
+ unistd::pipe2(OFlag::O_CLOEXEC)
+ .context("pipe2()")
+ .map(|(piper, pipew)| (File::from(piper), File::from(pipew)))
+}
diff --git a/crates/rebel-runner/src/util/mod.rs b/crates/rebel-runner/src/util/mod.rs
new file mode 100644
index 0000000..0fbe3b5
--- /dev/null
+++ b/crates/rebel-runner/src/util/mod.rs
@@ -0,0 +1,7 @@
+pub mod checkable;
+pub mod cjson;
+pub mod clone;
+pub mod fs;
+pub mod stack;
+pub mod steal;
+pub mod unix;
diff --git a/crates/rebel-runner/src/util/stack.rs b/crates/rebel-runner/src/util/stack.rs
new file mode 100644
index 0000000..15d5daf
--- /dev/null
+++ b/crates/rebel-runner/src/util/stack.rs
@@ -0,0 +1,25 @@
+use std::mem;
+
+/// Simple, inefficient data structure with a guaranteed LIFO drop order:
+/// the most recently pushed element is dropped first
+#[derive(Debug)]
+pub enum Stack<T> {
+ Cons(Box<(T, Stack<T>)>),
+ Empty,
+}
+
+impl<T> Default for Stack<T> {
+ fn default() -> Self {
+ Self::Empty
+ }
+}
+
+impl<T> Stack<T> {
+ pub fn new() -> Self {
+ Self::Empty
+ }
+
+ pub fn push(&mut self, value: T) {
+ let tmp = mem::take(self);
+ *self = Stack::Cons(Box::new((value, tmp)));
+ }
+}
diff --git a/crates/rebel-runner/src/util/steal.rs b/crates/rebel-runner/src/util/steal.rs
new file mode 100644
index 0000000..91b2cdf
--- /dev/null
+++ b/crates/rebel-runner/src/util/steal.rs
@@ -0,0 +1,40 @@
+use std::ops::{Deref, DerefMut};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Steal<T>(pub Option<T>);
+
+impl<T> Steal<T> {
+ pub fn new(value: T) -> Steal<T> {
+ Steal(Some(value))
+ }
+
+ pub fn steal(&mut self) -> T {
+ self.0
+ .take()
+ .expect("Attempted to steal already stoken value")
+ }
+}
+
+impl<T> From<T> for Steal<T> {
+ fn from(value: T) -> Self {
+ Steal::new(value)
+ }
+}
+
+impl<T> Deref for Steal<T> {
+ type Target = T;
+
+ fn deref(&self) -> &Self::Target {
+ self.0
+ .as_ref()
+ .expect("Attempted to dereference stolen value")
+ }
+}
+
+impl<T> DerefMut for Steal<T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.0
+ .as_mut()
+ .expect("Attempted to dereference stolen value")
+ }
+}
diff --git a/crates/rebel-runner/src/util/unix.rs b/crates/rebel-runner/src/util/unix.rs
new file mode 100644
index 0000000..a97b1db
--- /dev/null
+++ b/crates/rebel-runner/src/util/unix.rs
@@ -0,0 +1,86 @@
+use std::{fs::File, os::unix::prelude::*, path::Path};
+
+use nix::{
+ fcntl::{self, FcntlArg, FdFlag, Flock, OFlag},
+ sched,
+ unistd::Pid,
+};
+
+use rebel_common::error::*;
+
+use super::fs;
+
+pub fn set_blocking<Fd: AsFd>(fd: &Fd, blocking: bool) -> Result<()> {
+ let raw_fd = fd.as_fd().as_raw_fd();
+
+ let flags =
+ OFlag::from_bits_retain(fcntl::fcntl(raw_fd, FcntlArg::F_GETFL).context("fcntl(F_GETFL)")?);
+
+ let new_flags = if blocking {
+ flags & !OFlag::O_NONBLOCK
+ } else {
+ flags | OFlag::O_NONBLOCK
+ };
+
+ if new_flags != flags {
+ fcntl::fcntl(raw_fd, FcntlArg::F_SETFL(new_flags)).context("fcntl(F_SETFL)")?;
+ }
+
+ Ok(())
+}
+
+pub fn set_cloexec<Fd: AsFd>(fd: &Fd, cloexec: bool) -> Result<()> {
+ let raw_fd = fd.as_fd().as_raw_fd();
+
+ let flags = FdFlag::from_bits_retain(
+ fcntl::fcntl(raw_fd, FcntlArg::F_GETFD).context("fcntl(F_GETFD)")?,
+ );
+
+ let new_flags = if cloexec {
+ flags | FdFlag::FD_CLOEXEC
+ } else {
+ flags & !FdFlag::FD_CLOEXEC
+ };
+
+ if new_flags != flags {
+ fcntl::fcntl(raw_fd, FcntlArg::F_SETFD(new_flags)).context("fcntl(F_SETFD)")?;
+ }
+
+ Ok(())
+}
+
+pub fn nproc() -> Result<usize> {
+ const MAXCPU: usize = sched::CpuSet::count();
+
+ let affinity = sched::sched_getaffinity(Pid::from_raw(0)).context("sched_getaffinity()")?;
+
+ let mut count = 0;
+
+ for cpu in 0..MAXCPU {
+ if affinity.is_set(cpu).unwrap() {
+ count += 1;
+ }
+ }
+
+ Ok(count)
+}
+
+pub fn lock<P: AsRef<Path>>(path: P, exclusive: bool, blocking: bool) -> Result<Flock<File>> {
+ use fcntl::FlockArg::*;
+
+ if let Some(parent) = path.as_ref().parent() {
+ fs::mkdir(parent)?;
+ }
+
+ let arg = match (exclusive, blocking) {
+ (true, true) => LockExclusive,
+ (true, false) => LockExclusiveNonblock,
+ (false, true) => LockShared,
+ (false, false) => LockSharedNonblock,
+ };
+
+ let file = fs::create(path.as_ref())?;
+ fcntl::Flock::lock(file, arg)
+ .map_err(|(_, errno)| errno)
+ .with_context(|| format!("flock failed on {:?}", path.as_ref()))
+}