author     Matthias Schiffer <mschiffer@universe-factory.net>    2021-10-25 00:19:45 +0200
committer  Matthias Schiffer <mschiffer@universe-factory.net>    2021-10-25 00:19:45 +0200
commit     34ac18d20c13a78914d447fee83204811a27b1e4 (patch)
tree       56763d4ea46927105fcc6a71e03d5bd75a6947a6 /crates
parent     a1a185370da27f2cc3df84d3a8d7141f9ce7db16 (diff)
download   rebel-34ac18d20c13a78914d447fee83204811a27b1e4.tar
           rebel-34ac18d20c13a78914d447fee83204811a27b1e4.zip
Move runner into separate crate
Diffstat (limited to 'crates')
-rw-r--r--  crates/runner/Cargo.toml          23
-rw-r--r--  crates/runner/src/init.rs         71
-rw-r--r--  crates/runner/src/jobserver.rs    77
-rw-r--r--  crates/runner/src/lib.rs         139
-rw-r--r--  crates/runner/src/ns.rs           84
-rw-r--r--  crates/runner/src/paths.rs       106
-rw-r--r--  crates/runner/src/tar.rs          87
-rw-r--r--  crates/runner/src/task.rs        460
-rw-r--r--  crates/runner/src/util/cjson.rs   31
-rw-r--r--  crates/runner/src/util/clone.rs   58
-rw-r--r--  crates/runner/src/util/fs.rs     122
-rw-r--r--  crates/runner/src/util/mod.rs     42
-rw-r--r--  crates/runner/src/util/unix.rs    61
13 files changed, 1361 insertions, 0 deletions
diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml
new file mode 100644
index 0000000..b69e51e
--- /dev/null
+++ b/crates/runner/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "rebel-runner"
+version = "0.1.0"
+authors = ["Matthias Schiffer <mschiffer@universe-factory.net>"]
+license = "MIT"
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+common = { path = "../common", package = "rebel-common" }
+
+blake3 = { version = "1.0.0", features = ["traits-preview"] }
+capctl = "0.2.0"
+digest = "0.9.0"
+ipc-channel = { git = "https://github.com/servo/ipc-channel.git" }
+libc = "0.2.84"
+nix = "0.23.0"
+olpc-cjson = "0.1.0"
+serde = { version = "1", features = ["derive"] }
+serde_json = "1.0.62"
+tar = "0.4.32"
+tee_readwrite = "0.1.0"
diff --git a/crates/runner/src/init.rs b/crates/runner/src/init.rs
new file mode 100644
index 0000000..783faf4
--- /dev/null
+++ b/crates/runner/src/init.rs
@@ -0,0 +1,71 @@
+use std::fs::File;
+
+use nix::mount::{self, MsFlags};
+
+use common::error::*;
+
+use super::{tar, util::fs};
+use crate::paths;
+
+fn prepare_rootfs(rootfs: &str) -> Result<()> {
+    tar::unpack(File::open(paths::ROOTFS_ARCHIVE)?, rootfs)
+        .context("Unpacking build container rootfs failed")?;
+
+    mount::mount::<_, _, str, str>(Some(rootfs), rootfs, None, MsFlags::MS_BIND, None)
+        .context("Failed to bind mount container rootfs")?;
+
+    for dir in IntoIterator::into_iter(["pts", "shm"]) {
+        fs::mkdir(paths::join(&[rootfs, "dev", dir]))?;
+    }
+
+    for (link, target) in IntoIterator::into_iter([
+        ("fd", "/proc/self/fd"),
+        ("stdin", "/proc/self/fd/0"),
+        ("stdout", "/proc/self/fd/1"),
+        ("stderr", "/proc/self/fd/2"),
+        ("ptmx", "pts/ptmx"),
+    ]) {
+        let path = paths::join(&[rootfs, "dev", link]);
+        std::os::unix::fs::symlink(target, &path)
+            .with_context(|| format!("Failed to create link {}", path))?;
+    }
+
+    for dev in IntoIterator::into_iter(["null", "zero", "full", "random", "urandom", "tty"]) {
+        let source = paths::join(&["/dev", dev]);
+        let target = paths::join(&[rootfs, "dev", dev]);
+        fs::create(&target)?;
+        mount::mount::<str, str, str, str>(Some(&source), &target, None, MsFlags::MS_BIND, None)
+            .with_context(|| format!("Failed to bind mount {}", source))?;
+    }
+
+    mount::mount::<str, _, str, str>(
+        None,
+        rootfs,
+        None,
+        MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
+        None,
+    )
+    .context("Failed to mount container rootfs read-only")?;
+
+    Ok(())
+}
+
+pub fn init_runner() -> Result<()> {
+    fs::mkdir(paths::LAYER_STATE_DIR)?;
+    fs::mkdir(paths::OUTPUT_STATE_DIR)?;
+
+    mount::mount::<_, _, _, str>(
+        Some("buildtmp"),
+        paths::TMP_DIR,
+        Some("tmpfs"),
+        MsFlags::empty(),
+        None,
+    )
+    .context("Mounting build tmpfs failed")?;
+    mount::mount::<str, _, str, str>(None, paths::TMP_DIR, None, MsFlags::MS_PRIVATE, None)
+        .context("Failed to set MS_PRIVATE for build tmpfs")?;
+
+    prepare_rootfs(paths::ROOTFS_DIR)?;
+
+    Ok(())
+}
diff --git a/crates/runner/src/jobserver.rs b/crates/runner/src/jobserver.rs
new file mode 100644
index 0000000..3b6c856
--- /dev/null
+++ b/crates/runner/src/jobserver.rs
@@ -0,0 +1,77 @@
+use std::{os::unix::prelude::RawFd, slice};
+
+use nix::{errno::Errno, fcntl::OFlag, poll, unistd};
+
+use common::error::*;
+
+use super::util::unix;
+
+#[derive(Debug)]
+pub struct Jobserver {
+    tokens: usize,
+    r: RawFd,
+    w: RawFd,
+}
+
+impl Jobserver {
+    pub fn new(tokens: usize) -> Result<Jobserver> {
+        let (r, w) = unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK).context("pipe()")?;
+
+        for _ in 0..tokens {
+            if let Err(_) = unistd::write(w, b"+") {
+                break;
+            }
+        }
+        unix::set_blocking(w, true)?;
+
+        Ok(Jobserver { tokens, r, w })
+    }
+
+    pub fn wait(&mut self) -> u8 {
+        loop {
+            poll::poll(
+                &mut [poll::PollFd::new(self.r, poll::PollFlags::POLLIN)],
+                -1,
+            )
+            .expect("poll()");
+
+            let mut token = 0;
+            match unistd::read(self.r, slice::from_mut(&mut token)) {
+                Ok(n) => {
+                    assert!(n == 1);
+                    return token;
+                }
+                Err(Errno::EAGAIN) => {
+                    // Token was sniped by another task
+                    continue;
+                }
+                error @ Err(_) => {
+                    error.expect("read()");
+                }
+            }
+        }
+    }
+
+    pub fn post(&mut self, token: u8) {
+        let n = unistd::write(self.w, slice::from_ref(&token)).expect("write()");
+        assert!(n == 1);
+    }
+
+    pub fn set_cloexec(&mut self, cloexec: bool) -> Result<()> {
+        unix::set_cloexec(self.r, cloexec)?;
+        unix::set_cloexec(self.w, cloexec)?;
+        Ok(())
+    }
+
+    pub fn to_makeflags(&self) -> String {
+        format!(" -j{} --jobserver-auth={},{}", self.tokens, self.r, self.w)
+    }
+}
+
+impl Drop for Jobserver {
+    fn drop(&mut self) {
+        // FIXME Logging
+        let _ = unistd::close(self.r);
+        let _ = unistd::close(self.w);
+    }
+}
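Note: Jobserver implements the GNU make jobserver protocol — the pipe is pre-filled with one token per available CPU, and to_makeflags() advertises the two file descriptors so that make processes inside the build container share the runner's global token pool. A minimal sketch (hypothetical, not part of this commit) of the parsing a jobserver client performs on such a MAKEFLAGS value:

    // Hypothetical client-side parsing of the advertisement produced by
    // Jobserver::to_makeflags(), e.g. " -j8 --jobserver-auth=3,4".
    fn parse_jobserver_auth(makeflags: &str) -> Option<(i32, i32)> {
        let arg = makeflags
            .split_whitespace()
            .find_map(|flag| flag.strip_prefix("--jobserver-auth="))?;
        let (r, w) = arg.split_once(',')?;
        Some((r.parse().ok()?, w.parse().ok()?)) // (read fd, write fd)
    }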
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs
new file mode 100644
index 0000000..658a833
--- /dev/null
+++ b/crates/runner/src/lib.rs
@@ -0,0 +1,139 @@
+mod init;
+mod jobserver;
+mod ns;
+pub mod paths;
+mod tar;
+mod task;
+mod util;
+
+use capctl::prctl;
+use ipc_channel::ipc;
+use nix::{
+    sched::CloneFlags,
+    sys::{signal, stat, wait},
+    unistd::{self, Gid, Uid},
+};
+use serde::{Deserialize, Serialize};
+
+use common::{error::*, types::*};
+
+use self::{
+    jobserver::Jobserver,
+    util::{clone, unix},
+};
+
+#[derive(Debug, Deserialize, Serialize)]
+enum Message {
+    Request(Task, ipc::IpcSender<Result<TaskOutput>>),
+}
+
+fn handle_request(
+    jobserver: Jobserver,
+    channel: ipc::IpcReceiver<Message>,
+    task: Task,
+    reply_channel: ipc::IpcSender<Result<TaskOutput>>,
+) -> (Jobserver, ipc::IpcReceiver<Message>) {
+    let child = |(mut jobserver, channel): (Jobserver, ipc::IpcReceiver<Message>)| {
+        drop(channel);
+        unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap();
+
+        prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)");
+
+        let token = jobserver.wait();
+        let (pid, mut jobserver) = unsafe {
+            clone::spawn(None, jobserver, |jobserver| {
+                let result = task::handle(task, jobserver);
+                reply_channel.send(result).expect("IPC send() failed");
+            })
+        }
+        .expect("fork()");
+        let wait_res = wait::waitpid(pid, None);
+        jobserver.post(token);
+        wait_res.expect("waitpid()");
+    };
+
+    unsafe { clone::spawn(None, (jobserver, channel), child) }
+        .expect("fork()")
+        .1
+}
+
+fn runner_loop(mut channel: ipc::IpcReceiver<Message>) {
+    let nproc = unix::nproc().expect("Failed to get number of available CPUs");
+    let mut jobserver = Jobserver::new(nproc).expect("Failed to initialize jobserver pipe");
+
+    while let Ok(msg) = channel.recv() {
+        match msg {
+            Message::Request(task, reply_channel) => {
+                let ret = handle_request(jobserver, channel, task, reply_channel);
+                jobserver = ret.0;
+                channel = ret.1;
+            }
+        }
+    }
+}
+
+fn runner(uid: Uid, gid: Gid, channel: ipc::IpcReceiver<Message>) {
+    ns::mount_proc();
+    ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid);
+
+    stat::umask(stat::Mode::from_bits_truncate(0o022));
+
+    init::init_runner().unwrap();
+
+    let msg_handler = unsafe {
+        clone::spawn(None, (), |()| {
+            signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap();
+            runner_loop(channel);
+        })
+    }
+    .expect("fork()")
+    .0;
+
+    loop {
+        let status = wait::wait().expect("wait()");
+        if status.pid() == Some(msg_handler) {
+            break;
+        }
+    }
+}
+
+pub struct Runner {
+    channel: ipc::IpcSender<Message>,
+}
+
+impl Runner {
+    /// Creates a new container runner
+    ///
+    /// Unsafe: Do not call in multithreaded processes
+    pub unsafe fn new() -> Result<Self> {
+        let uid = unistd::geteuid();
+        let gid = unistd::getegid();
+
+        let (tx, rx) = ipc::channel().expect("IPC channel creation failed");
+
+        let (tx, _rx) = clone::spawn(
+            Some(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID),
+            (tx, rx),
+            |(tx, rx)| {
+                drop(tx);
+                runner(uid, gid, rx);
+            },
+        )
+        .expect("clone()")
+        .1;
+
+        Ok(Runner { channel: tx })
+    }
+}
+
+impl Runner {
+    pub fn spawn(&self, task: &Task) -> ipc::IpcReceiver<Result<TaskOutput>> {
+        let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed");
+
+        self.channel
+            .send(Message::Request(task.clone(), reply_tx))
+            .expect("ContainerRunner task submission failed");
+
+        reply_rx
+    }
+}
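Note: Runner::new() forks the actual runner process into fresh user/mount/PID namespaces and keeps only an IPC channel to it; each spawn() is answered asynchronously through a per-task reply channel. A usage sketch (hypothetical caller; assumes the library is imported as rebel_runner and that Task/TaskOutput come from rebel-common):

    use common::{error::*, types::*};
    use rebel_runner::Runner;

    fn build(task: &Task) -> Result<TaskOutput> {
        // Safety: per the doc comment, call before the process spawns any threads
        let runner = unsafe { Runner::new() }?;
        let reply = runner.spawn(task);
        // Outer Result is the IPC transport, inner Result is the task itself
        reply.recv().expect("IPC receive failed")
    }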
diff --git a/crates/runner/src/ns.rs b/crates/runner/src/ns.rs
new file mode 100644
index 0000000..a075931
--- /dev/null
+++ b/crates/runner/src/ns.rs
@@ -0,0 +1,84 @@
+use nix::{
+    mount::{self, MntFlags, MsFlags},
+    sched::CloneFlags,
+    unistd::{self, Gid, Pid, Uid},
+};
+
+use common::error::*;
+
+use super::util::clone;
+
+pub fn mount_proc() {
+    mount::mount::<_, _, _, str>(Some("proc"), "/proc", Some("proc"), MsFlags::empty(), None)
+        .expect("Failed to mount /proc");
+}
+
+pub fn setup_userns(inner_uid: Uid, inner_gid: Gid, outer_uid: Uid, outer_gid: Gid) {
+    std::fs::write("/proc/self/setgroups", "deny").expect("Failed to write /proc/self/setgroups");
+    std::fs::write(
+        "/proc/self/uid_map",
+        &format!("{} {} 1", inner_uid, outer_uid),
+    )
+    .expect("Failed to write /proc/self/uid_map");
+    std::fs::write(
+        "/proc/self/gid_map",
+        &format!("{} {} 1", inner_gid, outer_gid),
+    )
+    .expect("Failed to write /proc/self/gid_map");
+}
+
+pub unsafe fn spawn<T, F>(flags: CloneFlags, arg: T, f: F) -> nix::Result<(Pid, T)>
+where
+    F: FnOnce(T),
+{
+    assert!(flags.contains(CloneFlags::CLONE_NEWNS) || !flags.contains(CloneFlags::CLONE_NEWPID));
+
+    clone::spawn(Some(flags), arg, |arg| {
+        if flags.contains(CloneFlags::CLONE_NEWPID) {
+            mount_proc();
+        }
+        f(arg)
+    })
+}
+
+pub fn pivot_root(path: &str) {
+    (|| -> Result<()> {
+        unistd::chdir(path).context("chdir()")?;
+        mount::mount::<_, _, str, str>(Some("/proc"), "proc", None, MsFlags::MS_BIND, None)
+            .context("Failed to bind mount /proc")?;
+        unistd::pivot_root(".", ".").context("pivot_root()")?;
+        mount::umount2(".", MntFlags::MNT_DETACH).context("umount2()")?;
+        unistd::chdir("/").context("chdir(\"/\")")?;
+        Ok(())
+    })()
+    .expect("Failed to pivot root");
+}
+
+pub fn container_mounts() -> Result<()> {
+    mount::mount(
+        Some("tmp"),
+        "/tmp",
+        Some("tmpfs"),
+        MsFlags::MS_NODEV | MsFlags::MS_NOSUID,
+        Some("mode=1777,size=1048576k"),
+    )
+    .context("Failed to mount /tmp")?;
+    mount::mount(
+        Some("devpts"),
+        "/dev/pts",
+        Some("devpts"),
+        MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC,
+        Some("newinstance,ptmxmode=0666,mode=0620"),
+    )
+    .context("Failed to mount /dev/pts")?;
+    mount::mount(
+        Some("shm"),
+        "/dev/shm",
+        Some("tmpfs"),
+        MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC | MsFlags::MS_NODEV,
+        Some("mode=1777,size=65536k"),
+    )
+    .context("Failed to mount /dev/shm")?;
+
+    Ok(())
+}
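Note: setup_userns() maps exactly one UID and one GID between parent and new user namespace. For example (values hypothetical), when the runner is started by an unprivileged user with UID/GID 1000, the calls above write:

    /proc/self/setgroups   "deny"
    /proc/self/uid_map     "0 1000 1"    (UID 0 inside = UID 1000 outside, range length 1)
    /proc/self/gid_map     "0 1000 1"

so root inside the namespace is the invoking user outside; writing "deny" to setgroups is required before an unprivileged process may write a gid_map.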
diff --git a/crates/runner/src/paths.rs b/crates/runner/src/paths.rs
new file mode 100644
index 0000000..632f030
--- /dev/null
+++ b/crates/runner/src/paths.rs
@@ -0,0 +1,106 @@
+//! Build directory structure used throughout rebel
+//!
+//! # Current structure
+//!
+//! ```text
+//! build/
+//! ├── rootfs.tar
+//! ├── downloads/
+//! │   └── ...
+//! ├── state/
+//! │   ├── output/
+//! │   │   ├── <input hash>.tar.tmp   # during packing
+//! │   │   ├── <archive hash>.tar     # files are renamed when packing is finished
+//! │   │   └── ...
+//! │   ├── layer/
+//! │   │   ├── <layer hash>/          # overlayfs layer dir of finished tasks
+//! │   │   └── ...
+//! │   └── task/
+//! │       ├── <input hash>/
+//! │       │   ├── layer/             # overlayfs layer dir (moved to layer/ after build)
+//! │       │   ├── work/              # overlayfs work dir (discarded after build)
+//! │       │   ├── task.json.tmp      # during write
+//! │       │   └── task.json          # after write
+//! │       └── ...
+//! └── tmp/                           # tmpfs (mounted on start of rebel)
+//!     ├── rootfs/                    # unpacked rootfs.tar
+//!     └── task/
+//!         └── <input hash>/
+//!             ├── build/             # mount point for /build directory
+//!             │   ├── downloads/     # downloaded sources
+//!             │   ├── dest/          # collected as output after build
+//!             │   ├── sysroot/       # sysroot mountpoint
+//!             │   ├── task/          # internal runner files
+//!             │   └── work/          # build overlay mountpoint
+//!             ├── rootfs/            # rootfs overlay mountpoint
+//!             └── depends/           # overlaid on rootfs in container
+//! ```
+
+use common::string_hash::*;
+
+pub const ROOTFS_ARCHIVE: &str = "build/rootfs.tar";
+
+pub const DOWNLOADS_DIR: &str = "build/downloads";
+
+pub const TMP_DIR: &str = "build/tmp";
+pub const ROOTFS_DIR: &str = "build/tmp/rootfs";
+pub const TASK_TMP_DIR: &str = "build/tmp/task";
+
+pub const TASK_TMP_ROOTFS_SUBDIR: &str = "rootfs";
+pub const TASK_TMP_DEPENDS_SUBDIR: &str = "depends";
+
+pub const OUTPUT_STATE_DIR: &str = "build/state/output";
+pub const TASK_STATE_DIR: &str = "build/state/task";
+pub const LAYER_STATE_DIR: &str = "build/state/layer";
+
+pub const TASK_STATE_LAYER_SUBDIR: &str = "layer";
+pub const TASK_STATE_WORK_SUBDIR: &str = "work";
+
+pub const TASK_BUILDDIR: &str = "build";
+pub const TASK_DESTDIR: &str = "build/dest";
+pub const TASK_DLDIR: &str = "build/downloads";
+pub const TASK_TASKDIR: &str = "build/task";
+pub const TASK_WORKDIR: &str = "build/work";
+pub const TASK_SYSROOT: &str = "opt/toolchain/sysroot";
+
+pub const TASK_RUN: &str = "run";
+
+pub fn join(paths: &[&str]) -> String {
+    paths.join("/")
+}
+
+pub fn abs(path: &str) -> String {
+    join(&["", path])
+}
+
+pub fn task_tmp_dir(hash: &InputHash) -> String {
+    join(&[TASK_TMP_DIR, &hash.to_string()])
+}
+
+pub fn task_state_dir(hash: &InputHash) -> String {
+    join(&[TASK_STATE_DIR, &hash.to_string()])
+}
+
+pub fn task_cache_tmp_filename(hash: &InputHash) -> String {
+    join(&[TASK_STATE_DIR, &hash.to_string(), "task.json.tmp"])
+}
+
+pub fn task_cache_filename(hash: &InputHash) -> String {
+    join(&[TASK_STATE_DIR, &hash.to_string(), "task.json"])
+}
+
+pub fn task_log_filename(hash: &InputHash) -> String {
+    join(&[TASK_STATE_DIR, &hash.to_string(), "task.log"])
+}
+
+pub fn layer_dir(hash: &LayerHash) -> String {
+    join(&[LAYER_STATE_DIR, &hash.to_string()])
+}
+
+pub fn archive_tmp_filename(hash: &InputHash) -> String {
+    join(&[OUTPUT_STATE_DIR, &format!("{}.tar.tmp", hash)])
+}
+
+pub fn archive_filename(hash: &ArchiveHash) -> String {
+    join(&[OUTPUT_STATE_DIR, &format!("{}.tar", hash)])
+}
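Note: all of these helpers are pure string manipulation; nothing touches the filesystem, which keeps them usable both outside and inside the container (where the same relative paths appear under a different root). A hypothetical in-module test, illustrating the two basic helpers:

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn join_and_abs() {
            // join() is a plain '/'-join; abs() only prefixes a single '/'
            assert_eq!(join(&["build", "dest"]), "build/dest");
            assert_eq!(abs("build/task/run"), "/build/task/run");
        }
    }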
diff --git a/crates/runner/src/tar.rs b/crates/runner/src/tar.rs
new file mode 100644
index 0000000..9306775
--- /dev/null
+++ b/crates/runner/src/tar.rs
@@ -0,0 +1,87 @@
+use std::{
+    fs::File,
+    io::{self, Read, Write},
+    os::unix::prelude::CommandExt,
+    path::Path,
+    process::{self, Command, Stdio},
+};
+
+use nix::{
+    mount::{self, MsFlags},
+    sched::CloneFlags,
+    sys::wait,
+};
+
+use common::error::*;
+
+use super::{
+    ns,
+    util::{fs, Checkable},
+};
+use crate::paths;
+
+pub fn pack<W: Write, P: AsRef<Path>>(archive: &mut W, source: P) -> Result<()> {
+    let (mut piper, pipew) = fs::pipe()?;
+
+    let exec_tar = |stdout: File| -> Result<()> {
+        // We are in our own mount namespace, so mounting into the shared ROOTFS_DIR is fine
+        let mount_target = paths::join(&[paths::ROOTFS_DIR, paths::TASK_BUILDDIR]);
+        mount::mount::<_, _, str, str>(
+            Some(source.as_ref()),
+            mount_target.as_str(),
+            None,
+            MsFlags::MS_BIND,
+            None,
+        )?;
+
+        ns::pivot_root(paths::ROOTFS_DIR);
+
+        let err = Command::new("tar")
+            .args(&[
+                "-c",
+                "--sort=name",
+                "--numeric-owner",
+                "--owner=0",
+                "--group=0",
+                "--mtime=@0",
+                ".",
+            ])
+            .stdin(Stdio::null())
+            .stdout(stdout)
+            .current_dir(paths::TASK_BUILDDIR)
+            .env_clear()
+            .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
+            .exec();
+        eprintln!("{}", err);
+        process::exit(127);
+    };
+
+    let (pid, pipew) = unsafe {
+        ns::spawn(CloneFlags::CLONE_NEWNS, pipew, |pipew| {
+            exec_tar(pipew).unwrap()
+        })
+    }
+    .context("Failed to run tar")?;
+
+    drop(pipew);
+    let result = io::copy(&mut piper, archive).context("Failed to write TAR archive");
+
+    wait::waitpid(pid, None)?
+        .check()
+        .context("tar did not exit successfully")?;
+
+    result?;
+    Ok(())
+}
+
+pub fn unpack<R: Read, P: AsRef<Path>>(archive: R, dest: P) -> Result<()> {
+    fs::mkdir(&dest)?;
+
+    let mut ar = tar::Archive::new(archive);
+    ar.set_preserve_permissions(true);
+    ar.set_preserve_mtime(true);
+    ar.set_unpack_xattrs(true);
+    ar.set_overwrite(false);
+
+    ar.unpack(dest).context("Failed to unpack TAR archive")
+}
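Note: pack() pins everything that would make an archive nondeterministic (entry order, owner, group, mtime), so byte-identical trees produce byte-identical archives — which is what makes the content hashes in task.rs stable. A hypothetical round trip, assuming in-crate use (the tar module is private) and namespace privileges for pack():

    use common::error::*;
    use crate::tar; // path assumes code inside this crate

    fn roundtrip() -> Result<()> {
        let mut archive = Vec::new();
        // Source path is made up for illustration
        tar::pack(&mut archive, "build/state/task/some-hash/layer")?;
        tar::unpack(&archive[..], "build/tmp/unpacked")?;
        Ok(())
    }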
diff --git a/crates/runner/src/task.rs b/crates/runner/src/task.rs
new file mode 100644
index 0000000..296c1a0
--- /dev/null
+++ b/crates/runner/src/task.rs
@@ -0,0 +1,460 @@
+use std::{
+    collections::HashMap,
+    io::BufWriter,
+    os::unix::prelude::CommandExt,
+    path::{Path, PathBuf},
+    process::{self, Command, Stdio},
+    time::Instant,
+};
+
+use capctl::prctl;
+use nix::{
+    mount::{self, MsFlags},
+    sched::{unshare, CloneFlags},
+    sys::wait,
+    unistd::{self, Gid, Uid},
+};
+use serde::Serialize;
+use tee_readwrite::TeeWriter;
+
+use common::{error::*, string_hash::*, types::*};
+
+use super::{
+    jobserver::Jobserver,
+    ns, tar,
+    util::{cjson, fs, Checkable},
+};
+use crate::paths;
+
+const BUILD_UID: Uid = Uid::from_raw(1000);
+const BUILD_GID: Gid = Gid::from_raw(1000);
+
+type InputHasher = blake3::Hasher;
+type DependencyHasher = blake3::Hasher;
+type LayerHasher = blake3::Hasher;
+type ArchiveHasher = blake3::Hasher;
+
+fn dependency_hash(dep: &Dependency) -> DependencyHash {
+    DependencyHash(StringHash(
+        cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
+    ))
+}
+
+fn input_hash(task: &Task) -> InputHash {
+    #[derive(Debug, Serialize)]
+    struct HashInput<'a> {
+        pub command: &'a str,
+        pub inherit: &'a [LayerHash],
+        pub depends: HashMap<DependencyHash, &'a Dependency>,
+        pub outputs: &'a HashMap<String, String>,
+    }
+    let input = HashInput {
+        command: &task.command,
+        inherit: &task.inherit,
+        depends: task
+            .depends
+            .iter()
+            .map(|dep| (dependency_hash(dep), dep))
+            .collect(),
+        outputs: &task.outputs,
+    };
+
+    InputHash(StringHash(
+        cjson::digest::<InputHasher, _>(&input).unwrap().into(),
+    ))
+}
+
+fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
+    // Remove metadata first to ensure task invalidation
+    fs::ensure_removed(&paths::task_cache_filename(input_hash))?;
+
+    let task_state_dir = paths::task_state_dir(input_hash);
+    fs::ensure_removed(&task_state_dir)?;
+    fs::mkdir(&task_state_dir)?;
+
+    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+    fs::mkdir(&task_layer_dir)?;
+    fs::fixup_permissions(&task_layer_dir)?;
+
+    let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+    let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
+    fs::mkdir(&taskdir)?;
+    let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
+    std::fs::write(&runfile, &task.command)
+        .with_context(|| format!("Failed to write {}", runfile))?;
+
+    let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_WORKDIR]);
+    let mount = if task.inherit.is_empty() {
+        fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
+            .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
+    } else {
+        let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+        fs::mkdir(&task_work_dir)?;
+        fs::fixup_permissions(&task_work_dir)?;
+
+        let lower = task
+            .inherit
+            .iter()
+            .rev()
+            .map(paths::layer_dir)
+            .collect::<Vec<_>>()
+            .join(":");
+        let options = format!(
+            "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
+            lower = lower,
+            upper = task_layer_dir,
+            work = task_work_dir
+        );
+        fs::mount(
+            "overlay",
+            &mount_target,
+            Some("overlay"),
+            MsFlags::empty(),
+            Some(&options),
+        )
+        .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
+    };
+
+    Ok(mount)
+}
+
+fn init_task_rootfs(input_hash: &InputHash) -> Result<fs::Mount> {
+    let task_tmp_dir = paths::task_tmp_dir(input_hash);
+    let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
+    let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+
+    let lower = [&depends_dir, paths::ROOTFS_DIR].join(":");
+    let options = format!(
+        "xino=off,index=off,metacopy=off,lowerdir={lower}",
+        lower = lower,
+    );
+
+    let mount = fs::mount(
+        "overlay",
+        &mount_target,
+        Some("overlay"),
+        MsFlags::empty(),
+        Some(&options),
+    )
+    .with_context(|| format!("Failed to mount rootfs overlay to {:?}", mount_target))?;
+
+    Ok(mount)
+}
+
+fn cleanup_task(input_hash: &InputHash) -> Result<()> {
+    let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+    fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?;
+
+    let task_state_dir = paths::task_state_dir(input_hash);
+    let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+    fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?;
+
+    Ok(())
+}
+
+fn unpack_dependency<P1: AsRef<Path>, P2: AsRef<Path>>(filename: P1, dest: P2) -> Result<()> {
+    let file = fs::open(filename.as_ref())?;
+    tar::unpack(file, dest.as_ref()).with_context(|| {
+        format!(
+            "Failed to unpack {:?} to {:?}",
+            filename.as_ref(),
+            dest.as_ref()
+        )
+    })
+}
+
+fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<()> {
+    let task_tmp_dir = paths::task_tmp_dir(input_hash);
+    let downloads_dir = paths::join(&[&task_tmp_dir, paths::TASK_DLDIR]);
+    let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
+
+    fs::mkdir(&downloads_dir)?;
+    fs::mkdir(&depends_dir)?;
+
+    for dep in &task.depends {
+        match dep {
+            Dependency::Fetch { name, .. } => {
+                fs::copy(
+                    paths::join(&[paths::DOWNLOADS_DIR, name]),
+                    paths::join(&[&downloads_dir, name]),
+                )?;
+            }
+            Dependency::Task { output, path } => {
+                unpack_dependency(
+                    paths::archive_filename(output),
+                    paths::join(&[&depends_dir, path]),
+                )?;
+            }
+        }
+    }
+
+    Ok(())
+}
+
+fn collect_output(input_hash: &InputHash, path: &str) -> Result<Option<ArchiveHash>> {
+    let source: PathBuf = [&paths::task_tmp_dir(input_hash), paths::TASK_DESTDIR, path]
+        .iter()
+        .collect();
+    if !source.is_dir() {
+        return Ok(None);
+    }
+
+    let filename = paths::archive_tmp_filename(input_hash);
+
+    let hash = (|| -> Result<ArchiveHash> {
+        let file = fs::create(&filename)?;
+        let hasher = ArchiveHasher::new();
+        let writer = TeeWriter::new(file, hasher);
+        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);
+
+        super::tar::pack(&mut buffered_writer, &source)?;
+
+        let writer = buffered_writer.into_inner()?;
+        let (file, hasher) = writer.into_inner();
+        file.sync_all()?;
+        drop(file);
+
+        Ok(ArchiveHash(StringHash(hasher.finalize().into())))
+    })()
+    .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;
+
+    fs::rename(filename, paths::archive_filename(&hash))?;
+
+    Ok(Some(hash))
+}
+
+fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
+    let mut ret = HashMap::new();
+
+    for (name, path) in &task.outputs {
+        if let Some(hash) = collect_output(input_hash, path)? {
+            ret.insert(name.clone(), hash);
+        }
+    }
+
+    Ok(ret)
+}
+
+fn run_task(input_hash: &InputHash, task: &Task, mut jobserver: Jobserver) -> Result<()> {
+    let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
+    unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;
+    let _rootfs_mount = init_task_rootfs(input_hash).context("Failed to initialize task rootfs")?;
+
+    let task_tmp_dir = paths::task_tmp_dir(input_hash);
+    let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+
+    let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
+    let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);
+
+    let log_filename = paths::task_log_filename(input_hash);
+
+    let mut exec_cmd = || -> Result<()> {
+        let log = fs::create(&log_filename)?;
+
+        let log_stdout = log
+            .try_clone()
+            .context("Failed to duplicate log file descriptor")?;
+        let log_stderr = log
+            .try_clone()
+            .context("Failed to duplicate log file descriptor")?;
+
+        mount::mount::<_, _, str, str>(
+            Some(paths::join(&[paths::ROOTFS_DIR, "dev"]).as_str()),
+            paths::join(&[&rootfs, "dev"]).as_str(),
+            None,
+            MsFlags::MS_BIND | MsFlags::MS_REC,
+            None,
+        )
+        .expect("Failed to bind mount /dev directory");
+        mount::mount::<_, _, str, str>(
+            Some(builddir_source.as_str()),
+            builddir_target.as_str(),
+            None,
+            MsFlags::MS_BIND | MsFlags::MS_REC,
+            None,
+        )
+        .expect("Failed to bind mount build directory");
+
+        ns::pivot_root(&rootfs);
+        mount::mount::<str, _, str, str>(
+            None,
+            "/",
+            None,
+            MsFlags::MS_PRIVATE | MsFlags::MS_REC,
+            None,
+        )
+        .context("Failed to set MS_PRIVATE for container root")?;
+        ns::container_mounts().context("Failed to set up container mounts")?;
+
+        unistd::sethostname("rebel-builder").context("Failed to set hostname")?;
+
+        prctl::set_no_new_privs().context("set_no_new_privs()")?;
+
+        unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
+            .context("Failed to create user namespace")?;
+        ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));
+
+        jobserver
+            .set_cloexec(false)
+            .context("Failed to unset O_CLOEXEC on jobserver pipe")?;
+
+        let err = Command::new("sh")
+            .args(&[
+                "-ex",
+                &paths::abs(&paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])),
+            ])
+            .stdin(Stdio::null())
+            .stdout(log_stdout)
+            .stderr(log_stderr)
+            .current_dir(paths::TASK_WORKDIR)
+            .env_clear()
+            .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
+            .env("HOME", "/build")
+            .env("INPUT_HASH", input_hash.to_string())
+            .env("MAKEFLAGS", jobserver.to_makeflags())
+            .exec();
+        eprintln!("{}", err);
+        process::exit(127);
+    };
+
+    let (pid, ()) = unsafe {
+        ns::spawn(
+            CloneFlags::CLONE_NEWNS
+                | CloneFlags::CLONE_NEWPID
+                | CloneFlags::CLONE_NEWIPC
+                | CloneFlags::CLONE_NEWNET
+                | CloneFlags::CLONE_NEWUTS,
+            (),
+            |()| exec_cmd().unwrap(),
+        )
+    }
+    .context("Failed to run task container")?;
+
+    let status = wait::waitpid(pid, None)?;
+
+    if let Err(err) = status.check() {
+        return Err(Error::new(format!(
+            "Task failed: {}\nOutput: {}",
+            err, log_filename
+        )));
+    }
+
+    Ok(())
+}
+
+fn hash_layer(input_hash: &InputHash) -> Result<Option<LayerHash>> {
+    let task_state_dir = paths::task_state_dir(input_hash);
+    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+    (|| -> Result<Option<LayerHash>> {
+        if fs::is_dir_empty(&task_layer_dir)? {
+            return Ok(None);
+        }
+
+        let hasher = LayerHasher::new();
+        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
+
+        tar::pack(&mut buffered_writer, &task_layer_dir)?;
+
+        let hasher = buffered_writer.into_inner()?;
+        Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
+    })()
+    .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
+}
+
+fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
+    let task_state_dir = paths::task_state_dir(input_hash);
+    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+    if let Some(hash) = hash {
+        let layer_dir = paths::layer_dir(hash);
+
+        let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
+            Ok(_) => return Ok(()),
+            Err(err) => err,
+        };
+
+        if !matches!(
+            err.raw_os_error(),
+            Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
+        ) {
+            return Err(err).with_context(|| {
+                format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
+            });
+        }
+    }
+
+    fs::ensure_removed(&task_layer_dir)
+}
+
+fn run_and_hash_task(
+    input_hash: &InputHash,
+    task: &Task,
+    jobserver: Jobserver,
+) -> Result<TaskOutput> {
+    run_task(input_hash, task, jobserver)?;
+
+    let outputs = collect_outputs(input_hash, task)?;
+
+    let layer = hash_layer(input_hash)?;
+    move_layer(input_hash, &layer)?;
+
+    Ok(TaskOutput {
+        input_hash: *input_hash,
+        layer,
+        outputs,
+    })
+}
+
+fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
+    let filename = paths::task_cache_filename(input_hash);
+    let file = fs::open(&filename)?;
+
+    serde_json::from_reader(file)
+        .with_context(|| format!("Failed to read task cache data from {}", filename))
+}
+
+fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
+    fs::mkdir(&paths::task_state_dir(input_hash))?;
+
+    let tmp_filename = paths::task_cache_tmp_filename(input_hash);
+    let filename = paths::task_cache_filename(input_hash);
+
+    cjson::to_file(&tmp_filename, output)
+        .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
+
+    fs::rename(tmp_filename, filename)?;
+
+    Ok(())
+}
+
+pub fn handle(task: Task, jobserver: Jobserver) -> Result<TaskOutput> {
+    let input_hash = input_hash(&task);
+
+    if let Ok(task_output) = load_cached(&input_hash) {
+        return Ok(task_output);
+    }
+
+    let start_time = Instant::now();
+    println!("Starting task {} ({})", task.label, input_hash);
+
+    let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
+    let cleanup_ret = cleanup_task(&input_hash);
+
+    let task_output = task_ret?;
+    cleanup_ret.context("Failed to clean up after task")?;
+
+    save_cached(&input_hash, &task_output)?;
+
+    let duration = Instant::now().duration_since(start_time);
+    println!(
+        "Finished task {} ({}) in {}",
+        task.label,
+        input_hash,
+        duration.as_secs_f32()
+    );
+
+    Ok(task_output)
+}
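Note: handle() is memoized on the input hash — command, inherited layers, dependencies, and output paths all feed the hash, so an unchanged task is answered from build/state/task/&lt;input hash&gt;/task.json without running at all. A hypothetical in-module test of the key property (make_test_task is a made-up helper):

    #[test]
    fn input_hash_is_deterministic() {
        let task: Task = make_test_task(); // hypothetical Task constructor
        assert_eq!(
            input_hash(&task).to_string(),
            input_hash(&task.clone()).to_string()
        );
    }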
diff --git a/crates/runner/src/util/cjson.rs b/crates/runner/src/util/cjson.rs
new file mode 100644
index 0000000..5990943
--- /dev/null
+++ b/crates/runner/src/util/cjson.rs
@@ -0,0 +1,31 @@
+use std::{
+    fs::File,
+    io::{self, Write},
+    path::Path,
+};
+
+use digest::{self, Digest};
+use olpc_cjson::CanonicalFormatter;
+use serde::Serialize;
+use serde_json::error::Result;
+
+pub fn new_serializer<W: Write>(writer: W) -> serde_json::Serializer<W, CanonicalFormatter> {
+    serde_json::Serializer::with_formatter(writer, CanonicalFormatter::new())
+}
+
+pub fn to_writer<W: Write, T: ?Sized + Serialize>(writer: W, value: &T) -> Result<()> {
+    let mut ser = new_serializer(writer);
+    value.serialize(&mut ser)
+}
+
+pub fn to_file<P: AsRef<Path>, T: ?Sized + Serialize>(path: P, value: &T) -> io::Result<()> {
+    let file = File::create(path)?;
+    to_writer(&file, value)?;
+    file.sync_all()
+}
+
+pub fn digest<D: Digest + Write, T: ?Sized + Serialize>(value: &T) -> Result<digest::Output<D>> {
+    let mut digest = <D as Digest>::new();
+    to_writer(&mut digest, value)?;
+    Ok(digest.finalize())
+}
diff --git a/crates/runner/src/util/clone.rs b/crates/runner/src/util/clone.rs
new file mode 100644
index 0000000..4835b53
--- /dev/null
+++ b/crates/runner/src/util/clone.rs
@@ -0,0 +1,58 @@
+use std::{mem, process};
+
+use nix::{errno, sched, unistd};
+
+#[repr(C)]
+#[derive(Debug, Default)]
+struct CloneArgs {
+    flags: u64,
+    pidfd: u64,
+    child_tid: u64,
+    parent_tid: u64,
+    exit_signal: u64,
+    stack: u64,
+    stack_size: u64,
+    tls: u64,
+}
+
+pub unsafe fn clone(flags: sched::CloneFlags) -> nix::Result<unistd::ForkResult> {
+    let mut args = CloneArgs {
+        flags: flags.bits() as u64,
+        exit_signal: libc::SIGCHLD as u64,
+        ..CloneArgs::default()
+    };
+    let size = mem::size_of_val(&args) as libc::size_t;
+
+    let pid = libc::syscall(libc::SYS_clone3, &mut args, size);
+    if pid < 0 {
+        Err(errno::Errno::last())
+    } else if pid == 0 {
+        Ok(unistd::ForkResult::Child)
+    } else {
+        Ok(unistd::ForkResult::Parent {
+            child: unistd::Pid::from_raw(pid as libc::pid_t),
+        })
+    }
+}
+
+pub unsafe fn spawn<T, F>(
+    flags: Option<sched::CloneFlags>,
+    arg: T,
+    f: F,
+) -> nix::Result<(unistd::Pid, T)>
+where
+    F: FnOnce(T),
+{
+    let res = if let Some(flags) = flags {
+        clone(flags)
+    } else {
+        unistd::fork()
+    };
+    match res? {
+        unistd::ForkResult::Parent { child } => Ok((child, arg)),
+        unistd::ForkResult::Child => {
+            f(arg);
+            process::exit(0)
+        }
+    }
+}
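Note: cjson serializes through olpc-cjson's CanonicalFormatter, so JSON key order cannot perturb the digests that task.rs uses as cache keys. A sketch of hashing an arbitrary serializable value the same way (the value type is made up; blake3::Hasher implements digest::Digest via the crate's "traits-preview" feature):

    use serde::Serialize;

    use crate::util::cjson; // path assumes code inside this crate

    #[derive(Serialize)]
    struct Example<'a> {
        name: &'a str,
        n: u32,
    }

    fn example_digest(value: &Example) -> serde_json::error::Result<[u8; 32]> {
        cjson::digest::<blake3::Hasher, _>(value).map(|d| d.into())
    }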
diff --git a/crates/runner/src/util/fs.rs b/crates/runner/src/util/fs.rs
new file mode 100644
index 0000000..099a339
--- /dev/null
+++ b/crates/runner/src/util/fs.rs
@@ -0,0 +1,122 @@
+use std::{
+    fs::{self, File},
+    io,
+    os::unix::prelude::*,
+    path::{Path, PathBuf},
+};
+
+use nix::{
+    fcntl::OFlag,
+    mount::{self, MsFlags},
+    unistd,
+};
+
+use common::error::*;
+
+pub fn open<P: AsRef<Path>>(path: P) -> Result<fs::File> {
+    fs::File::open(path.as_ref())
+        .with_context(|| format!("Failed to open file {:?} for reading", path.as_ref()))
+}
+
+pub fn create<P: AsRef<Path>>(path: P) -> Result<fs::File> {
+    fs::File::create(path.as_ref())
+        .with_context(|| format!("Failed to open file {:?} for writing", path.as_ref()))
+}
+
+pub fn rename<P1: AsRef<Path>, P2: AsRef<Path>>(from: P1, to: P2) -> Result<()> {
+    fs::rename(from.as_ref(), to.as_ref())
+        .with_context(|| format!("Failed to rename {:?} to {:?}", from.as_ref(), to.as_ref()))
+}
+
+// Unlike fs::copy, this doesn't preserve file flags
+pub fn copy<P1: AsRef<Path>, P2: AsRef<Path>>(from: P1, to: P2) -> Result<()> {
+    (|| -> Result<()> {
+        let mut src = open(from.as_ref())?;
+        let mut dest = create(to.as_ref())?;
+        io::copy(&mut src, &mut dest)?;
+        dest.sync_all()?;
+        Ok(())
+    })()
+    .with_context(|| format!("Failed to copy {:?} to {:?}", from.as_ref(), to.as_ref()))
+}
+
+pub fn mkdir<P: AsRef<Path>>(path: P) -> Result<()> {
+    let mut builder = fs::DirBuilder::new();
+    builder.recursive(true);
+    builder
+        .create(path.as_ref())
+        .with_context(|| format!("Failed to create directory {:?}", path.as_ref()))
+}
+
+pub fn ensure_removed<P: AsRef<Path>>(path: P) -> Result<()> {
+    fs::remove_dir_all(path.as_ref())
+        .or_else(|err| match err.kind() {
+            io::ErrorKind::NotFound => Ok(()),
+            _ => Err(err),
+        })
+        .with_context(|| format!("Failed to delete directory {:?}", path.as_ref()))
+}
+
+pub fn is_dir_empty<P: AsRef<Path>>(path: P) -> Result<bool> {
+    Ok(fs::read_dir(path)?.next().is_none())
+}
+
+/// Fixes up ownership and permission weirdness caused by set-group-ID
+/// directories or the bsdgroups mount option
+pub fn fixup_permissions<P: AsRef<Path>>(path: P) -> Result<()> {
+    let path = path.as_ref();
+    let gid = unistd::getegid();
+
+    let metadata = path
+        .metadata()
+        .with_context(|| format!("Failed to get metadata of {:?}", path))?;
+
+    if metadata.gid() != gid.as_raw() {
+        unistd::chown(path, None, Some(gid))
+            .with_context(|| format!("Failed to set group of {:?}", path))?;
+    }
+
+    let mut perms = metadata.permissions();
+    let mode = perms.mode();
+    if (mode & 0o777) != mode {
+        perms.set_mode(mode & 0o777);
+        std::fs::set_permissions(path, perms)
+            .with_context(|| format!("Failed to set mode of {:?}", path))?;
+    }
+
+    Ok(())
+}
+
+#[must_use]
+pub struct Mount(PathBuf);
+
+impl Drop for Mount {
+    fn drop(&mut self) {
+        mount::umount(&self.0)
+            .with_context(|| format!("Failed to unmount {:?}", self.0))
+            .unwrap();
+    }
+}
+
+pub fn mount<P1: AsRef<Path>, P2: AsRef<Path>>(
+    source: P1,
+    target: P2,
+    fstype: Option<&str>,
+    flags: MsFlags,
+    data: Option<&str>,
+) -> Result<Mount> {
+    mkdir(target.as_ref()).with_context(|| format!("Failed to create {:?}", target.as_ref()))?;
+
+    let canon_target = target
+        .as_ref()
+        .canonicalize()
+        .with_context(|| format!("Failed to get absolute path for {:?}", target.as_ref()))?;
+    mount::mount(Some(source.as_ref()), &canon_target, fstype, flags, data)
+        .with_context(|| format!("Failed to mount {:?}", canon_target))?;
+    Ok(Mount(canon_target))
+}
+
+pub fn pipe() -> Result<(File, File)> {
+    unistd::pipe2(OFlag::O_CLOEXEC)
+        .context("pipe2()")
+        .map(|(piper, pipew)| unsafe { (File::from_raw_fd(piper), File::from_raw_fd(pipew)) })
+}
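Note: fs::mount returns a Mount guard that unmounts on drop, which is what keeps the short-lived bind and overlay mounts in task.rs exception-safe — an early `?` return still tears the mount down. A sketch of a scoped bind mount (paths hypothetical, assumes in-crate use):

    use nix::mount::MsFlags;

    use common::error::*;
    use crate::util::fs; // path assumes code inside this crate

    fn scoped_bind() -> Result<()> {
        let _guard = fs::mount("/srv/data", "/mnt/scratch", None, MsFlags::MS_BIND, None)?;
        // ... work under /mnt/scratch ...
        Ok(())
    } // _guard dropped here; Mount::drop unmounts /mnt/scratch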
diff --git a/crates/runner/src/util/mod.rs b/crates/runner/src/util/mod.rs
new file mode 100644
index 0000000..eff589d
--- /dev/null
+++ b/crates/runner/src/util/mod.rs
@@ -0,0 +1,42 @@
+pub mod cjson;
+pub mod clone;
+pub mod fs;
+pub mod unix;
+
+use std::{
+    io::{Error, ErrorKind, Result},
+    process::ExitStatus,
+};
+
+use nix::sys::wait;
+
+pub trait Checkable {
+    fn check(&self) -> Result<()>;
+}
+
+impl Checkable for ExitStatus {
+    fn check(&self) -> Result<()> {
+        if self.success() {
+            Ok(())
+        } else {
+            Err(Error::new(
+                ErrorKind::Other,
+                format!("Process exited with {}", self),
+            ))
+        }
+    }
+}
+
+impl Checkable for wait::WaitStatus {
+    fn check(&self) -> Result<()> {
+        let message = match self {
+            wait::WaitStatus::Exited(_, 0) => return Ok(()),
+            wait::WaitStatus::Exited(_, code) => format!("Process exited with exit code: {}", code),
+            wait::WaitStatus::Signaled(_, signal, _) => {
+                format!("Process exited with signal: {}", signal)
+            }
+            _ => format!("Process in unexpected status: {:?}", self),
+        };
+        Err(Error::new(ErrorKind::Other, message))
+    }
+}
diff --git a/crates/runner/src/util/unix.rs b/crates/runner/src/util/unix.rs
new file mode 100644
index 0000000..48db764
--- /dev/null
+++ b/crates/runner/src/util/unix.rs
@@ -0,0 +1,61 @@
+use std::os::unix::prelude::*;
+
+use nix::{
+    fcntl::{self, FcntlArg, FdFlag, OFlag},
+    sched,
+    unistd::Pid,
+};
+
+use common::error::*;
+
+pub fn set_blocking(fd: RawFd, blocking: bool) -> Result<()> {
+    let flags = unsafe {
+        OFlag::from_bits_unchecked(fcntl::fcntl(fd, FcntlArg::F_GETFL).context("fcntl(F_GETFL)")?)
+    };
+
+    let new_flags = if blocking {
+        flags & !OFlag::O_NONBLOCK
+    } else {
+        flags | OFlag::O_NONBLOCK
+    };
+
+    if new_flags != flags {
+        fcntl::fcntl(fd, FcntlArg::F_SETFL(new_flags)).context("fcntl(F_SETFL)")?;
+    }
+
+    Ok(())
+}
+
+pub fn set_cloexec(fd: RawFd, cloexec: bool) -> Result<()> {
+    let flags = unsafe {
+        FdFlag::from_bits_unchecked(fcntl::fcntl(fd, FcntlArg::F_GETFD).context("fcntl(F_GETFD)")?)
+    };
+
+    let new_flags = if cloexec {
+        flags | FdFlag::FD_CLOEXEC
+    } else {
+        flags & !FdFlag::FD_CLOEXEC
+    };
+
+    if new_flags != flags {
+        fcntl::fcntl(fd, FcntlArg::F_SETFD(new_flags)).context("fcntl(F_SETFD)")?;
+    }
+
+    Ok(())
+}
+
+pub fn nproc() -> Result<usize> {
+    const MAXCPU: usize = sched::CpuSet::count();
+
+    let affinity = sched::sched_getaffinity(Pid::from_raw(0)).context("sched_getaffinity()")?;
+
+    let mut count = 0;
+
+    for cpu in 0..MAXCPU {
+        if affinity.is_set(cpu).unwrap() {
+            count += 1;
+        }
+    }
+
+    Ok(count)
+}
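Note: the Checkable trait gives std::process::ExitStatus and nix's wait::WaitStatus a common "non-zero exit is an io::Error" treatment, which is how both tar.rs and task.rs turn child failures into errors. A sketch (import path assumes code inside this crate):

    use std::process::Command;

    use crate::util::Checkable;

    fn run_true() -> std::io::Result<()> {
        // check() maps a non-zero exit status or a signal to Err(...)
        Command::new("true").status()?.check()
    }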