summaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2021-10-25 00:19:45 +0200
committerMatthias Schiffer <mschiffer@universe-factory.net>2021-10-25 00:19:45 +0200
commit34ac18d20c13a78914d447fee83204811a27b1e4 (patch)
tree56763d4ea46927105fcc6a71e03d5bd75a6947a6 /crates
parenta1a185370da27f2cc3df84d3a8d7141f9ce7db16 (diff)
downloadrebel-34ac18d20c13a78914d447fee83204811a27b1e4.tar
rebel-34ac18d20c13a78914d447fee83204811a27b1e4.zip
Move runner into separate crate
Diffstat (limited to 'crates')
-rw-r--r--crates/runner/Cargo.toml23
-rw-r--r--crates/runner/src/init.rs71
-rw-r--r--crates/runner/src/jobserver.rs77
-rw-r--r--crates/runner/src/lib.rs139
-rw-r--r--crates/runner/src/ns.rs84
-rw-r--r--crates/runner/src/paths.rs106
-rw-r--r--crates/runner/src/tar.rs87
-rw-r--r--crates/runner/src/task.rs460
-rw-r--r--crates/runner/src/util/cjson.rs31
-rw-r--r--crates/runner/src/util/clone.rs58
-rw-r--r--crates/runner/src/util/fs.rs122
-rw-r--r--crates/runner/src/util/mod.rs42
-rw-r--r--crates/runner/src/util/unix.rs61
13 files changed, 1361 insertions, 0 deletions
diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml
new file mode 100644
index 0000000..b69e51e
--- /dev/null
+++ b/crates/runner/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "rebel-runner"
+version = "0.1.0"
+authors = ["Matthias Schiffer <mschiffer@universe-factory.net>"]
+license = "MIT"
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+common = { path = "../common", package = "rebel-common" }
+
+blake3 = { version = "1.0.0", features = ["traits-preview"] }
+capctl = "0.2.0"
+digest = "0.9.0"
+ipc-channel = { git = "https://github.com/servo/ipc-channel.git" }
+libc = "0.2.84"
+nix = "0.23.0"
+olpc-cjson = "0.1.0"
+serde = { version = "1", features = ["derive"] }
+serde_json = "1.0.62"
+tar = "0.4.32"
+tee_readwrite = "0.1.0"
diff --git a/crates/runner/src/init.rs b/crates/runner/src/init.rs
new file mode 100644
index 0000000..783faf4
--- /dev/null
+++ b/crates/runner/src/init.rs
@@ -0,0 +1,71 @@
+use std::fs::File;
+
+use nix::mount::{self, MsFlags};
+
+use common::error::*;
+
+use super::{tar, util::fs};
+use crate::paths;
+
+fn prepare_rootfs(rootfs: &str) -> Result<()> {
+ tar::unpack(File::open(paths::ROOTFS_ARCHIVE)?, rootfs)
+ .context("Unpacking build container rootfs failed")?;
+
+ mount::mount::<_, _, str, str>(Some(rootfs), rootfs, None, MsFlags::MS_BIND, None)
+ .context("Failed to bind mount container rootfs")?;
+
+ for dir in IntoIterator::into_iter(["pts", "shm"]) {
+ fs::mkdir(paths::join(&[rootfs, "dev", dir]))?;
+ }
+
+ for (link, target) in IntoIterator::into_iter([
+ ("fd", "/proc/self/fd"),
+ ("stdin", "/proc/self/fd/0"),
+ ("stdout", "/proc/self/fd/1"),
+ ("stderr", "/proc/self/fd/2"),
+ ("ptmx", "pts/ptmx"),
+ ]) {
+ let path = paths::join(&[rootfs, "dev", link]);
+ std::os::unix::fs::symlink(target, &path)
+ .with_context(|| format!("Failed to create link {}", path))?;
+ }
+
+ for dev in IntoIterator::into_iter(["null", "zero", "full", "random", "urandom", "tty"]) {
+ let source = paths::join(&["/dev", dev]);
+ let target = paths::join(&[rootfs, "dev", dev]);
+ fs::create(&target)?;
+ mount::mount::<str, str, str, str>(Some(&source), &target, None, MsFlags::MS_BIND, None)
+ .with_context(|| format!("Failed to bind mount {}", source))?;
+ }
+
+ mount::mount::<str, _, str, str>(
+ None,
+ rootfs,
+ None,
+ MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
+ None,
+ )
+ .context("Failed to mount container rootfs read-only")?;
+
+ Ok(())
+}
+
+pub fn init_runner() -> Result<()> {
+ fs::mkdir(paths::LAYER_STATE_DIR)?;
+ fs::mkdir(paths::OUTPUT_STATE_DIR)?;
+
+ mount::mount::<_, _, _, str>(
+ Some("buildtmp"),
+ paths::TMP_DIR,
+ Some("tmpfs"),
+ MsFlags::empty(),
+ None,
+ )
+ .context("Mounting build tmpfs failed")?;
+ mount::mount::<str, _, str, str>(None, paths::TMP_DIR, None, MsFlags::MS_PRIVATE, None)
+ .context("Failed to set MS_PRIVATE for build tmpfs")?;
+
+ prepare_rootfs(paths::ROOTFS_DIR)?;
+
+ Ok(())
+}
diff --git a/crates/runner/src/jobserver.rs b/crates/runner/src/jobserver.rs
new file mode 100644
index 0000000..3b6c856
--- /dev/null
+++ b/crates/runner/src/jobserver.rs
@@ -0,0 +1,77 @@
+use std::{os::unix::prelude::RawFd, slice};
+
+use nix::{errno::Errno, fcntl::OFlag, poll, unistd};
+
+use common::error::*;
+
+use super::util::unix;
+
+#[derive(Debug)]
+pub struct Jobserver {
+ tokens: usize,
+ r: RawFd,
+ w: RawFd,
+}
+
+impl Jobserver {
+ pub fn new(tokens: usize) -> Result<Jobserver> {
+ let (r, w) = unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK).context("pipe()")?;
+
+ for _ in 0..tokens {
+ if let Err(_) = unistd::write(w, b"+") {
+ break;
+ }
+ }
+ unix::set_blocking(w, true)?;
+
+ Ok(Jobserver { tokens, r, w })
+ }
+
+ pub fn wait(&mut self) -> u8 {
+ loop {
+ poll::poll(
+ &mut [poll::PollFd::new(self.r, poll::PollFlags::POLLIN)],
+ -1,
+ )
+ .expect("poll()");
+
+ let mut token = 0;
+ match unistd::read(self.r, slice::from_mut(&mut token)) {
+ Ok(n) => {
+ assert!(n == 1);
+ return token;
+ }
+ Err(Errno::EAGAIN) => {
+ // Token was sniped by another task
+ continue;
+ }
+ error @ Err(_) => {
+ error.expect("read()");
+ }
+ }
+ }
+ }
+
+ pub fn post(&mut self, token: u8) {
+ let n = unistd::write(self.w, slice::from_ref(&token)).expect("write()");
+ assert!(n == 1);
+ }
+
+ pub fn set_cloexec(&mut self, cloexec: bool) -> Result<()> {
+ unix::set_cloexec(self.r, cloexec)?;
+ unix::set_cloexec(self.w, cloexec)?;
+ Ok(())
+ }
+
+ pub fn to_makeflags(&self) -> String {
+ format!(" -j{} --jobserver-auth={},{}", self.tokens, self.r, self.w)
+ }
+}
+
+impl Drop for Jobserver {
+ fn drop(&mut self) {
+ // FIXME Logging
+ let _ = unistd::close(self.r);
+ let _ = unistd::close(self.w);
+ }
+}
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs
new file mode 100644
index 0000000..658a833
--- /dev/null
+++ b/crates/runner/src/lib.rs
@@ -0,0 +1,139 @@
+mod init;
+mod jobserver;
+mod ns;
+pub mod paths;
+mod tar;
+mod task;
+mod util;
+
+use capctl::prctl;
+use ipc_channel::ipc;
+use nix::{
+ sched::CloneFlags,
+ sys::{signal, stat, wait},
+ unistd::{self, Gid, Uid},
+};
+use serde::{Deserialize, Serialize};
+
+use common::{error::*, types::*};
+
+use self::{
+ jobserver::Jobserver,
+ util::{clone, unix},
+};
+
+#[derive(Debug, Deserialize, Serialize)]
+enum Message {
+ Request(Task, ipc::IpcSender<Result<TaskOutput>>),
+}
+
+fn handle_request(
+ jobserver: Jobserver,
+ channel: ipc::IpcReceiver<Message>,
+ task: Task,
+ reply_channel: ipc::IpcSender<Result<TaskOutput>>,
+) -> (Jobserver, ipc::IpcReceiver<Message>) {
+ let child = |(mut jobserver, channel): (Jobserver, ipc::IpcReceiver<Message>)| {
+ drop(channel);
+ unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap();
+
+ prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)");
+
+ let token = jobserver.wait();
+ let (pid, mut jobserver) = unsafe {
+ clone::spawn(None, jobserver, |jobserver| {
+ let result = task::handle(task, jobserver);
+ reply_channel.send(result).expect("IPC send() failed");
+ })
+ }
+ .expect("fork()");
+ let wait_res = wait::waitpid(pid, None);
+ jobserver.post(token);
+ wait_res.expect("waitpid()");
+ };
+
+ unsafe { clone::spawn(None, (jobserver, channel), child) }
+ .expect("fork()")
+ .1
+}
+
+fn runner_loop(mut channel: ipc::IpcReceiver<Message>) {
+ let nproc = unix::nproc().expect("Failed to get number of available CPUs");
+ let mut jobserver = Jobserver::new(nproc).expect("Failed to initialize jobserver pipe");
+
+ while let Ok(msg) = channel.recv() {
+ match msg {
+ Message::Request(task, reply_channel) => {
+ let ret = handle_request(jobserver, channel, task, reply_channel);
+ jobserver = ret.0;
+ channel = ret.1;
+ }
+ }
+ }
+}
+
+fn runner(uid: Uid, gid: Gid, channel: ipc::IpcReceiver<Message>) {
+ ns::mount_proc();
+ ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid);
+
+ stat::umask(stat::Mode::from_bits_truncate(0o022));
+
+ init::init_runner().unwrap();
+
+ let msg_handler = unsafe {
+ clone::spawn(None, (), |()| {
+ signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap();
+ runner_loop(channel);
+ })
+ }
+ .expect("fork()")
+ .0;
+
+ loop {
+ let status = wait::wait().expect("wait()");
+ if status.pid() == Some(msg_handler) {
+ break;
+ }
+ }
+}
+
+pub struct Runner {
+ channel: ipc::IpcSender<Message>,
+}
+
+impl Runner {
+ /// Creates a new container runner
+ ///
+ /// Unsafe: Do not call in multithreaded processes
+ pub unsafe fn new() -> Result<Self> {
+ let uid = unistd::geteuid();
+ let gid = unistd::getegid();
+
+ let (tx, rx) = ipc::channel().expect("IPC channel creation failed");
+
+ let (tx, _rx) = clone::spawn(
+ Some(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID),
+ (tx, rx),
+ |(tx, rx)| {
+ drop(tx);
+ runner(uid, gid, rx);
+ },
+ )
+ .expect("clone()")
+ .1;
+
+ Ok(Runner { channel: tx })
+ }
+}
+
+impl Runner {
+ pub fn spawn(&self, task: &Task) -> ipc::IpcReceiver<Result<TaskOutput>> {
+ let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed");
+
+ self.channel
+ .send(Message::Request(task.clone(), reply_tx))
+ .expect("ContainerRunner task submission failed");
+
+ reply_rx
+ }
+}
diff --git a/crates/runner/src/ns.rs b/crates/runner/src/ns.rs
new file mode 100644
index 0000000..a075931
--- /dev/null
+++ b/crates/runner/src/ns.rs
@@ -0,0 +1,84 @@
+use nix::{
+ mount::{self, MntFlags, MsFlags},
+ sched::CloneFlags,
+ unistd::{self, Gid, Pid, Uid},
+};
+
+use common::error::*;
+
+use super::util::clone;
+
+pub fn mount_proc() {
+ mount::mount::<_, _, _, str>(Some("proc"), "/proc", Some("proc"), MsFlags::empty(), None)
+ .expect("Failed to mount /proc");
+}
+
+pub fn setup_userns(inner_uid: Uid, inner_gid: Gid, outer_uid: Uid, outer_gid: Gid) {
+ std::fs::write("/proc/self/setgroups", "deny").expect("Failed to write /proc/self/setgroups");
+ std::fs::write(
+ "/proc/self/uid_map",
+ &format!("{} {} 1", inner_uid, outer_uid),
+ )
+ .expect("Failed to write /proc/self/uid_map");
+ std::fs::write(
+ "/proc/self/gid_map",
+ &format!("{} {} 1", inner_gid, outer_gid),
+ )
+ .expect("Failed to write /proc/self/gid_map");
+}
+
+pub unsafe fn spawn<T, F>(flags: CloneFlags, arg: T, f: F) -> nix::Result<(Pid, T)>
+where
+ F: FnOnce(T),
+{
+ assert!(flags.contains(CloneFlags::CLONE_NEWNS) || !flags.contains(CloneFlags::CLONE_NEWPID));
+
+ clone::spawn(Some(flags), arg, |arg| {
+ if flags.contains(CloneFlags::CLONE_NEWPID) {
+ mount_proc();
+ }
+ f(arg)
+ })
+}
+
+pub fn pivot_root(path: &str) {
+ (|| -> Result<()> {
+ unistd::chdir(path).context("chdir()")?;
+ mount::mount::<_, _, str, str>(Some("/proc"), "proc", None, MsFlags::MS_BIND, None)
+ .context("Failed to bind mount /proc")?;
+ unistd::pivot_root(".", ".").context("pivot_root()")?;
+ mount::umount2(".", MntFlags::MNT_DETACH).context("umount2()")?;
+ unistd::chdir("/").context("chdir(\"/\")")?;
+ Ok(())
+ })()
+ .expect("Failed to pivot root");
+}
+
+pub fn container_mounts() -> Result<()> {
+ mount::mount(
+ Some("tmp"),
+ "/tmp",
+ Some("tmpfs"),
+ MsFlags::MS_NODEV | MsFlags::MS_NOSUID,
+ Some("mode=1777,size=1048576k"),
+ )
+ .context("Failed to mount /tmp")?;
+ mount::mount(
+ Some("devpts"),
+ "/dev/pts",
+ Some("devpts"),
+ MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC,
+ Some("newinstance,ptmxmode=0666,mode=0620"),
+ )
+ .context("Failed to mount /dev/pts")?;
+ mount::mount(
+ Some("shm"),
+ "/dev/shm",
+ Some("tmpfs"),
+ MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC | MsFlags::MS_NODEV,
+ Some("mode=1777,size=65536k"),
+ )
+ .context("Failed to mount /dev/shm")?;
+
+ Ok(())
+}
diff --git a/crates/runner/src/paths.rs b/crates/runner/src/paths.rs
new file mode 100644
index 0000000..632f030
--- /dev/null
+++ b/crates/runner/src/paths.rs
@@ -0,0 +1,106 @@
+//! Build directory structure used through rebel
+//!
+//! # Current structure
+//!
+//! ```text
+//! build/
+//! ├── rootfs.tar
+//! ├── downloads/
+//! │   └── ...
+//! ├── state/
+//! │   ├── output/
+//! │   │  ├── <input hash>.tar.tmp # during packing
+//! │   │  ├── <archive hash>.tar # files are renamed when packing is finished
+//! │   │   └── ...
+//! │   ├── layer/
+//! │   │ ├── <layer hash>/ # overlayfs layer dir of finished tasks
+//! │   │ └── ...
+//! │   └── task/
+//! │     ├── <input hash>/
+//! │ │ ├── layer/ # overlayfs layer dir (moved to layer/ after build)
+//! │ │ ├── work/ # overlayfs work dir (discarded after build)
+//! │ │ ├── task.json.tmp # during write
+//! │ │ └── task.json # after write
+//! │   └── ...
+//! └── tmp/ # tmpfs (mounted on start of rebel)
+//!    ├── rootfs/ # unpacked rootfs.tar
+//!    └── task/
+//!    └── <input hash>/
+//! ├── build/ # mount point for /build directory
+//! │ ├── downloads/ # downloaded sources
+//! │ ├── dest/ # collected as output after build
+//! │ ├── sysroot/ # sysroot mountpoint
+//! │ ├── task/ # internal runner files
+//! │ └── work/ # build overlay mountpoint
+//! ├── rootfs/ # rootfs overlay mountpoint
+//! └── depends/ # overlayed on rootfs in container
+//! ```
+
+use common::string_hash::*;
+
+pub const ROOTFS_ARCHIVE: &str = "build/rootfs.tar";
+
+pub const DOWNLOADS_DIR: &str = "build/downloads";
+
+pub const TMP_DIR: &str = "build/tmp";
+pub const ROOTFS_DIR: &str = "build/tmp/rootfs";
+pub const TASK_TMP_DIR: &str = "build/tmp/task";
+
+pub const TASK_TMP_ROOTFS_SUBDIR: &str = "rootfs";
+pub const TASK_TMP_DEPENDS_SUBDIR: &str = "depends";
+
+pub const OUTPUT_STATE_DIR: &str = "build/state/output";
+pub const TASK_STATE_DIR: &str = "build/state/task";
+pub const LAYER_STATE_DIR: &str = "build/state/layer";
+
+pub const TASK_STATE_LAYER_SUBDIR: &str = "layer";
+pub const TASK_STATE_WORK_SUBDIR: &str = "work";
+
+pub const TASK_BUILDDIR: &str = "build";
+pub const TASK_DESTDIR: &str = "build/dest";
+pub const TASK_DLDIR: &str = "build/downloads";
+pub const TASK_TASKDIR: &str = "build/task";
+pub const TASK_WORKDIR: &str = "build/work";
+pub const TASK_SYSROOT: &str = "opt/toolchain/sysroot";
+
+pub const TASK_RUN: &str = "run";
+
+pub fn join(paths: &[&str]) -> String {
+ paths.join("/")
+}
+
+pub fn abs(path: &str) -> String {
+ join(&["", path])
+}
+
+pub fn task_tmp_dir(hash: &InputHash) -> String {
+ join(&[TASK_TMP_DIR, &hash.to_string()])
+}
+
+pub fn task_state_dir(hash: &InputHash) -> String {
+ join(&[TASK_STATE_DIR, &hash.to_string()])
+}
+
+pub fn task_cache_tmp_filename(hash: &InputHash) -> String {
+ join(&[TASK_STATE_DIR, &hash.to_string(), "task.json.tmp"])
+}
+
+pub fn task_cache_filename(hash: &InputHash) -> String {
+ join(&[TASK_STATE_DIR, &hash.to_string(), "task.json"])
+}
+
+pub fn task_log_filename(hash: &InputHash) -> String {
+ join(&[TASK_STATE_DIR, &hash.to_string(), "task.log"])
+}
+
+pub fn layer_dir(hash: &LayerHash) -> String {
+ join(&[LAYER_STATE_DIR, &hash.to_string()])
+}
+
+pub fn archive_tmp_filename(hash: &InputHash) -> String {
+ join(&[OUTPUT_STATE_DIR, &format!("{}.tar.tmp", hash)])
+}
+
+pub fn archive_filename(hash: &ArchiveHash) -> String {
+ join(&[OUTPUT_STATE_DIR, &format!("{}.tar", hash)])
+}
diff --git a/crates/runner/src/tar.rs b/crates/runner/src/tar.rs
new file mode 100644
index 0000000..9306775
--- /dev/null
+++ b/crates/runner/src/tar.rs
@@ -0,0 +1,87 @@
+use std::{
+ fs::File,
+ io::{self, Read, Write},
+ os::unix::prelude::CommandExt,
+ path::Path,
+ process::{self, Command, Stdio},
+};
+
+use nix::{
+ mount::{self, MsFlags},
+ sched::CloneFlags,
+ sys::wait,
+};
+
+use common::error::*;
+
+use super::{
+ ns,
+ util::{fs, Checkable},
+};
+use crate::paths;
+
+pub fn pack<W: Write, P: AsRef<Path>>(archive: &mut W, source: P) -> Result<()> {
+ let (mut piper, pipew) = fs::pipe()?;
+
+ let exec_tar = |stdout: File| -> Result<()> {
+ // We are in our own mount namespace, so mounting into the shared ROOTFS_DIR is fine
+ let mount_target = paths::join(&[paths::ROOTFS_DIR, paths::TASK_BUILDDIR]);
+ mount::mount::<_, _, str, str>(
+ Some(source.as_ref()),
+ mount_target.as_str(),
+ None,
+ MsFlags::MS_BIND,
+ None,
+ )?;
+
+ ns::pivot_root(paths::ROOTFS_DIR);
+
+ let err = Command::new("tar")
+ .args(&[
+ "-c",
+ "--sort=name",
+ "--numeric-owner",
+ "--owner=0",
+ "--group=0",
+ "--mtime=@0",
+ ".",
+ ])
+ .stdin(Stdio::null())
+ .stdout(stdout)
+ .current_dir(paths::TASK_BUILDDIR)
+ .env_clear()
+ .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
+ .exec();
+ eprintln!("{}", err);
+ process::exit(127);
+ };
+
+ let (pid, pipew) = unsafe {
+ ns::spawn(CloneFlags::CLONE_NEWNS, pipew, |pipew| {
+ exec_tar(pipew).unwrap()
+ })
+ }
+ .context("Failed to run tar")?;
+
+ drop(pipew);
+ let result = io::copy(&mut piper, archive).context("Failed to write TAR archive");
+
+ wait::waitpid(pid, None)?
+ .check()
+ .context("tar did not exit successfully")?;
+
+ result?;
+ Ok(())
+}
+
+pub fn unpack<R: Read, P: AsRef<Path>>(archive: R, dest: P) -> Result<()> {
+ fs::mkdir(&dest)?;
+
+ let mut ar = tar::Archive::new(archive);
+ ar.set_preserve_permissions(true);
+ ar.set_preserve_mtime(true);
+ ar.set_unpack_xattrs(true);
+ ar.set_overwrite(false);
+
+ ar.unpack(dest).context("Failed to unpack TAR archive")
+}
diff --git a/crates/runner/src/task.rs b/crates/runner/src/task.rs
new file mode 100644
index 0000000..296c1a0
--- /dev/null
+++ b/crates/runner/src/task.rs
@@ -0,0 +1,460 @@
+use std::{
+ collections::HashMap,
+ io::BufWriter,
+ os::unix::prelude::CommandExt,
+ path::{Path, PathBuf},
+ process::{self, Command, Stdio},
+ time::Instant,
+};
+
+use capctl::prctl;
+use nix::{
+ mount::{self, MsFlags},
+ sched::{unshare, CloneFlags},
+ sys::wait,
+ unistd::{self, Gid, Uid},
+};
+use serde::Serialize;
+use tee_readwrite::TeeWriter;
+
+use common::{error::*, string_hash::*, types::*};
+
+use super::{
+ jobserver::Jobserver,
+ ns, tar,
+ util::{cjson, fs, Checkable},
+};
+use crate::paths;
+
+const BUILD_UID: Uid = Uid::from_raw(1000);
+const BUILD_GID: Gid = Gid::from_raw(1000);
+
+type InputHasher = blake3::Hasher;
+type DependencyHasher = blake3::Hasher;
+type LayerHasher = blake3::Hasher;
+type ArchiveHasher = blake3::Hasher;
+
+fn dependency_hash(dep: &Dependency) -> DependencyHash {
+ DependencyHash(StringHash(
+ cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
+ ))
+}
+
+fn input_hash(task: &Task) -> InputHash {
+ #[derive(Debug, Serialize)]
+ struct HashInput<'a> {
+ pub command: &'a str,
+ pub inherit: &'a [LayerHash],
+ pub depends: HashMap<DependencyHash, &'a Dependency>,
+ pub outputs: &'a HashMap<String, String>,
+ }
+ let input = HashInput {
+ command: &task.command,
+ inherit: &task.inherit,
+ depends: task
+ .depends
+ .iter()
+ .map(|dep| (dependency_hash(dep), dep))
+ .collect(),
+ outputs: &task.outputs,
+ };
+
+ InputHash(StringHash(
+ cjson::digest::<InputHasher, _>(&input).unwrap().into(),
+ ))
+}
+
+fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
+ // Remove metadata first to ensure task invalidation
+ fs::ensure_removed(&paths::task_cache_filename(input_hash))?;
+
+ let task_state_dir = paths::task_state_dir(input_hash);
+ fs::ensure_removed(&task_state_dir)?;
+ fs::mkdir(&task_state_dir)?;
+
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+ fs::mkdir(&task_layer_dir)?;
+ fs::fixup_permissions(&task_layer_dir)?;
+
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+ let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
+ fs::mkdir(&taskdir)?;
+ let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
+ std::fs::write(&runfile, &task.command)
+ .with_context(|| format!("Failed to write {}", runfile))?;
+
+ let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_WORKDIR]);
+ let mount = if task.inherit.is_empty() {
+ fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
+ .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
+ } else {
+ let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+ fs::mkdir(&task_work_dir)?;
+ fs::fixup_permissions(&task_work_dir)?;
+
+ let lower = task
+ .inherit
+ .iter()
+ .rev()
+ .map(paths::layer_dir)
+ .collect::<Vec<_>>()
+ .join(":");
+ let options = format!(
+ "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
+ lower = lower,
+ upper = task_layer_dir,
+ work = task_work_dir
+ );
+ fs::mount(
+ "overlay",
+ &mount_target,
+ Some("overlay"),
+ MsFlags::empty(),
+ Some(&options),
+ )
+ .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
+ };
+
+ Ok(mount)
+}
+
+fn init_task_rootfs(input_hash: &InputHash) -> Result<fs::Mount> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+ let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
+ let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+
+ let lower = [&depends_dir, paths::ROOTFS_DIR].join(":");
+ let options = format!(
+ "xino=off,index=off,metacopy=off,lowerdir={lower}",
+ lower = lower,
+ );
+
+ let mount = fs::mount(
+ "overlay",
+ &mount_target,
+ Some("overlay"),
+ MsFlags::empty(),
+ Some(&options),
+ )
+ .with_context(|| format!("Failed to mount rootfs overlay to {:?}", mount_target))?;
+
+ Ok(mount)
+}
+
+fn cleanup_task(input_hash: &InputHash) -> Result<()> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+ fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?;
+
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+ fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?;
+
+ Ok(())
+}
+
+fn unpack_dependency<P1: AsRef<Path>, P2: AsRef<Path>>(filename: P1, dest: P2) -> Result<()> {
+ let file = fs::open(filename.as_ref())?;
+ tar::unpack(file, dest.as_ref()).with_context(|| {
+ format!(
+ "Failed to unpack {:?} to {:?}",
+ filename.as_ref(),
+ dest.as_ref()
+ )
+ })
+}
+
+fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<()> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+ let downloads_dir = paths::join(&[&task_tmp_dir, paths::TASK_DLDIR]);
+ let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
+
+ fs::mkdir(&downloads_dir)?;
+ fs::mkdir(&depends_dir)?;
+
+ for dep in &task.depends {
+ match dep {
+ Dependency::Fetch { name, .. } => {
+ fs::copy(
+ paths::join(&[paths::DOWNLOADS_DIR, name]),
+ paths::join(&[&downloads_dir, name]),
+ )?;
+ }
+ Dependency::Task { output, path } => {
+ unpack_dependency(
+ paths::archive_filename(output),
+ paths::join(&[&depends_dir, path]),
+ )?;
+ }
+ }
+ }
+
+ Ok(())
+}
+
+fn collect_output(input_hash: &InputHash, path: &str) -> Result<Option<ArchiveHash>> {
+ let source: PathBuf = [&paths::task_tmp_dir(input_hash), paths::TASK_DESTDIR, path]
+ .iter()
+ .collect();
+ if !source.is_dir() {
+ return Ok(None);
+ }
+
+ let filename = paths::archive_tmp_filename(input_hash);
+
+ let hash = (|| -> Result<ArchiveHash> {
+ let file = fs::create(&filename)?;
+ let hasher = ArchiveHasher::new();
+ let writer = TeeWriter::new(file, hasher);
+ let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);
+
+ super::tar::pack(&mut buffered_writer, &source)?;
+
+ let writer = buffered_writer.into_inner()?;
+ let (file, hasher) = writer.into_inner();
+ file.sync_all()?;
+ drop(file);
+
+ Ok(ArchiveHash(StringHash(hasher.finalize().into())))
+ })()
+ .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;
+
+ fs::rename(filename, paths::archive_filename(&hash))?;
+
+ Ok(Some(hash))
+}
+
+fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
+ let mut ret = HashMap::new();
+
+ for (name, path) in &task.outputs {
+ if let Some(hash) = collect_output(input_hash, path)? {
+ ret.insert(name.clone(), hash);
+ }
+ }
+
+ Ok(ret)
+}
+
+fn run_task(input_hash: &InputHash, task: &Task, mut jobserver: Jobserver) -> Result<()> {
+ let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
+ unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;
+ let _rootfs_mount = init_task_rootfs(input_hash).context("Failed to initialize task rootfs")?;
+
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+ let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+
+ let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
+ let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);
+
+ let log_filename = paths::task_log_filename(input_hash);
+
+ let mut exec_cmd = || -> Result<()> {
+ let log = fs::create(&log_filename)?;
+
+ let log_stdout = log
+ .try_clone()
+ .context("Failed to duplicate log file descriptor")?;
+ let log_stderr = log
+ .try_clone()
+ .context("Failed to duplicate log file descriptor")?;
+
+ mount::mount::<_, _, str, str>(
+ Some(paths::join(&[paths::ROOTFS_DIR, "dev"]).as_str()),
+ paths::join(&[&rootfs, "dev"]).as_str(),
+ None,
+ MsFlags::MS_BIND | MsFlags::MS_REC,
+ None,
+ )
+ .expect("Failed to bind mount /dev directory");
+ mount::mount::<_, _, str, str>(
+ Some(builddir_source.as_str()),
+ builddir_target.as_str(),
+ None,
+ MsFlags::MS_BIND | MsFlags::MS_REC,
+ None,
+ )
+ .expect("Failed to bind mount build directory");
+
+ ns::pivot_root(&rootfs);
+ mount::mount::<str, _, str, str>(
+ None,
+ "/",
+ None,
+ MsFlags::MS_PRIVATE | MsFlags::MS_REC,
+ None,
+ )
+ .context("Failed to set MS_PRIVATE for container root")?;
+ ns::container_mounts().context("Failed to set up container mounts")?;
+
+ unistd::sethostname("rebel-builder").context("Failed to set hostname")?;
+
+ prctl::set_no_new_privs().context("set_no_new_privs()")?;
+
+ unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
+ .context("Failed to create user namespace")?;
+ ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));
+
+ jobserver
+ .set_cloexec(false)
+ .context("Failed to unset O_CLOEXEC on jobserver pipe")?;
+
+ let err = Command::new("sh")
+ .args(&[
+ "-ex",
+ &paths::abs(&paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])),
+ ])
+ .stdin(Stdio::null())
+ .stdout(log_stdout)
+ .stderr(log_stderr)
+ .current_dir(paths::TASK_WORKDIR)
+ .env_clear()
+ .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
+ .env("HOME", "/build")
+ .env("INPUT_HASH", input_hash.to_string())
+ .env("MAKEFLAGS", jobserver.to_makeflags())
+ .exec();
+ eprintln!("{}", err);
+ process::exit(127);
+ };
+
+ let (pid, ()) = unsafe {
+ ns::spawn(
+ CloneFlags::CLONE_NEWNS
+ | CloneFlags::CLONE_NEWPID
+ | CloneFlags::CLONE_NEWIPC
+ | CloneFlags::CLONE_NEWNET
+ | CloneFlags::CLONE_NEWUTS,
+ (),
+ |()| exec_cmd().unwrap(),
+ )
+ }
+ .context("Failed to run task container")?;
+
+ let status = wait::waitpid(pid, None)?;
+
+ if let Err(err) = status.check() {
+ return Err(Error::new(format!(
+ "Task failed: {}\nOutput: {}",
+ err, log_filename
+ )));
+ }
+
+ Ok(())
+}
+
+fn hash_layer(input_hash: &InputHash) -> Result<Option<LayerHash>> {
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+ (|| -> Result<Option<LayerHash>> {
+ if fs::is_dir_empty(&task_layer_dir)? {
+ return Ok(None);
+ }
+
+ let hasher = LayerHasher::new();
+ let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
+
+ tar::pack(&mut buffered_writer, &task_layer_dir)?;
+
+ let hasher = buffered_writer.into_inner()?;
+ Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
+ })()
+ .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
+}
+
+fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+ if let Some(hash) = hash {
+ let layer_dir = paths::layer_dir(hash);
+
+ let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
+ Ok(_) => return Ok(()),
+ Err(err) => err,
+ };
+
+ if !matches!(
+ err.raw_os_error(),
+ Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
+ ) {
+ return Err(err).with_context(|| {
+ format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
+ });
+ }
+ }
+
+ fs::ensure_removed(&task_layer_dir)
+}
+
+fn run_and_hash_task(
+ input_hash: &InputHash,
+ task: &Task,
+ jobserver: Jobserver,
+) -> Result<TaskOutput> {
+ run_task(input_hash, task, jobserver)?;
+
+ let outputs = collect_outputs(input_hash, task)?;
+
+ let layer = hash_layer(input_hash)?;
+ move_layer(input_hash, &layer)?;
+
+ Ok(TaskOutput {
+ input_hash: *input_hash,
+ layer,
+ outputs,
+ })
+}
+
+fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
+ let filename = paths::task_cache_filename(input_hash);
+ let file = fs::open(&filename)?;
+
+ serde_json::from_reader(file)
+ .with_context(|| format!("Failed to read task cache data from {}", filename))
+}
+
+fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
+ fs::mkdir(&paths::task_state_dir(input_hash))?;
+
+ let tmp_filename = paths::task_cache_tmp_filename(input_hash);
+ let filename = paths::task_cache_filename(input_hash);
+
+ cjson::to_file(&tmp_filename, output)
+ .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
+
+ fs::rename(tmp_filename, filename)?;
+
+ Ok(())
+}
+
+pub fn handle(task: Task, jobserver: Jobserver) -> Result<TaskOutput> {
+ let input_hash = input_hash(&task);
+
+ if let Ok(task_output) = load_cached(&input_hash) {
+ return Ok(task_output);
+ }
+
+ let start_time = Instant::now();
+ println!("Starting task {} ({})", task.label, input_hash);
+
+ let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
+ let cleanup_ret = cleanup_task(&input_hash);
+
+ let task_output = task_ret?;
+ cleanup_ret.context("Failed to clean up after task")?;
+
+ save_cached(&input_hash, &task_output)?;
+
+ let duration = Instant::now().duration_since(start_time);
+ println!(
+ "Finished task {} ({}) in {}",
+ task.label,
+ input_hash,
+ duration.as_secs_f32()
+ );
+
+ Ok(task_output)
+}
diff --git a/crates/runner/src/util/cjson.rs b/crates/runner/src/util/cjson.rs
new file mode 100644
index 0000000..5990943
--- /dev/null
+++ b/crates/runner/src/util/cjson.rs
@@ -0,0 +1,31 @@
+use std::{
+ fs::File,
+ io::{self, Write},
+ path::Path,
+};
+
+use digest::{self, Digest};
+use olpc_cjson::CanonicalFormatter;
+use serde::Serialize;
+use serde_json::error::Result;
+
+pub fn new_serializer<W: Write>(writer: W) -> serde_json::Serializer<W, CanonicalFormatter> {
+ serde_json::Serializer::with_formatter(writer, CanonicalFormatter::new())
+}
+
+pub fn to_writer<W: Write, T: ?Sized + Serialize>(writer: W, value: &T) -> Result<()> {
+ let mut ser = new_serializer(writer);
+ value.serialize(&mut ser)
+}
+
+pub fn to_file<P: AsRef<Path>, T: ?Sized + Serialize>(path: P, value: &T) -> io::Result<()> {
+ let file = File::create(path)?;
+ to_writer(&file, value)?;
+ file.sync_all()
+}
+
+pub fn digest<D: Digest + Write, T: ?Sized + Serialize>(value: &T) -> Result<digest::Output<D>> {
+ let mut digest = <D as Digest>::new();
+ to_writer(&mut digest, value)?;
+ Ok(digest.finalize())
+}
diff --git a/crates/runner/src/util/clone.rs b/crates/runner/src/util/clone.rs
new file mode 100644
index 0000000..4835b53
--- /dev/null
+++ b/crates/runner/src/util/clone.rs
@@ -0,0 +1,58 @@
+use std::{mem, process};
+
+use nix::{errno, sched, unistd};
+
+#[repr(C)]
+#[derive(Debug, Default)]
+struct CloneArgs {
+ flags: u64,
+ pidfd: u64,
+ child_tid: u64,
+ parent_tid: u64,
+ exit_signal: u64,
+ stack: u64,
+ stack_size: u64,
+ tls: u64,
+}
+
+pub unsafe fn clone(flags: sched::CloneFlags) -> nix::Result<unistd::ForkResult> {
+ let mut args = CloneArgs {
+ flags: flags.bits() as u64,
+ exit_signal: libc::SIGCHLD as u64,
+ ..CloneArgs::default()
+ };
+ let size = mem::size_of_val(&args) as libc::size_t;
+
+ let pid = libc::syscall(libc::SYS_clone3, &mut args, size);
+ if pid < 0 {
+ Err(errno::Errno::last())
+ } else if pid == 0 {
+ Ok(unistd::ForkResult::Child)
+ } else {
+ Ok(unistd::ForkResult::Parent {
+ child: unistd::Pid::from_raw(pid as libc::pid_t),
+ })
+ }
+}
+
+pub unsafe fn spawn<T, F>(
+ flags: Option<sched::CloneFlags>,
+ arg: T,
+ f: F,
+) -> nix::Result<(unistd::Pid, T)>
+where
+ F: FnOnce(T),
+{
+ let res = if let Some(flags) = flags {
+ clone(flags)
+ } else {
+ unistd::fork()
+ };
+ match res? {
+ unistd::ForkResult::Parent { child } => Ok((child, arg)),
+ unistd::ForkResult::Child => {
+ f(arg);
+ process::exit(0)
+ }
+ }
+}
diff --git a/crates/runner/src/util/fs.rs b/crates/runner/src/util/fs.rs
new file mode 100644
index 0000000..099a339
--- /dev/null
+++ b/crates/runner/src/util/fs.rs
@@ -0,0 +1,122 @@
+use std::{
+ fs::{self, File},
+ io,
+ os::unix::prelude::*,
+ path::{Path, PathBuf},
+};
+
+use nix::{
+ fcntl::OFlag,
+ mount::{self, MsFlags},
+ unistd,
+};
+
+use common::error::*;
+
+pub fn open<P: AsRef<Path>>(path: P) -> Result<fs::File> {
+ fs::File::open(path.as_ref())
+ .with_context(|| format!("Failed to open file {:?} for reading", path.as_ref()))
+}
+
+pub fn create<P: AsRef<Path>>(path: P) -> Result<fs::File> {
+ fs::File::create(path.as_ref())
+ .with_context(|| format!("Failed to open file {:?} for writing", path.as_ref()))
+}
+
+pub fn rename<P1: AsRef<Path>, P2: AsRef<Path>>(from: P1, to: P2) -> Result<()> {
+ fs::rename(from.as_ref(), to.as_ref())
+ .with_context(|| format!("Failed to rename {:?} to {:?}", from.as_ref(), to.as_ref()))
+}
+
+// Unlike fs::copy, this doesn't preserve file flags
+pub fn copy<P1: AsRef<Path>, P2: AsRef<Path>>(from: P1, to: P2) -> Result<()> {
+ (|| -> Result<()> {
+ let mut src = open(from.as_ref())?;
+ let mut dest = create(to.as_ref())?;
+ io::copy(&mut src, &mut dest)?;
+ dest.sync_all()?;
+ Ok(())
+ })()
+ .with_context(|| format!("Failed to copy {:?} to {:?}", from.as_ref(), to.as_ref()))
+}
+
+pub fn mkdir<P: AsRef<Path>>(path: P) -> Result<()> {
+ let mut builder = fs::DirBuilder::new();
+ builder.recursive(true);
+ builder
+ .create(path.as_ref())
+ .with_context(|| format!("Failed to create directory {:?}", path.as_ref()))
+}
+
+pub fn ensure_removed<P: AsRef<Path>>(path: P) -> Result<()> {
+ fs::remove_dir_all(path.as_ref())
+ .or_else(|err| match err.kind() {
+ io::ErrorKind::NotFound => Ok(()),
+ _ => Err(err),
+ })
+ .with_context(|| format!("Failed to delete directory {:?}", path.as_ref()))
+}
+
+pub fn is_dir_empty<P: AsRef<Path>>(path: P) -> Result<bool> {
+ Ok(fs::read_dir(path)?.next().is_none())
+}
+
+/// Fixes up weirdness of set-group-ID or bsdgroups
+pub fn fixup_permissions<P: AsRef<Path>>(path: P) -> Result<()> {
+ let path = path.as_ref();
+ let gid = unistd::getegid();
+
+ let metadata = path
+ .metadata()
+ .with_context(|| format!("Failed to get metadata of {:?}", path))?;
+
+ if metadata.gid() != gid.as_raw() {
+ unistd::chown(path, None, Some(gid))
+ .with_context(|| format!("Failed to set group of {:?}", path))?;
+ }
+
+ let mut perms = metadata.permissions();
+ let mode = perms.mode();
+ if (mode & 0o777) != mode {
+ perms.set_mode(mode & 0o777);
+ std::fs::set_permissions(path, perms)
+ .with_context(|| format!("Failed to set mode of {:?}", path))?;
+ }
+
+ Ok(())
+}
+
+#[must_use]
+pub struct Mount(PathBuf);
+
+impl Drop for Mount {
+ fn drop(&mut self) {
+ mount::umount(&self.0)
+ .with_context(|| format!("Failed to unmount {:?}", self.0))
+ .unwrap();
+ }
+}
+
+pub fn mount<P1: AsRef<Path>, P2: AsRef<Path>>(
+ source: P1,
+ target: P2,
+ fstype: Option<&str>,
+ flags: MsFlags,
+ data: Option<&str>,
+) -> Result<Mount> {
+ mkdir(target.as_ref()).with_context(|| format!("Failed to create {:?}", target.as_ref()))?;
+
+ let canon_target = target
+ .as_ref()
+ .canonicalize()
+ .with_context(|| format!("Failed to get absolute path for {:?}", target.as_ref()))?;
+ mount::mount(Some(source.as_ref()), &canon_target, fstype, flags, data)
+ .with_context(|| format!("Failed to mount {:?}", canon_target))?;
+ Ok(Mount(canon_target))
+}
+
+pub fn pipe() -> Result<(File, File)> {
+ unistd::pipe2(OFlag::O_CLOEXEC)
+ .context("pipe2()")
+ .map(|(piper, pipew)| unsafe { (File::from_raw_fd(piper), File::from_raw_fd(pipew)) })
+}
diff --git a/crates/runner/src/util/mod.rs b/crates/runner/src/util/mod.rs
new file mode 100644
index 0000000..eff589d
--- /dev/null
+++ b/crates/runner/src/util/mod.rs
@@ -0,0 +1,42 @@
+pub mod cjson;
+pub mod clone;
+pub mod fs;
+pub mod unix;
+
+use std::{
+ io::{Error, ErrorKind, Result},
+ process::ExitStatus,
+};
+
+use nix::sys::wait;
+
+pub trait Checkable {
+ fn check(&self) -> Result<()>;
+}
+
+impl Checkable for ExitStatus {
+ fn check(&self) -> Result<()> {
+ if self.success() {
+ Ok(())
+ } else {
+ Err(Error::new(
+ ErrorKind::Other,
+ format!("Process exited with {}", self),
+ ))
+ }
+ }
+}
+
+impl Checkable for wait::WaitStatus {
+ fn check(&self) -> Result<()> {
+ let message = match self {
+ wait::WaitStatus::Exited(_, 0) => return Ok(()),
+ wait::WaitStatus::Exited(_, code) => format!("Process exited with exit code: {}", code),
+ wait::WaitStatus::Signaled(_, signal, _) => {
+ format!("Process exited with signal: {}", signal)
+ }
+ _ => format!("Process in unexpected status: {:?}", self),
+ };
+ Err(Error::new(ErrorKind::Other, message))
+ }
+}
diff --git a/crates/runner/src/util/unix.rs b/crates/runner/src/util/unix.rs
new file mode 100644
index 0000000..48db764
--- /dev/null
+++ b/crates/runner/src/util/unix.rs
@@ -0,0 +1,61 @@
+use std::os::unix::prelude::*;
+
+use nix::{
+ fcntl::{self, FcntlArg, FdFlag, OFlag},
+ sched,
+ unistd::Pid,
+};
+
+use common::error::*;
+
+pub fn set_blocking(fd: RawFd, blocking: bool) -> Result<()> {
+ let flags = unsafe {
+ OFlag::from_bits_unchecked(fcntl::fcntl(fd, FcntlArg::F_GETFL).context("fcntl(F_GETFL)")?)
+ };
+
+ let new_flags = if blocking {
+ flags & !OFlag::O_NONBLOCK
+ } else {
+ flags | OFlag::O_NONBLOCK
+ };
+
+ if new_flags != flags {
+ fcntl::fcntl(fd, FcntlArg::F_SETFL(new_flags)).context("fcntl(F_SETFL)")?;
+ }
+
+ Ok(())
+}
+
+pub fn set_cloexec(fd: RawFd, cloexec: bool) -> Result<()> {
+ let flags = unsafe {
+ FdFlag::from_bits_unchecked(fcntl::fcntl(fd, FcntlArg::F_GETFD).context("fcntl(F_GETFD)")?)
+ };
+
+ let new_flags = if cloexec {
+ flags | FdFlag::FD_CLOEXEC
+ } else {
+ flags & !FdFlag::FD_CLOEXEC
+ };
+
+ if new_flags != flags {
+ fcntl::fcntl(fd, FcntlArg::F_SETFD(new_flags)).context("fcntl(F_SETFD)")?;
+ }
+
+ Ok(())
+}
+
+pub fn nproc() -> Result<usize> {
+ const MAXCPU: usize = sched::CpuSet::count();
+
+ let affinity = sched::sched_getaffinity(Pid::from_raw(0)).context("sched_getaffinity()")?;
+
+ let mut count = 0;
+
+ for cpu in 0..MAXCPU {
+ if affinity.is_set(cpu).unwrap() {
+ count += 1;
+ }
+ }
+
+ Ok(count)
+}