Diffstat (limited to 'crates/rebel-runner/src')
-rw-r--r-- | crates/rebel-runner/src/init.rs           |  68
-rw-r--r-- | crates/rebel-runner/src/jobserver.rs      |  79
-rw-r--r-- | crates/rebel-runner/src/lib.rs            | 217
-rw-r--r-- | crates/rebel-runner/src/ns.rs             |  84
-rw-r--r-- | crates/rebel-runner/src/paths.rs          | 118
-rw-r--r-- | crates/rebel-runner/src/tar.rs            | 105
-rw-r--r-- | crates/rebel-runner/src/task.rs           | 638
-rw-r--r-- | crates/rebel-runner/src/util/checkable.rs |  37
-rw-r--r-- | crates/rebel-runner/src/util/cjson.rs     |  37
-rw-r--r-- | crates/rebel-runner/src/util/clone.rs     |  59
-rw-r--r-- | crates/rebel-runner/src/util/fs.rs        | 127
-rw-r--r-- | crates/rebel-runner/src/util/mod.rs       |   7
-rw-r--r-- | crates/rebel-runner/src/util/stack.rs     |  25
-rw-r--r-- | crates/rebel-runner/src/util/steal.rs     |  40
-rw-r--r-- | crates/rebel-runner/src/util/unix.rs      |  86
15 files changed, 1727 insertions, 0 deletions
diff --git a/crates/rebel-runner/src/init.rs b/crates/rebel-runner/src/init.rs new file mode 100644 index 0000000..0172a01 --- /dev/null +++ b/crates/rebel-runner/src/init.rs @@ -0,0 +1,68 @@ +use nix::mount::{self, MsFlags}; + +use rebel_common::error::*; + +use crate::{paths, util::fs}; + +fn prepare_dev(path: &str) -> Result<()> { + fs::mkdir(path)?; + mount::mount::<_, _, str, str>(Some(path), path, None, MsFlags::MS_BIND, None) + .context("Failed to bind mount container /dev")?; + + for dir in ["pts", "shm"] { + fs::mkdir(paths::join(&[path, dir]))?; + } + + for (link, target) in [ + ("fd", "/proc/self/fd"), + ("stdin", "/proc/self/fd/0"), + ("stdout", "/proc/self/fd/1"), + ("stderr", "/proc/self/fd/2"), + ("ptmx", "pts/ptmx"), + ] { + let path = paths::join(&[path, link]); + std::os::unix::fs::symlink(target, &path) + .with_context(|| format!("Failed to create link {}", path))?; + } + + for dev in ["null", "zero", "full", "random", "urandom", "tty"] { + let source = paths::join(&["/dev", dev]); + let target = paths::join(&[path, dev]); + fs::create(&target)?; + mount::mount::<str, str, str, str>(Some(&source), &target, None, MsFlags::MS_BIND, None) + .with_context(|| format!("Failed to bind mount {}", source))?; + } + + mount::mount::<str, _, str, str>( + None, + path, + None, + MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, + None, + ) + .context("Failed to mount container /dev read-only")?; + + Ok(()) +} + +pub fn init_runner() -> Result<()> { + fs::mkdir(paths::LAYER_STATE_DIR)?; + fs::mkdir(paths::OUTPUT_STATE_DIR)?; + + fs::ensure_removed(paths::TMP_DIR)?; + fs::mkdir(paths::TMP_DIR)?; + mount::mount::<_, _, str, str>( + Some(paths::TMP_DIR), + paths::TMP_DIR, + None, + MsFlags::MS_BIND, + None, + ) + .context("Failed to bind mount build tmpdir")?; + mount::mount::<str, _, str, str>(None, paths::TMP_DIR, None, MsFlags::MS_PRIVATE, None) + .context("Failed to set MS_PRIVATE for build tmpdir")?; + + prepare_dev(paths::DEV_DIR)?; + + Ok(()) +} diff --git a/crates/rebel-runner/src/jobserver.rs b/crates/rebel-runner/src/jobserver.rs new file mode 100644 index 0000000..7c3f2f7 --- /dev/null +++ b/crates/rebel-runner/src/jobserver.rs @@ -0,0 +1,79 @@ +use std::{ + os::fd::{AsFd, AsRawFd, OwnedFd}, + slice, +}; + +use nix::{errno::Errno, fcntl::OFlag, poll, unistd}; + +use rebel_common::error::*; + +use super::util::unix; + +#[derive(Debug)] +pub struct Jobserver { + tokens: usize, + r: OwnedFd, + w: OwnedFd, +} + +impl Jobserver { + pub fn new(tokens: usize) -> Result<Jobserver> { + let (r, w) = unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK).context("pipe()")?; + + for _ in 0..tokens { + if unistd::write(w.as_fd(), b"+").is_err() { + break; + } + } + unix::set_blocking(&w, true)?; + + Ok(Jobserver { tokens, r, w }) + } + + pub fn wait(&mut self) -> u8 { + loop { + poll::poll( + &mut [poll::PollFd::new(self.r.as_fd(), poll::PollFlags::POLLIN)], + poll::PollTimeout::NONE, + ) + .expect("poll()"); + + let mut token = 0; + match unistd::read(self.r.as_raw_fd(), slice::from_mut(&mut token)) { + Ok(n) => { + assert!(n == 1); + return token; + } + Err(Errno::EAGAIN) => { + // Token was sniped by another task + continue; + } + error @ Err(_) => { + error.expect("read()"); + } + } + } + } + + pub fn post(&mut self, token: u8) { + let n = unistd::write(self.w.as_fd(), slice::from_ref(&token)).expect("write()"); + assert!(n == 1); + } + + pub fn set_cloexec(&mut self, cloexec: bool) -> Result<()> { + unix::set_cloexec(&self.r, cloexec)?; + unix::set_cloexec(&self.w, cloexec)?; 
+ Ok(()) + } + + pub fn to_makeflags(&self) -> String { + format!( + " -j{} --jobserver-auth={},{}", + self.tokens, + self.r.as_raw_fd(), + self.w.as_raw_fd(), + ) + } +} + +// FIXME Log lost tokens on drop diff --git a/crates/rebel-runner/src/lib.rs b/crates/rebel-runner/src/lib.rs new file mode 100644 index 0000000..7dde05d --- /dev/null +++ b/crates/rebel-runner/src/lib.rs @@ -0,0 +1,217 @@ +mod init; +mod jobserver; +mod ns; +mod paths; +mod tar; +mod task; +mod util; + +use std::{ + collections::HashSet, + fs::File, + net, + os::unix::{net::UnixStream, prelude::*}, + process, slice, +}; + +use capctl::prctl; +use nix::{ + errno::Errno, + fcntl::Flock, + poll, + sched::CloneFlags, + sys::{ + signal, + signalfd::{SfdFlags, SignalFd}, + stat, wait, + }, + unistd::{self, Gid, Pid, Uid}, +}; +use uds::UnixSeqpacketConn; + +use rebel_common::{error::*, types::*}; + +use jobserver::Jobserver; +use util::{checkable::Checkable, clone, steal::Steal, unix}; + +#[derive(Debug, Clone)] +pub struct Options { + pub jobs: Option<usize>, +} + +#[derive(Debug)] +struct RunnerContext { + socket: Steal<UnixSeqpacketConn>, + jobserver: Jobserver, + tasks: HashSet<Pid>, +} + +fn handle_sigchld(ctx: &mut RunnerContext) -> Result<()> { + loop { + let status = match wait::waitpid(Pid::from_raw(-1), Some(wait::WaitPidFlag::WNOHANG)) { + Ok(wait::WaitStatus::StillAlive) | Err(Errno::ECHILD) => return Ok(()), + res => res.expect("waitpid()"), + }; + let pid = status.pid().unwrap(); + if ctx.tasks.remove(&pid) { + status.check()?; + } + } +} + +fn handle_request(ctx: &mut RunnerContext, request_socket: UnixStream) { + let run = || { + ctx.socket.steal(); + + let task: Task = + bincode::deserialize_from(&request_socket).expect("Failed to decode task description"); + + prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)"); + + let result = task::handle(task, &mut ctx.jobserver); + bincode::serialize_into(&request_socket, &result).expect("Failed to send task result"); + drop(request_socket); + }; + + let pid = unsafe { clone::spawn(None, run) }.expect("fork()"); + assert!(ctx.tasks.insert(pid)); +} + +fn handle_socket(ctx: &mut RunnerContext) -> bool { + let mut fd = 0; + + match ctx + .socket + .recv_fds(&mut [0], slice::from_mut(&mut fd)) + .expect("recv_fds()") + { + (1, _, n_fd) => { + assert!(n_fd == 1); + } + _ => return false, + } + + let request_socket = unsafe { UnixStream::from_raw_fd(fd) }; + handle_request(ctx, request_socket); + true +} + +fn borrow_socket_fd(socket: &UnixSeqpacketConn) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(socket.as_raw_fd()) } +} + +fn runner( + uid: Uid, + gid: Gid, + socket: UnixSeqpacketConn, + _lockfile: Flock<File>, + options: &Options, +) -> ! 
{ + unistd::setsid().expect("setsid()"); + ns::mount_proc(); + ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid); + + stat::umask(stat::Mode::from_bits_truncate(0o022)); + + init::init_runner().unwrap(); + + let jobs = options + .jobs + .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs")); + let jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe"); + let mut ctx = RunnerContext { + socket: socket.into(), + jobserver, + tasks: HashSet::new(), + }; + + let mut signals = signal::SigSet::empty(); + signals.add(signal::Signal::SIGCHLD); + signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None) + .expect("pthread_sigmask()"); + let mut signal_fd = SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC) + .expect("Failed to create signal file descriptor"); + + loop { + let socket_fd = borrow_socket_fd(&ctx.socket); + let mut pollfds = [ + poll::PollFd::new(signal_fd.as_fd(), poll::PollFlags::POLLIN), + poll::PollFd::new(socket_fd.as_fd(), poll::PollFlags::POLLIN), + ]; + poll::poll(&mut pollfds, poll::PollTimeout::NONE).expect("poll()"); + + let signal_events = pollfds[0] + .revents() + .expect("Unknown events in poll() return"); + let socket_events = pollfds[1] + .revents() + .expect("Unknown events in poll() return"); + + if signal_events.contains(poll::PollFlags::POLLIN) { + let _signal = signal_fd.read_signal().expect("read_signal()").unwrap(); + handle_sigchld(&mut ctx).expect("Task process exited abnormally"); + } else if signal_events.intersects(!poll::PollFlags::POLLIN) { + panic!("Unexpected error status for signal file descriptor"); + } + + if socket_events.contains(poll::PollFlags::POLLIN) { + if !handle_socket(&mut ctx) { + break; + } + } else if socket_events.intersects(!poll::PollFlags::POLLIN) { + panic!("Unexpected error status for socket file descriptor"); + } + } + + process::exit(0); +} + +pub struct Runner { + socket: UnixSeqpacketConn, +} + +impl Runner { + /// Creates a new container runner + /// + /// # Safety + /// + /// Do not call in multithreaded processes. + pub unsafe fn new(options: &Options) -> Result<Self> { + let lockfile = unix::lock(paths::LOCKFILE, true, false) + .context("Failed to get lock on build directory, is another instance running?")?; + + let uid = unistd::geteuid(); + let gid = unistd::getegid(); + + let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()"); + + match clone::clone( + CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID, + ) + .expect("clone()") + { + unistd::ForkResult::Parent { .. 
} => Ok(Runner { socket: local }), + unistd::ForkResult::Child => { + drop(local); + runner(uid, gid, remote, lockfile, options); + } + } + } + + pub fn spawn(&self, task: &Task) -> UnixStream { + let (local, remote) = UnixStream::pair().expect("socketpair()"); + + self.socket + .send_fds(&[0], &[remote.as_raw_fd()]) + .expect("send()"); + + bincode::serialize_into(&local, task).expect("Task submission failed"); + local.shutdown(net::Shutdown::Write).expect("shutdown()"); + + local + } + + pub fn result(socket: &UnixStream) -> Result<TaskOutput> { + bincode::deserialize_from(socket).expect("Failed to read task result") + } +} diff --git a/crates/rebel-runner/src/ns.rs b/crates/rebel-runner/src/ns.rs new file mode 100644 index 0000000..986aa80 --- /dev/null +++ b/crates/rebel-runner/src/ns.rs @@ -0,0 +1,84 @@ +use nix::{ + mount::{self, MntFlags, MsFlags}, + sched::CloneFlags, + unistd::{self, Gid, Pid, Uid}, +}; + +use rebel_common::error::*; + +use super::util::clone; + +pub fn mount_proc() { + mount::mount::<_, _, _, str>(Some("proc"), "/proc", Some("proc"), MsFlags::empty(), None) + .expect("Failed to mount /proc"); +} + +pub fn setup_userns(inner_uid: Uid, inner_gid: Gid, outer_uid: Uid, outer_gid: Gid) { + std::fs::write("/proc/self/setgroups", "deny").expect("Failed to write /proc/self/setgroups"); + std::fs::write( + "/proc/self/uid_map", + format!("{} {} 1", inner_uid, outer_uid), + ) + .expect("Failed to write /proc/self/uid_map"); + std::fs::write( + "/proc/self/gid_map", + format!("{} {} 1", inner_gid, outer_gid), + ) + .expect("Failed to write /proc/self/gid_map"); +} + +pub unsafe fn spawn<F>(flags: CloneFlags, f: F) -> nix::Result<Pid> +where + F: FnOnce(), +{ + assert!(flags.contains(CloneFlags::CLONE_NEWNS) || !flags.contains(CloneFlags::CLONE_NEWPID)); + + clone::spawn(Some(flags), || { + if flags.contains(CloneFlags::CLONE_NEWPID) { + mount_proc(); + } + f() + }) +} + +pub fn pivot_root(path: &str) { + (|| -> Result<()> { + unistd::chdir(path).context("chdir()")?; + mount::mount::<_, _, str, str>(Some("/proc"), "proc", None, MsFlags::MS_BIND, None) + .context("Failed to bind mount /proc")?; + unistd::pivot_root(".", ".").context("pivot_root()")?; + mount::umount2(".", MntFlags::MNT_DETACH).context("umount2()")?; + unistd::chdir("/").context("chdir(\"/\")")?; + Ok(()) + })() + .expect("Failed to pivot root"); +} + +pub fn container_mounts() -> Result<()> { + mount::mount( + Some("tmp"), + "/tmp", + Some("tmpfs"), + MsFlags::MS_NODEV | MsFlags::MS_NOSUID, + Some("mode=1777,size=1048576k"), + ) + .context("Failed to mount /tmp")?; + mount::mount( + Some("devpts"), + "/dev/pts", + Some("devpts"), + MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC, + Some("newinstance,ptmxmode=0666,mode=0620"), + ) + .context("Failed to mount /dev/pts")?; + mount::mount( + Some("shm"), + "/dev/shm", + Some("tmpfs"), + MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC | MsFlags::MS_NODEV, + Some("mode=1777,size=65536k"), + ) + .context("Failed to mount /dev/shm")?; + + Ok(()) +} diff --git a/crates/rebel-runner/src/paths.rs b/crates/rebel-runner/src/paths.rs new file mode 100644 index 0000000..84f9c4d --- /dev/null +++ b/crates/rebel-runner/src/paths.rs @@ -0,0 +1,118 @@ +//! Build directory structure used through rebel +//! +//! # Current structure +//! +//! ```text +//! build/ +//! ├── build.lock +//! ├── downloads/ +//! │ └── ... +//! ├── state/ +//! │ ├── output/ +//! │ │ ├── <input hash>.tar.tmp # during packing +//! │ │ ├── <archive hash>.tar # files are renamed when packing is finished +//! │ │ └── ... 
+//! │ ├── layer/ +//! │ │ ├── <layer hash>/ # overlayfs layer dir of finished tasks +//! │ │ └── ... +//! │ └── task/ +//! │ ├── <input hash>/ +//! │ │ ├── layer/ # overlayfs layer dir (moved to layer/ after build) +//! │ │ ├── work/ # overlayfs work dir (discarded after build) +//! │ │ ├── task.json.tmp # during write +//! │ │ ├── task.json # after write +//! │ │ ├── task.log # stdout/stderr output of the task +//! │ │ └── task.lock # task lockfile +//! │ └── ... +//! └── tmp/ # temporary files (cleaned on start) +//! ├── dev/ # container /dev +//! ├── depends/ # unpacked dependencies +//! └── task/ +//! └── <input hash>/ +//! ├── build/ # mount point for /build directory +//! │ ├── downloads/ # downloaded sources +//! │ ├── task/ # internal runner files +//! │ └── work/ # build overlay mountpoint +//! └── rootfs/ # rootfs overlay mountpoint +//! ``` + +use rebel_common::string_hash::*; + +pub const DOWNLOADS_DIR: &str = "build/downloads"; +pub const PIN_DIR: &str = "build/pinned"; + +pub const TMP_DIR: &str = "build/tmp"; +pub const DEV_DIR: &str = "build/tmp/dev"; +pub const DEPENDS_TMP_DIR: &str = "build/tmp/depends"; +pub const TASK_TMP_DIR: &str = "build/tmp/task"; + +pub const TASK_TMP_ROOTFS_SUBDIR: &str = "rootfs"; + +pub const LOCKFILE: &str = "build/build.lock"; +pub const OUTPUT_STATE_DIR: &str = "build/state/output"; +pub const TASK_STATE_DIR: &str = "build/state/task"; +pub const LAYER_STATE_DIR: &str = "build/state/layer"; + +pub const TASK_STATE_LAYER_SUBDIR: &str = "layer"; +pub const TASK_STATE_WORK_SUBDIR: &str = "work"; + +pub const TASK_BUILDDIR: &str = "build"; +pub const TASK_TASKDIR: &str = "build/task"; + +pub const TASK_RUN: &str = "run"; + +pub fn join(paths: &[&str]) -> String { + paths.join("/") +} + +pub fn task_tmp_dir(hash: &InputHash) -> String { + join(&[TASK_TMP_DIR, &hash.to_string()]) +} + +pub fn task_state_dir(hash: &InputHash) -> String { + join(&[TASK_STATE_DIR, &hash.to_string()]) +} + +pub fn task_cache_tmp_filename(hash: &InputHash) -> String { + join(&[TASK_STATE_DIR, &hash.to_string(), "task.json.tmp"]) +} + +pub fn task_cache_filename(hash: &InputHash) -> String { + join(&[TASK_STATE_DIR, &hash.to_string(), "task.json"]) +} + +pub fn task_log_filename(hash: &InputHash) -> String { + join(&[TASK_STATE_DIR, &hash.to_string(), "task.log"]) +} + +pub fn task_lock_filename(hash: &InputHash) -> String { + join(&[TASK_STATE_DIR, &hash.to_string(), "task.lock"]) +} + +pub fn layer_dir(hash: &LayerHash) -> String { + join(&[LAYER_STATE_DIR, &hash.to_string()]) +} + +pub fn depend_tmp_dir(hash: &ArchiveHash) -> String { + join(&[DEPENDS_TMP_DIR, &hash.to_string()]) +} + +pub fn depend_dir(hash: &ArchiveHash) -> String { + join(&[DEPENDS_TMP_DIR, &hash.to_string()]) +} + +pub fn depend_lock_filename(hash: &ArchiveHash) -> String { + join(&[DEPENDS_TMP_DIR, &format!("{}.lock", hash)]) +} + +pub fn archive_tmp_filename(hash: &InputHash) -> String { + join(&[OUTPUT_STATE_DIR, &format!("{}.tar.tmp", hash)]) +} + +pub fn archive_filename(hash: &ArchiveHash) -> String { + join(&[OUTPUT_STATE_DIR, &format!("{}.tar", hash)]) +} + +pub fn pinned_archive_filename(name: &str) -> String { + join(&[PIN_DIR, &format!("{}.tar", name)]) +} diff --git a/crates/rebel-runner/src/tar.rs b/crates/rebel-runner/src/tar.rs new file mode 100644 index 0000000..891c603 --- /dev/null +++ b/crates/rebel-runner/src/tar.rs @@ -0,0 +1,105 @@ +use std::{ + io::{self, Read, Write}, + os::unix::prelude::CommandExt, + path::Path, + process::{self, Command, Stdio}, +}; + +use nix::{ + 
mount::{self, MsFlags}, + sched::CloneFlags, + sys::wait, +}; + +use rebel_common::{error::*, string_hash::ArchiveHash}; + +use super::{ + ns, + util::{checkable::Checkable, fs}, +}; +use crate::paths; + +pub fn pack<W: Write, P: AsRef<Path>>( + rootfs_hash: &ArchiveHash, + archive: &mut W, + source: P, +) -> Result<()> { + let rootfs = paths::depend_dir(rootfs_hash); + let _rootfs_mount = fs::mount(&rootfs, &rootfs, None, MsFlags::MS_BIND, None) + .with_context(|| format!("Failed to bind mount rootfs to {:?}", rootfs))?; + mount::mount::<str, str, str, str>( + None, + &rootfs, + None, + MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, + None, + ) + .context("Failed to mount container rootfs read-only")?; + + let (mut piper, pipew) = fs::pipe()?; + + let exec_tar = || -> Result<()> { + // We are in our own mount namespace, so mounting into the shared rootfs is fine + let dev_target = paths::join(&[&rootfs, "dev"]); + mount::mount::<_, _, str, str>( + Some(paths::DEV_DIR), + dev_target.as_str(), + None, + MsFlags::MS_BIND | MsFlags::MS_REC, + None, + )?; + let mount_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]); + mount::mount::<_, _, str, str>( + Some(source.as_ref()), + mount_target.as_str(), + None, + MsFlags::MS_BIND, + None, + )?; + + ns::pivot_root(&rootfs); + + let err = Command::new("tar") + .args([ + "-c", + "--sort=name", + "--numeric-owner", + "--owner=0", + "--group=0", + "--mtime=@0", + ".", + ]) + .stdin(Stdio::null()) + .stdout(pipew) + .current_dir(paths::TASK_BUILDDIR) + .env_clear() + .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin") + .exec(); + eprintln!("{}", err); + process::exit(127); + }; + + let pid = unsafe { ns::spawn(CloneFlags::CLONE_NEWNS, || exec_tar().unwrap()) } + .context("Failed to run tar")?; + + let result = io::copy(&mut piper, archive).context("Failed to write TAR archive"); + + wait::waitpid(pid, None)? 
+ .check() + .context("tar did not exit successfully")?; + + result?; + Ok(()) +} + +pub fn unpack<R: Read, P: AsRef<Path>>(archive: R, dest: P) -> Result<()> { + fs::mkdir(&dest)?; + + let mut ar = tar::Archive::new(archive); + ar.set_preserve_permissions(true); + ar.set_preserve_mtime(true); + ar.set_unpack_xattrs(true); + ar.set_overwrite(false); + + ar.unpack(dest).context("Failed to unpack TAR archive") +} diff --git a/crates/rebel-runner/src/task.rs b/crates/rebel-runner/src/task.rs new file mode 100644 index 0000000..5bb253a --- /dev/null +++ b/crates/rebel-runner/src/task.rs @@ -0,0 +1,638 @@ +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + io::{self, BufWriter}, + os::unix::prelude::CommandExt, + path::{Path, PathBuf}, + process::{self, Command, Stdio}, + time::Instant, +}; + +use capctl::prctl; +use nix::{ + mount::{self, MsFlags}, + sched::{unshare, CloneFlags}, + sys::{resource, time::TimeVal, wait}, + unistd::{self, Gid, Uid}, +}; +use serde::Serialize; +use tee_readwrite::{TeeReader, TeeWriter}; + +use rebel_common::{error::*, string_hash::*, types::*}; +use walkdir::WalkDir; + +use super::{ + jobserver::Jobserver, + ns, tar, + util::{checkable::Checkable, cjson, fs}, +}; +use crate::{ + paths, + util::{stack::Stack, unix}, +}; + +const BUILD_UID: Uid = Uid::from_raw(1000); +const BUILD_GID: Gid = Gid::from_raw(1000); + +type InputHasher = blake3::Hasher; +type DependencyHasher = blake3::Hasher; +type LayerHasher = blake3::Hasher; +type ArchiveHasher = blake3::Hasher; + +type DependMap = BTreeMap<String, Vec<ArchiveHash>>; + +fn dependency_hash(dep: &Dependency) -> DependencyHash { + DependencyHash(StringHash( + cjson::digest::<DependencyHasher, _>(dep).unwrap().into(), + )) +} + +fn input_hash(task: &Task) -> InputHash { + #[derive(Debug, Serialize)] + struct HashInput<'a> { + pub command: &'a str, + pub workdir: &'a str, + pub rootfs: &'a ArchiveHash, + pub ancestors: &'a [LayerHash], + pub depends: HashMap<DependencyHash, &'a Dependency>, + pub outputs: &'a HashMap<String, String>, + } + let input = HashInput { + command: &task.command, + workdir: &task.workdir, + rootfs: &task.rootfs, + ancestors: &task.ancestors, + depends: task + .depends + .iter() + .map(|dep| (dependency_hash(dep), dep)) + .collect(), + outputs: &task.outputs, + }; + + InputHash(StringHash( + cjson::digest::<InputHasher, _>(&input).unwrap().into(), + )) +} + +fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> { + // Remove metadata first to ensure task invalidation + fs::ensure_removed(paths::task_cache_filename(input_hash))?; + + let task_state_dir = paths::task_state_dir(input_hash); + + let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); + fs::ensure_removed(&task_layer_dir)?; + fs::mkdir(&task_layer_dir)?; + fs::fixup_permissions(&task_layer_dir)?; + + let task_tmp_dir = paths::task_tmp_dir(input_hash); + + let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]); + fs::mkdir(&taskdir)?; + let runfile = paths::join(&[&taskdir, paths::TASK_RUN]); + std::fs::write(&runfile, &task.command) + .with_context(|| format!("Failed to write {}", runfile))?; + + let mount_target = paths::join(&[&task_tmp_dir, &task.workdir]); + let mount = if task.ancestors.is_empty() { + fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None) + .with_context(|| format!("Failed to bind mount to {:?}", mount_target))? 
+ } else { + let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]); + fs::ensure_removed(&task_work_dir)?; + fs::mkdir(&task_work_dir)?; + fs::fixup_permissions(&task_work_dir)?; + + let lower = task + .ancestors + .iter() + .rev() + .map(paths::layer_dir) + .collect::<Vec<_>>() + .join(":"); + let options = format!( + "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}", + lower = lower, + upper = task_layer_dir, + work = task_work_dir + ); + fs::mount( + "overlay", + &mount_target, + Some("overlay"), + MsFlags::empty(), + Some(&options), + ) + .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))? + }; + + Ok(mount) +} + +fn get_contents<P1: AsRef<Path>, P2: AsRef<Path>>( + path: P1, + prefix: P2, +) -> Result<(HashSet<PathBuf>, HashSet<PathBuf>)> { + let mut dirs = HashSet::new(); + let mut files = HashSet::new(); + + let root: PathBuf = Path::new("/").join(prefix.as_ref()); + + for result in WalkDir::new(path.as_ref()).min_depth(1).into_iter() { + let entry = result + .with_context(|| format!("Failed to list contents of directory {:?}", path.as_ref()))?; + let is_dir = entry.file_type().is_dir(); + let entry_path = root.join(entry.into_path().strip_prefix(path.as_ref()).unwrap()); + if is_dir { + dirs.insert(entry_path); + } else { + files.insert(entry_path); + } + } + + dirs.insert(root); + + Ok((dirs, files)) +} + +fn check_conflicts( + dirs1: &HashSet<PathBuf>, + files1: &HashSet<PathBuf>, + dirs2: &HashSet<PathBuf>, + files2: &HashSet<PathBuf>, +) -> Result<()> { + let mut conflicts = Vec::new(); + + conflicts.extend(files1.intersection(files2)); + conflicts.extend(dirs1.intersection(files2)); + conflicts.extend(files1.intersection(dirs2)); + + if !conflicts.is_empty() { + let mut conflict_strings: Box<[_]> = conflicts + .into_iter() + .map(|path| path.to_string_lossy().to_string()) + .collect(); + conflict_strings.sort(); + return Err(Error::new(format!( + "Found the following file conflicts in dependencies:\n{}", + conflict_strings.join("\n") + ))); + } + + Ok(()) +} + +fn init_task_rootfs(input_hash: &InputHash, task: &Task) -> Result<Stack<fs::Mount>> { + let task_tmp_dir = paths::task_tmp_dir(input_hash); + let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]); + let rootfs = paths::depend_dir(&task.rootfs); + + let depends = unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?; + + let mut mounts = Stack::new(); + + mounts.push( + fs::mount(rootfs, &mount_target, None, MsFlags::MS_BIND, None) + .with_context(|| format!("Failed to bind mount rootfs to {:?}", mount_target))?, + ); + mount::mount::<str, str, str, str>( + None, + &mount_target, + None, + MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, + None, + ) + .context("Failed to mount container rootfs read-only")?; + + let (mut dirs, mut files) = get_contents(&mount_target, "")?; + + for (path, dep_hashes) in depends { + assert!(!dep_hashes.is_empty()); + + if !path.is_empty() && !path.starts_with('/') { + return Err(Error::new(format!( + "Dependency path {:?} must be absolute", + path + ))); + } + + let dep_target = mount_target.clone() + &path; + let dep_paths: Box<[_]> = dep_hashes.iter().map(paths::depend_dir).collect(); + + for dep in dep_paths.iter() { + let (dep_dirs, dep_files) = get_contents(dep, &path)?; + check_conflicts(&dirs, &files, &dep_dirs, &dep_files)?; + dirs.extend(dep_dirs); + files.extend(dep_files); + } + + let options = format!( + 
"xino=off,index=off,metacopy=off,lowerdir={lower}:{base}", + lower = dep_paths.join(":"), + base = dep_target, + ); + + mounts.push( + fs::mount( + "overlay", + dep_target.as_str(), + Some("overlay"), + MsFlags::MS_RDONLY, + Some(&options), + ) + .with_context(|| format!("Failed to mount overlay to {:?}", dep_target))?, + ); + } + + Ok(mounts) +} + +fn cleanup_task(input_hash: &InputHash) -> Result<()> { + let task_tmp_dir = paths::task_tmp_dir(input_hash); + + fs::ensure_removed(task_tmp_dir).context("Failed to remove task tmp dir")?; + + let task_state_dir = paths::task_state_dir(input_hash); + let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]); + fs::ensure_removed(task_work_dir).context("Failed to remove overlayfs workdir")?; + + Ok(()) +} + +fn get_archive_filename(task: &Task, hash: &ArchiveHash) -> String { + if let Some(pinned_name) = task.pins.get(hash) { + paths::pinned_archive_filename(pinned_name) + } else { + paths::archive_filename(hash) + } +} + +fn unpack_dependency(task: &Task, hash: &ArchiveHash) -> Result<()> { + let _lock = unix::lock(paths::depend_lock_filename(hash), true, true); + + let filename = get_archive_filename(task, hash); + + let dest = paths::depend_dir(hash); + if Path::new(&dest).is_dir() { + return Ok(()); + } + + (|| -> Result<()> { + let tmp_dest = paths::depend_tmp_dir(hash); + fs::ensure_removed(&tmp_dest)?; + + let file = fs::open(&filename)?; + let hasher = ArchiveHasher::new(); + let buffered_hasher = BufWriter::with_capacity(16 * 1024 * 1024, hasher); + let mut reader = TeeReader::new(file, buffered_hasher, false); + + tar::unpack(&mut reader, &tmp_dest)?; + + // Read file to end to get the correct hash + io::copy(&mut reader, &mut io::sink())?; + + let (_, buffered_hasher) = reader.into_inner(); + let hasher = buffered_hasher.into_inner()?; + + let actual_hash = ArchiveHash(StringHash(hasher.finalize().into())); + + if &actual_hash != hash { + return Err(Error::new(format!( + "Incorrect file hash for {:?} (expected: {}, actual: {})", + filename, hash, actual_hash, + ))); + } + + fs::rename(&tmp_dest, &dest)?; + + Ok(()) + })() + .with_context(|| format!("Failed to unpack {:?}", filename)) +} + +fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<DependMap> { + let task_tmp_dir = paths::task_tmp_dir(input_hash); + + unpack_dependency(task, &task.rootfs)?; + + let mut ret = DependMap::new(); + + for dep in &task.depends { + match dep { + Dependency::Fetch { + name, target_dir, .. 
+ } => { + let path = paths::join(&[&task_tmp_dir, target_dir]); + fs::mkdir(&path)?; + fs::copy( + paths::join(&[paths::DOWNLOADS_DIR, name]), + paths::join(&[&path, name]), + )?; + } + Dependency::Task { output, path } => { + unpack_dependency(task, output)?; + ret.entry(path.clone()).or_default().push(*output); + } + } + } + + Ok(ret) +} + +fn collect_output(input_hash: &InputHash, task: &Task, path: &str) -> Result<Option<ArchiveHash>> { + let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]); + if !Path::new(&source).is_dir() { + return Ok(None); + } + + let filename = paths::archive_tmp_filename(input_hash); + + let hash = (|| -> Result<ArchiveHash> { + let file = fs::create(&filename)?; + let hasher = ArchiveHasher::new(); + let writer = TeeWriter::new(file, hasher); + let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer); + + super::tar::pack(&task.rootfs, &mut buffered_writer, &source)?; + + let writer = buffered_writer.into_inner()?; + let (file, hasher) = writer.into_inner(); + file.sync_all()?; + drop(file); + + Ok(ArchiveHash(StringHash(hasher.finalize().into()))) + })() + .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?; + + fs::rename(filename, paths::archive_filename(&hash))?; + + Ok(Some(hash)) +} + +fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> { + let mut ret = HashMap::new(); + + for (name, path) in &task.outputs { + if let Some(hash) = collect_output(input_hash, task, path)? { + ret.insert(name.clone(), hash); + } + } + + Ok(ret) +} + +fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> { + let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?; + let _rootfs_mounts = + init_task_rootfs(input_hash, task).context("Failed to initialize task rootfs")?; + + let task_tmp_dir = paths::task_tmp_dir(input_hash); + let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]); + + let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]); + let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]); + + let log_filename = paths::task_log_filename(input_hash); + + let mut exec_cmd = || -> Result<()> { + let log = fs::create(&log_filename)?; + + let log_stdout = log + .try_clone() + .context("Failed to duplicate log file descriptor")?; + let log_stderr = log + .try_clone() + .context("Failed to duplicate log file descriptor")?; + + mount::mount::<_, _, str, str>( + Some(paths::DEV_DIR), + paths::join(&[&rootfs, "dev"]).as_str(), + None, + MsFlags::MS_BIND | MsFlags::MS_REC, + None, + ) + .expect("Failed to bind mount /dev directory"); + mount::mount::<_, _, str, str>( + Some(builddir_source.as_str()), + builddir_target.as_str(), + None, + MsFlags::MS_BIND | MsFlags::MS_REC, + None, + ) + .expect("Failed to bind mount build directory"); + + ns::pivot_root(&rootfs); + mount::mount::<str, _, str, str>( + None, + "/", + None, + MsFlags::MS_PRIVATE | MsFlags::MS_REC, + None, + ) + .context("Failed to set MS_PRIVATE for container root")?; + ns::container_mounts().context("Failed to set up container mounts")?; + + unistd::sethostname("rebel-builder").context("Failed to set hostname")?; + + prctl::set_no_new_privs().context("set_no_new_privs()")?; + + unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS) + .context("Failed to create user namespace")?; + ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0)); + + jobserver + .set_cloexec(false) 
+ .context("Failed to unset O_CLOEXEC on jobserver pipe")?; + + let err = Command::new("sh") + .args(["-ex", &paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])]) + .stdin(Stdio::null()) + .stdout(log_stdout) + .stderr(log_stderr) + .env_clear() + .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin") + .env("HOME", "/build") + .env("MAKEFLAGS", jobserver.to_makeflags()) + .exec(); + eprintln!("{}", err); + process::exit(127); + }; + + let pid = unsafe { + ns::spawn( + CloneFlags::CLONE_NEWNS + | CloneFlags::CLONE_NEWPID + | CloneFlags::CLONE_NEWIPC + | CloneFlags::CLONE_NEWNET + | CloneFlags::CLONE_NEWUTS, + || exec_cmd().unwrap(), + ) + } + .context("Failed to run task container")?; + + let status = wait::waitpid(pid, None)?; + + if let Err(err) = status.check() { + return Err(Error::new(format!( + "Task failed: {}\nOutput: {}", + err, log_filename + ))); + } + + Ok(()) +} + +fn hash_layer(input_hash: &InputHash, task: &Task) -> Result<Option<LayerHash>> { + let task_state_dir = paths::task_state_dir(input_hash); + let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); + + (|| -> Result<Option<LayerHash>> { + if fs::is_dir_empty(&task_layer_dir)? { + return Ok(None); + } + + let hasher = LayerHasher::new(); + let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher); + + tar::pack(&task.rootfs, &mut buffered_writer, &task_layer_dir)?; + + let hasher = buffered_writer.into_inner()?; + Ok(Some(LayerHash(StringHash(hasher.finalize().into())))) + })() + .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir)) +} + +fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> { + let task_state_dir = paths::task_state_dir(input_hash); + let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); + + if let Some(hash) = hash { + let layer_dir = paths::layer_dir(hash); + + let err = match std::fs::rename(&task_layer_dir, &layer_dir) { + Ok(_) => return Ok(()), + Err(err) => err, + }; + + if !matches!( + err.raw_os_error(), + Some(libc::EEXIST) | Some(libc::ENOTEMPTY) + ) { + return Err(err).with_context(|| { + format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir) + }); + } + } + + fs::ensure_removed(&task_layer_dir) +} + +fn run_and_hash_task( + input_hash: &InputHash, + task: &Task, + jobserver: &mut Jobserver, +) -> Result<TaskOutput> { + run_task(input_hash, task, jobserver)?; + + let outputs = collect_outputs(input_hash, task)?; + + let layer = hash_layer(input_hash, task)?; + move_layer(input_hash, &layer)?; + + Ok(TaskOutput { + input_hash: Some(*input_hash), + layer, + outputs, + }) +} + +fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> { + let filename = paths::task_cache_filename(input_hash); + let file = fs::open(&filename)?; + + serde_json::from_reader(file) + .with_context(|| format!("Failed to read task cache data from {}", filename)) +} + +fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> { + fs::mkdir(paths::task_state_dir(input_hash))?; + + let tmp_filename = paths::task_cache_tmp_filename(input_hash); + let filename = paths::task_cache_filename(input_hash); + + cjson::to_file(&tmp_filename, output) + .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?; + + fs::rename(tmp_filename, filename)?; + + Ok(()) +} + +trait AsSecsF32 { + fn as_secs_f32(&self) -> f32; +} + +impl AsSecsF32 for TimeVal { + fn as_secs_f32(&self) -> f32 { + self.tv_sec() as f32 + 1e-6 * (self.tv_usec() as f32) + } +} + 
+fn get_usage(total: f32) -> String { + let usage = resource::getrusage(resource::UsageWho::RUSAGE_CHILDREN).expect("getrusage()"); + + let user = usage.user_time().as_secs_f32(); + let system = usage.system_time().as_secs_f32(); + let cpu = (100.0 * (user + system) / total).round(); + + format!("{user:.2}s user {system:.2}s system {cpu:.0}% cpu") +} + +pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result<TaskOutput> { + let input_hash = input_hash(&task); + + let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true) + .context("Failed to get task lock")?; + + let cached_output = load_cached(&input_hash); + if !task.force_run { + if let Ok(task_output) = cached_output { + return Ok(task_output); + } + } + + let token = jobserver.wait(); + + let start_time = Instant::now(); + println!("Starting task {} ({})", task.label, input_hash); + + let task_ret = run_and_hash_task(&input_hash, &task, jobserver); + let cleanup_ret = cleanup_task(&input_hash); + + jobserver.post(token); + + let task_output = task_ret?; + cleanup_ret.context("Failed to clean up after task")?; + + save_cached(&input_hash, &task_output)?; + + let duration = Instant::now().duration_since(start_time).as_secs_f32(); + let usage = get_usage(duration); + println!( + "Finished task {} ({}) in {:.2}s ({})", + task.label, input_hash, duration, usage, + ); + + if let Ok(cached_output) = cached_output { + if cached_output.outputs != task_output.outputs { + println!( + "WARNING: Output hashes for task {} do not match cached result\n Cached output: {}\n New output: {}", + task.label, + cjson::to_string(&cached_output.outputs).unwrap(), + cjson::to_string(&task_output.outputs).unwrap(), + ); + } + } + + Ok(task_output) +} diff --git a/crates/rebel-runner/src/util/checkable.rs b/crates/rebel-runner/src/util/checkable.rs new file mode 100644 index 0000000..8528d29 --- /dev/null +++ b/crates/rebel-runner/src/util/checkable.rs @@ -0,0 +1,37 @@ +use std::{ + io::{Error, ErrorKind, Result}, + process::ExitStatus, +}; + +use nix::sys::wait; + +pub trait Checkable { + fn check(&self) -> Result<()>; +} + +impl Checkable for ExitStatus { + fn check(&self) -> Result<()> { + if self.success() { + Ok(()) + } else { + Err(Error::new( + ErrorKind::Other, + format!("Process exited with {}", self), + )) + } + } +} + +impl Checkable for wait::WaitStatus { + fn check(&self) -> Result<()> { + let message = match self { + wait::WaitStatus::Exited(_, 0) => return Ok(()), + wait::WaitStatus::Exited(_, code) => format!("Process exited with exit code: {}", code), + wait::WaitStatus::Signaled(_, signal, _) => { + format!("Process exited with signal: {}", signal) + } + _ => format!("Process in unexpected status: {:?}", self), + }; + Err(Error::new(ErrorKind::Other, message)) + } +} diff --git a/crates/rebel-runner/src/util/cjson.rs b/crates/rebel-runner/src/util/cjson.rs new file mode 100644 index 0000000..e3840ce --- /dev/null +++ b/crates/rebel-runner/src/util/cjson.rs @@ -0,0 +1,37 @@ +use std::{ + fs::File, + io::{self, Write}, + path::Path, +}; + +use digest::{self, Digest}; +use olpc_cjson::CanonicalFormatter; +use serde::Serialize; +use serde_json::error::Result; + +pub fn new_serializer<W: Write>(writer: W) -> serde_json::Serializer<W, CanonicalFormatter> { + serde_json::Serializer::with_formatter(writer, CanonicalFormatter::new()) +} + +pub fn to_writer<W: Write, T: ?Sized + Serialize>(writer: W, value: &T) -> Result<()> { + let mut ser = new_serializer(writer); + value.serialize(&mut ser) +} + +pub fn to_file<P: AsRef<Path>, 
T: ?Sized + Serialize>(path: P, value: &T) -> io::Result<()> { + let file = File::create(path)?; + to_writer(&file, value)?; + file.sync_all() +} + +pub fn to_string<T: ?Sized + Serialize>(value: &T) -> Result<String> { + let mut ret = Vec::new(); + to_writer(&mut ret, value)?; + Ok(String::from_utf8(ret).unwrap()) +} + +pub fn digest<D: Digest + Write, T: ?Sized + Serialize>(value: &T) -> Result<digest::Output<D>> { + let mut digest = <D as Digest>::new(); + to_writer(&mut digest, value)?; + Ok(digest.finalize()) +} diff --git a/crates/rebel-runner/src/util/clone.rs b/crates/rebel-runner/src/util/clone.rs new file mode 100644 index 0000000..51a31c3 --- /dev/null +++ b/crates/rebel-runner/src/util/clone.rs @@ -0,0 +1,59 @@ +use std::{mem, process}; + +use nix::{ + errno, sched, + unistd::{self, Pid}, +}; + +#[repr(C)] +#[derive(Debug, Default)] +struct CloneArgs { + flags: u64, + pidfd: u64, + child_tid: u64, + parent_tid: u64, + exit_signal: u64, + stack: u64, + stack_size: u64, + tls: u64, +} + +pub unsafe fn clone(flags: sched::CloneFlags) -> nix::Result<unistd::ForkResult> { + let mut args = CloneArgs { + flags: flags.bits() as u64, + exit_signal: libc::SIGCHLD as u64, + ..CloneArgs::default() + }; + let size = mem::size_of_val(&args) as libc::size_t; + + let pid = libc::syscall(libc::SYS_clone3, &mut args, size); + + #[allow(clippy::comparison_chain)] + if pid < 0 { + Err(errno::Errno::last()) + } else if pid == 0 { + Ok(unistd::ForkResult::Child) + } else { + Ok(unistd::ForkResult::Parent { + child: Pid::from_raw(pid as libc::pid_t), + }) + } +} + +pub unsafe fn spawn<F>(flags: Option<sched::CloneFlags>, f: F) -> nix::Result<Pid> +where + F: FnOnce(), +{ + let res = if let Some(flags) = flags { + clone(flags) + } else { + unistd::fork() + }; + match res? 
{ + unistd::ForkResult::Parent { child } => Ok(child), + unistd::ForkResult::Child => { + f(); + process::exit(0) + } + } +} diff --git a/crates/rebel-runner/src/util/fs.rs b/crates/rebel-runner/src/util/fs.rs new file mode 100644 index 0000000..9e33eb7 --- /dev/null +++ b/crates/rebel-runner/src/util/fs.rs @@ -0,0 +1,127 @@ +use std::{ + fs::{self, File}, + io, + os::unix::prelude::*, + path::{Path, PathBuf}, +}; + +use nix::{ + fcntl::OFlag, + mount::{self, MsFlags}, + unistd, +}; + +use rebel_common::error::*; + +pub fn open<P: AsRef<Path>>(path: P) -> Result<fs::File> { + fs::File::open(path.as_ref()) + .with_context(|| format!("Failed to open file {:?} for reading", path.as_ref())) +} + +pub fn create<P: AsRef<Path>>(path: P) -> Result<fs::File> { + fs::File::create(path.as_ref()) + .with_context(|| format!("Failed to open file {:?} for writing", path.as_ref())) +} + +pub fn rename<P1: AsRef<Path>, P2: AsRef<Path>>(from: P1, to: P2) -> Result<()> { + fs::rename(from.as_ref(), to.as_ref()) + .with_context(|| format!("Failed to rename {:?} to {:?}", from.as_ref(), to.as_ref())) +} + +// Unlike fs::copy, this doesn't preserve file flags +pub fn copy<P1: AsRef<Path>, P2: AsRef<Path>>(from: P1, to: P2) -> Result<()> { + (|| -> Result<()> { + let mut src = open(from.as_ref())?; + let mut dest = create(to.as_ref())?; + io::copy(&mut src, &mut dest)?; + dest.sync_all()?; + Ok(()) + })() + .with_context(|| format!("Failed to copy {:?} to {:?}", from.as_ref(), to.as_ref())) +} + +pub fn mkdir<P: AsRef<Path>>(path: P) -> Result<()> { + let mut builder = fs::DirBuilder::new(); + builder.recursive(true); + builder + .create(path.as_ref()) + .with_context(|| format!("Failed to create directory {:?}", path.as_ref())) +} + +pub fn ensure_removed<P: AsRef<Path>>(path: P) -> Result<()> { + let result = if path.as_ref().is_dir() { + fs::remove_dir_all(path.as_ref()) + } else { + fs::remove_file(path.as_ref()) + }; + result + .or_else(|err| match err.kind() { + io::ErrorKind::NotFound => Ok(()), + _ => Err(err), + }) + .with_context(|| format!("Failed to delete {:?}", path.as_ref())) +} + +pub fn is_dir_empty<P: AsRef<Path>>(path: P) -> Result<bool> { + Ok(fs::read_dir(path)?.next().is_none()) +} + +/// Fixes up weirdness of set-group-ID or bsdgroups +pub fn fixup_permissions<P: AsRef<Path>>(path: P) -> Result<()> { + let path = path.as_ref(); + let gid = unistd::getegid(); + + let metadata = path + .metadata() + .with_context(|| format!("Failed to get metadata of {:?}", path))?; + + if metadata.gid() != gid.as_raw() { + unistd::chown(path, None, Some(gid)) + .with_context(|| format!("Failed to set group of {:?}", path))?; + } + + let mut perms = metadata.permissions(); + let mode = perms.mode(); + if (mode & 0o777) != mode { + perms.set_mode(mode & 0o777); + std::fs::set_permissions(path, perms) + .with_context(|| format!("Failed to set mode of {:?}", path))?; + } + + Ok(()) +} + +#[must_use] +pub struct Mount(PathBuf); + +impl Drop for Mount { + fn drop(&mut self) { + mount::umount(&self.0) + .with_context(|| format!("Failed to unmount {:?}", self.0)) + .unwrap(); + } +} + +pub fn mount<P1: AsRef<Path>, P2: AsRef<Path>>( + source: P1, + target: P2, + fstype: Option<&str>, + flags: MsFlags, + data: Option<&str>, +) -> Result<Mount> { + mkdir(target.as_ref()).with_context(|| format!("Failed to create {:?}", target.as_ref()))?; + + let canon_target = target + .as_ref() + .canonicalize() + .with_context(|| format!("Failed to get absolute path for {:?}", target.as_ref()))?; + 
    mount::mount(Some(source.as_ref()), &canon_target, fstype, flags, data)
+        .with_context(|| format!("Failed to mount {:?}", canon_target))?;
+    Ok(Mount(canon_target))
+}
+
+pub fn pipe() -> Result<(File, File)> {
+    unistd::pipe2(OFlag::O_CLOEXEC)
+        .context("pipe2()")
+        .map(|(piper, pipew)| (File::from(piper), File::from(pipew)))
+}
diff --git a/crates/rebel-runner/src/util/mod.rs b/crates/rebel-runner/src/util/mod.rs
new file mode 100644
index 0000000..0fbe3b5
--- /dev/null
+++ b/crates/rebel-runner/src/util/mod.rs
@@ -0,0 +1,7 @@
+pub mod checkable;
+pub mod cjson;
+pub mod clone;
+pub mod fs;
+pub mod stack;
+pub mod steal;
+pub mod unix;
diff --git a/crates/rebel-runner/src/util/stack.rs b/crates/rebel-runner/src/util/stack.rs
new file mode 100644
index 0000000..15d5daf
--- /dev/null
+++ b/crates/rebel-runner/src/util/stack.rs
@@ -0,0 +1,25 @@
+use std::mem;
+
+/// Simple, inefficient data structure with guaranteed drop order
+#[derive(Debug)]
+pub enum Stack<T> {
+    Cons(Box<(T, Stack<T>)>),
+    Empty,
+}
+
+impl<T> Default for Stack<T> {
+    fn default() -> Self {
+        Self::Empty
+    }
+}
+
+impl<T> Stack<T> {
+    pub fn new() -> Self {
+        Self::Empty
+    }
+
+    pub fn push(&mut self, value: T) {
+        let tmp = mem::take(self);
+        *self = Stack::Cons(Box::new((value, tmp)));
+    }
+}
diff --git a/crates/rebel-runner/src/util/steal.rs b/crates/rebel-runner/src/util/steal.rs
new file mode 100644
index 0000000..91b2cdf
--- /dev/null
+++ b/crates/rebel-runner/src/util/steal.rs
@@ -0,0 +1,40 @@
+use std::ops::{Deref, DerefMut};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Steal<T>(pub Option<T>);
+
+impl<T> Steal<T> {
+    pub fn new(value: T) -> Steal<T> {
+        Steal(Some(value))
+    }
+
+    pub fn steal(&mut self) -> T {
+        self.0
+            .take()
+            .expect("Attempted to steal already stolen value")
+    }
+}
+
+impl<T> From<T> for Steal<T> {
+    fn from(value: T) -> Self {
+        Steal::new(value)
+    }
+}
+
+impl<T> Deref for Steal<T> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        self.0
+            .as_ref()
+            .expect("Attempted to dereference stolen value")
+    }
+}
+
+impl<T> DerefMut for Steal<T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.0
+            .as_mut()
+            .expect("Attempted to dereference stolen value")
+    }
+}
diff --git a/crates/rebel-runner/src/util/unix.rs b/crates/rebel-runner/src/util/unix.rs
new file mode 100644
index 0000000..a97b1db
--- /dev/null
+++ b/crates/rebel-runner/src/util/unix.rs
@@ -0,0 +1,86 @@
+use std::{fs::File, os::unix::prelude::*, path::Path};
+
+use nix::{
+    fcntl::{self, FcntlArg, FdFlag, Flock, OFlag},
+    sched,
+    unistd::Pid,
+};
+
+use rebel_common::error::*;
+
+use super::fs;
+
+pub fn set_blocking<Fd: AsFd>(fd: &Fd, blocking: bool) -> Result<()> {
+    let raw_fd = fd.as_fd().as_raw_fd();
+
+    let flags =
+        OFlag::from_bits_retain(fcntl::fcntl(raw_fd, FcntlArg::F_GETFL).context("fcntl(F_GETFL)")?);
+
+    let new_flags = if blocking {
+        flags & !OFlag::O_NONBLOCK
+    } else {
+        flags | OFlag::O_NONBLOCK
+    };
+
+    if new_flags != flags {
+        fcntl::fcntl(raw_fd, FcntlArg::F_SETFL(new_flags)).context("fcntl(F_SETFL)")?;
+    }
+
+    Ok(())
+}
+
+pub fn set_cloexec<Fd: AsFd>(fd: &Fd, cloexec: bool) -> Result<()> {
+    let raw_fd = fd.as_fd().as_raw_fd();
+
+    let flags = FdFlag::from_bits_retain(
+        fcntl::fcntl(raw_fd, FcntlArg::F_GETFD).context("fcntl(F_GETFD)")?,
+    );
+
+    let new_flags = if cloexec {
+        flags | FdFlag::FD_CLOEXEC
+    } else {
+        flags & !FdFlag::FD_CLOEXEC
+    };
+
+    if new_flags != flags {
+        fcntl::fcntl(raw_fd, FcntlArg::F_SETFD(new_flags)).context("fcntl(F_SETFD)")?;
+    }
+
+    Ok(())
+}
+
+pub fn nproc() -> Result<usize> {
+    const MAXCPU: usize = sched::CpuSet::count();
+
+    let affinity = sched::sched_getaffinity(Pid::from_raw(0)).context("sched_getaffinity()")?;
+
+    let mut count = 0;
+
+    for cpu in 0..MAXCPU {
+        if affinity.is_set(cpu).unwrap() {
+            count += 1;
+        }
+    }
+
+    Ok(count)
+}
+
+pub fn lock<P: AsRef<Path>>(path: P, exclusive: bool, blocking: bool) -> Result<Flock<File>> {
+    use fcntl::FlockArg::*;
+
+    if let Some(parent) = path.as_ref().parent() {
+        fs::mkdir(parent)?;
+    }
+
+    let arg = match (exclusive, blocking) {
+        (true, true) => LockExclusive,
+        (true, false) => LockExclusiveNonblock,
+        (false, true) => LockShared,
+        (false, false) => LockSharedNonblock,
+    };
+
+    let file = fs::create(path.as_ref())?;
+    fcntl::Flock::lock(file, arg)
+        .map_err(|(_, errno)| errno)
+        .with_context(|| format!("flock failed on {:?}", path.as_ref()))
+}
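
For context, a minimal sketch of how a client of the new crate might drive the Runner API from lib.rs; the build_one() wrapper is hypothetical, while Options, Runner::new(), spawn() and result() are the interface added above:

use rebel_common::{error::*, types::*};
use rebel_runner::{Options, Runner};

fn build_one(task: &Task) -> Result<TaskOutput> {
    // jobs: None makes the runner default to the number of available CPUs.
    let options = Options { jobs: None };

    // Safety: Runner::new() forks the runner process via clone(); per its
    // doc comment it must not be called from a multithreaded process.
    let runner = unsafe { Runner::new(&options) }?;

    // spawn() sends the serialized task plus one end of a socketpair to the
    // runner process; result() blocks until the task result arrives on it.
    let socket = runner.spawn(task);
    Runner::result(&socket)
}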
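
The Stack type in util/stack.rs exists purely for its guaranteed drop order: init_task_rootfs() pushes the rootfs bind mount first and the dependency overlays on top, and the mounts must be unwound in reverse. A self-contained sketch (the Loud type is hypothetical) demonstrating the LIFO drop order:

use std::mem;

enum Stack<T> {
    Cons(Box<(T, Stack<T>)>),
    Empty,
}

impl<T> Default for Stack<T> {
    fn default() -> Self {
        Self::Empty
    }
}

impl<T> Stack<T> {
    fn push(&mut self, value: T) {
        let tail = mem::take(self);
        *self = Stack::Cons(Box::new((value, tail)));
    }
}

struct Loud(&'static str);

impl Drop for Loud {
    fn drop(&mut self) {
        println!("unmount {}", self.0);
    }
}

fn main() {
    let mut mounts = Stack::default();
    mounts.push(Loud("rootfs"));
    mounts.push(Loud("dependency overlay"));
    // Tuple fields drop in declaration order, so each head is dropped before
    // the tail it points to: this prints "unmount dependency overlay" first,
    // then "unmount rootfs", the reverse of the push order.
}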
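
The canonical JSON helpers in util/cjson.rs are what make input_hash() deterministic: olpc-cjson's CanonicalFormatter sorts object keys during serialization, so hashes do not depend on HashMap iteration order. A sketch of a unit test that could live in util/cjson.rs to pin down this property (the test itself is not part of the commit):

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    #[test]
    fn key_order_does_not_change_canonical_form() {
        let a = HashMap::from([("x", 1), ("y", 2)]);
        let b = HashMap::from([("y", 2), ("x", 1)]);
        // Both maps serialize to the same canonical bytes, so any digest
        // computed over them (as in digest::<D, T>()) is identical too.
        assert_eq!(to_string(&a).unwrap(), to_string(&b).unwrap());
    }
}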