diff options
Diffstat (limited to 'crates/runner')
-rw-r--r-- | crates/runner/Cargo.toml | 25 | ||||
-rw-r--r-- | crates/runner/src/init.rs | 68 | ||||
-rw-r--r-- | crates/runner/src/jobserver.rs | 77 | ||||
-rw-r--r-- | crates/runner/src/lib.rs | 204 | ||||
-rw-r--r-- | crates/runner/src/ns.rs | 84 | ||||
-rw-r--r-- | crates/runner/src/paths.rs | 118 | ||||
-rw-r--r-- | crates/runner/src/tar.rs | 105 | ||||
-rw-r--r-- | crates/runner/src/task.rs | 620 | ||||
-rw-r--r-- | crates/runner/src/util/checkable.rs | 37 | ||||
-rw-r--r-- | crates/runner/src/util/cjson.rs | 37 | ||||
-rw-r--r-- | crates/runner/src/util/clone.rs | 57 | ||||
-rw-r--r-- | crates/runner/src/util/fs.rs | 127 | ||||
-rw-r--r-- | crates/runner/src/util/mod.rs | 7 | ||||
-rw-r--r-- | crates/runner/src/util/stack.rs | 25 | ||||
-rw-r--r-- | crates/runner/src/util/steal.rs | 40 | ||||
-rw-r--r-- | crates/runner/src/util/unix.rs | 84 |
16 files changed, 0 insertions, 1715 deletions
diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml deleted file mode 100644 index 19ad124..0000000 --- a/crates/runner/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "rebel-runner" -version = "0.1.0" -authors = ["Matthias Schiffer <mschiffer@universe-factory.net>"] -license = "MIT" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -common = { path = "../common", package = "rebel-common" } - -bincode = "1.3.3" -blake3 = { version = "1.0.0", features = ["traits-preview"] } -capctl = "0.2.0" -digest = "0.9.0" -libc = "0.2.84" -nix = "0.23.0" -olpc-cjson = "0.1.0" -serde = { version = "1", features = ["derive"] } -serde_json = "1.0.62" -tar = "0.4.32" -tee_readwrite = "0.1.0" -uds = "0.2.6" -walkdir = "2.3.2" diff --git a/crates/runner/src/init.rs b/crates/runner/src/init.rs deleted file mode 100644 index ede8fd8..0000000 --- a/crates/runner/src/init.rs +++ /dev/null @@ -1,68 +0,0 @@ -use nix::mount::{self, MsFlags}; - -use common::error::*; - -use crate::{paths, util::fs}; - -fn prepare_dev(path: &str) -> Result<()> { - fs::mkdir(path)?; - mount::mount::<_, _, str, str>(Some(path), path, None, MsFlags::MS_BIND, None) - .context("Failed to bind mount container /dev")?; - - for dir in ["pts", "shm"] { - fs::mkdir(paths::join(&[path, dir]))?; - } - - for (link, target) in [ - ("fd", "/proc/self/fd"), - ("stdin", "/proc/self/fd/0"), - ("stdout", "/proc/self/fd/1"), - ("stderr", "/proc/self/fd/2"), - ("ptmx", "pts/ptmx"), - ] { - let path = paths::join(&[path, link]); - std::os::unix::fs::symlink(target, &path) - .with_context(|| format!("Failed to create link {}", path))?; - } - - for dev in ["null", "zero", "full", "random", "urandom", "tty"] { - let source = paths::join(&["/dev", dev]); - let target = paths::join(&[path, dev]); - fs::create(&target)?; - mount::mount::<str, str, str, str>(Some(&source), &target, None, MsFlags::MS_BIND, None) - .with_context(|| format!("Failed to bind mount {}", source))?; - } - - mount::mount::<str, _, str, str>( - None, - path, - None, - MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, - None, - ) - .context("Failed to mount container /dev read-only")?; - - Ok(()) -} - -pub fn init_runner() -> Result<()> { - fs::mkdir(paths::LAYER_STATE_DIR)?; - fs::mkdir(paths::OUTPUT_STATE_DIR)?; - - fs::ensure_removed(paths::TMP_DIR)?; - fs::mkdir(paths::TMP_DIR)?; - mount::mount::<_, _, str, str>( - Some(paths::TMP_DIR), - paths::TMP_DIR, - None, - MsFlags::MS_BIND, - None, - ) - .context("Failed to bind mount build tmpdir")?; - mount::mount::<str, _, str, str>(None, paths::TMP_DIR, None, MsFlags::MS_PRIVATE, None) - .context("Failed to set MS_PRIVATE for build tmpdir")?; - - prepare_dev(paths::DEV_DIR)?; - - Ok(()) -} diff --git a/crates/runner/src/jobserver.rs b/crates/runner/src/jobserver.rs deleted file mode 100644 index 3b6c856..0000000 --- a/crates/runner/src/jobserver.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::{os::unix::prelude::RawFd, slice}; - -use nix::{errno::Errno, fcntl::OFlag, poll, unistd}; - -use common::error::*; - -use super::util::unix; - -#[derive(Debug)] -pub struct Jobserver { - tokens: usize, - r: RawFd, - w: RawFd, -} - -impl Jobserver { - pub fn new(tokens: usize) -> Result<Jobserver> { - let (r, w) = unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK).context("pipe()")?; - - for _ in 0..tokens { - if let Err(_) = unistd::write(w, b"+") { - break; - } - } - unix::set_blocking(w, true)?; - - Ok(Jobserver { tokens, r, w }) - } - - pub fn wait(&mut self) -> u8 { - loop { - poll::poll( - &mut [poll::PollFd::new(self.r, poll::PollFlags::POLLIN)], - -1, - ) - .expect("poll()"); - - let mut token = 0; - match unistd::read(self.r, slice::from_mut(&mut token)) { - Ok(n) => { - assert!(n == 1); - return token; - } - Err(Errno::EAGAIN) => { - // Token was sniped by another task - continue; - } - error @ Err(_) => { - error.expect("read()"); - } - } - } - } - - pub fn post(&mut self, token: u8) { - let n = unistd::write(self.w, slice::from_ref(&token)).expect("write()"); - assert!(n == 1); - } - - pub fn set_cloexec(&mut self, cloexec: bool) -> Result<()> { - unix::set_cloexec(self.r, cloexec)?; - unix::set_cloexec(self.w, cloexec)?; - Ok(()) - } - - pub fn to_makeflags(&self) -> String { - format!(" -j{} --jobserver-auth={},{}", self.tokens, self.r, self.w) - } -} - -impl Drop for Jobserver { - fn drop(&mut self) { - // FIXME Logging - let _ = unistd::close(self.r); - let _ = unistd::close(self.w); - } -} diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs deleted file mode 100644 index cfaf658..0000000 --- a/crates/runner/src/lib.rs +++ /dev/null @@ -1,204 +0,0 @@ -mod init; -mod jobserver; -mod ns; -mod paths; -mod tar; -mod task; -mod util; - -use std::{ - collections::HashSet, - fs::File, - net, - os::unix::{net::UnixStream, prelude::*}, - process, slice, -}; - -use capctl::prctl; -use nix::{ - errno::Errno, - poll, - sched::CloneFlags, - sys::{ - signal, - signalfd::{SfdFlags, SignalFd}, - stat, wait, - }, - unistd::{self, Gid, Pid, Uid}, -}; -use uds::UnixSeqpacketConn; - -use common::{error::*, types::*}; - -use jobserver::Jobserver; -use util::{checkable::Checkable, clone, steal::Steal, unix}; - -#[derive(Debug, Clone)] -pub struct Options { - pub jobs: Option<usize>, -} - -#[derive(Debug)] -struct RunnerContext { - socket: Steal<UnixSeqpacketConn>, - jobserver: Jobserver, - tasks: HashSet<Pid>, -} - -fn handle_sigchld(ctx: &mut RunnerContext) -> Result<()> { - loop { - let status = match wait::waitpid(Pid::from_raw(-1), Some(wait::WaitPidFlag::WNOHANG)) { - Ok(wait::WaitStatus::StillAlive) | Err(Errno::ECHILD) => return Ok(()), - res => res.expect("waitpid()"), - }; - let pid = status.pid().unwrap(); - if ctx.tasks.remove(&pid) { - status.check()?; - } - } -} - -fn handle_request(ctx: &mut RunnerContext, request_socket: UnixStream) { - let run = || { - ctx.socket.steal(); - - let task: Task = - bincode::deserialize_from(&request_socket).expect("Failed to decode task description"); - - prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)"); - - let result = task::handle(task, &mut ctx.jobserver); - bincode::serialize_into(&request_socket, &result).expect("Failed to send task result"); - drop(request_socket); - }; - - let pid = unsafe { clone::spawn(None, run) }.expect("fork()"); - assert!(ctx.tasks.insert(pid)); -} - -fn handle_socket(ctx: &mut RunnerContext) -> bool { - let mut fd = 0; - - match ctx - .socket - .recv_fds(&mut [0], slice::from_mut(&mut fd)) - .expect("recv_fds()") - { - (1, _, n_fd) => { - assert!(n_fd == 1); - } - _ => return false, - } - - let request_socket = unsafe { UnixStream::from_raw_fd(fd) }; - handle_request(ctx, request_socket); - true -} - -fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, options: &Options) -> ! { - ns::mount_proc(); - ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid); - - stat::umask(stat::Mode::from_bits_truncate(0o022)); - - init::init_runner().unwrap(); - - let jobs = options - .jobs - .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs")); - let jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe"); - let mut ctx = RunnerContext { - socket: socket.into(), - jobserver, - tasks: HashSet::new(), - }; - - let mut signals = signal::SigSet::empty(); - signals.add(signal::Signal::SIGCHLD); - signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None) - .expect("pthread_sigmask()"); - let mut sfd = SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC) - .expect("Failed to create signal file descriptor"); - - let mut pollfds = [ - poll::PollFd::new(sfd.as_raw_fd(), poll::PollFlags::POLLIN), - poll::PollFd::new(ctx.socket.as_raw_fd(), poll::PollFlags::POLLIN), - ]; - - loop { - poll::poll(&mut pollfds, -1).expect("poll()"); - - let events = pollfds[0] - .revents() - .expect("Unknown events in poll() return"); - if events.contains(poll::PollFlags::POLLIN) { - let _signal = sfd.read_signal().expect("read_signal()").unwrap(); - handle_sigchld(&mut ctx).expect("Task process exited abnormally"); - } else if events.intersects(!poll::PollFlags::POLLIN) { - panic!("Unexpected error status for signal file descriptor"); - } - - let events = pollfds[1] - .revents() - .expect("Unknown events in poll() return"); - if events.contains(poll::PollFlags::POLLIN) { - if !handle_socket(&mut ctx) { - break; - } - } else if events.intersects(!poll::PollFlags::POLLIN) { - panic!("Unexpected error status for socket file descriptor"); - } - } - - process::exit(0); -} - -pub struct Runner { - socket: UnixSeqpacketConn, -} - -impl Runner { - /// Creates a new container runner - /// - /// # Safety - /// - /// Do not call in multithreaded processes. - pub unsafe fn new(options: &Options) -> Result<Self> { - let lockfile = unix::lock(paths::LOCKFILE, true, false) - .context("Failed to get lock on build directory, is another instance running?")?; - - let uid = unistd::geteuid(); - let gid = unistd::getegid(); - - let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()"); - - match clone::clone( - CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID, - ) - .expect("clone()") - { - unistd::ForkResult::Parent { .. } => Ok(Runner { socket: local }), - unistd::ForkResult::Child => { - drop(local); - runner(uid, gid, remote, lockfile, options); - } - } - } - - pub fn spawn(&self, task: &Task) -> UnixStream { - let (local, remote) = UnixStream::pair().expect("socketpair()"); - - self.socket - .send_fds(&[0], &[remote.as_raw_fd()]) - .expect("send()"); - - bincode::serialize_into(&local, task).expect("Task submission failed"); - local.shutdown(net::Shutdown::Write).expect("shutdown()"); - - local - } - - pub fn result(socket: &UnixStream) -> Result<TaskOutput> { - bincode::deserialize_from(socket).expect("Failed to read task result") - } -} diff --git a/crates/runner/src/ns.rs b/crates/runner/src/ns.rs deleted file mode 100644 index 3a8b51f..0000000 --- a/crates/runner/src/ns.rs +++ /dev/null @@ -1,84 +0,0 @@ -use nix::{ - mount::{self, MntFlags, MsFlags}, - sched::CloneFlags, - unistd::{self, Gid, Pid, Uid}, -}; - -use common::error::*; - -use super::util::clone; - -pub fn mount_proc() { - mount::mount::<_, _, _, str>(Some("proc"), "/proc", Some("proc"), MsFlags::empty(), None) - .expect("Failed to mount /proc"); -} - -pub fn setup_userns(inner_uid: Uid, inner_gid: Gid, outer_uid: Uid, outer_gid: Gid) { - std::fs::write("/proc/self/setgroups", "deny").expect("Failed to write /proc/self/setgroups"); - std::fs::write( - "/proc/self/uid_map", - &format!("{} {} 1", inner_uid, outer_uid), - ) - .expect("Failed to write /proc/self/uid_map"); - std::fs::write( - "/proc/self/gid_map", - &format!("{} {} 1", inner_gid, outer_gid), - ) - .expect("Failed to write /proc/self/gid_map"); -} - -pub unsafe fn spawn<F>(flags: CloneFlags, f: F) -> nix::Result<Pid> -where - F: FnOnce(), -{ - assert!(flags.contains(CloneFlags::CLONE_NEWNS) || !flags.contains(CloneFlags::CLONE_NEWPID)); - - clone::spawn(Some(flags), || { - if flags.contains(CloneFlags::CLONE_NEWPID) { - mount_proc(); - } - f() - }) -} - -pub fn pivot_root(path: &str) { - (|| -> Result<()> { - unistd::chdir(path).context("chdir()")?; - mount::mount::<_, _, str, str>(Some("/proc"), "proc", None, MsFlags::MS_BIND, None) - .context("Failed to bind mount /proc")?; - unistd::pivot_root(".", ".").context("pivot_root()")?; - mount::umount2(".", MntFlags::MNT_DETACH).context("umount2()")?; - unistd::chdir("/").context("chdir(\"/\")")?; - Ok(()) - })() - .expect("Failed to pivot root"); -} - -pub fn container_mounts() -> Result<()> { - mount::mount( - Some("tmp"), - "/tmp", - Some("tmpfs"), - MsFlags::MS_NODEV | MsFlags::MS_NOSUID, - Some("mode=1777,size=1048576k"), - ) - .context("Failed to mount /tmp")?; - mount::mount( - Some("devpts"), - "/dev/pts", - Some("devpts"), - MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC, - Some("newinstance,ptmxmode=0666,mode=0620"), - ) - .context("Failed to mount /dev/pts")?; - mount::mount( - Some("shm"), - "/dev/shm", - Some("tmpfs"), - MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC | MsFlags::MS_NODEV, - Some("mode=1777,size=65536k"), - ) - .context("Failed to mount /dev/shm")?; - - Ok(()) -} diff --git a/crates/runner/src/paths.rs b/crates/runner/src/paths.rs deleted file mode 100644 index 7af1e3a..0000000 --- a/crates/runner/src/paths.rs +++ /dev/null @@ -1,118 +0,0 @@ -//! Build directory structure used through rebel -//! -//! # Current structure -//! -//! ```text -//! build/ -//! ├── build.lock -//! ├── downloads/ -//! │ └── ... -//! ├── state/ -//! │ ├── output/ -//! │ │ ├── <input hash>.tar.tmp # during packing -//! │ │ ├── <archive hash>.tar # files are renamed when packing is finished -//! │ │ └── ... -//! │ ├── layer/ -//! │ │ ├── <layer hash>/ # overlayfs layer dir of finished tasks -//! │ │ └── ... -//! │ └── task/ -//! │ ├── <input hash>/ -//! │ │ ├── layer/ # overlayfs layer dir (moved to layer/ after build) -//! │ │ ├── work/ # overlayfs work dir (discarded after build) -//! │ │ ├── task.json.tmp # during write -//! │ │ ├── task.json # after write -//! │ │ ├── task.log # stdout/stderr output of the task -//! │ │ └── task.lock # task lockfile -//! │ └── ... -//! └── tmp/ # temporary files (cleaned on start) -//! ├── dev/ # container /dev -//! ├── depends/ # unpacked dependencies -//! └── task/ -//! └── <input hash>/ -//! ├── build/ # mount point for /build directory -//! │ ├── downloads/ # downloaded sources -//! │ ├── task/ # internal runner files -//! │ └── work/ # build overlay mountpoint -//! └── rootfs/ # rootfs overlay mountpoint -//! ``` - -use common::string_hash::*; - -pub const DOWNLOADS_DIR: &str = "build/downloads"; -pub const PIN_DIR: &str = "build/pinned"; - -pub const TMP_DIR: &str = "build/tmp"; -pub const DEV_DIR: &str = "build/tmp/dev"; -pub const DEPENDS_TMP_DIR: &str = "build/tmp/depends"; -pub const TASK_TMP_DIR: &str = "build/tmp/task"; - -pub const TASK_TMP_ROOTFS_SUBDIR: &str = "rootfs"; - -pub const LOCKFILE: &str = "build/build.lock"; -pub const OUTPUT_STATE_DIR: &str = "build/state/output"; -pub const TASK_STATE_DIR: &str = "build/state/task"; -pub const LAYER_STATE_DIR: &str = "build/state/layer"; - -pub const TASK_STATE_LAYER_SUBDIR: &str = "layer"; -pub const TASK_STATE_WORK_SUBDIR: &str = "work"; - -pub const TASK_BUILDDIR: &str = "build"; -pub const TASK_TASKDIR: &str = "build/task"; - -pub const TASK_RUN: &str = "run"; - -pub fn join(paths: &[&str]) -> String { - paths.join("/") -} - -pub fn task_tmp_dir(hash: &InputHash) -> String { - join(&[TASK_TMP_DIR, &hash.to_string()]) -} - -pub fn task_state_dir(hash: &InputHash) -> String { - join(&[TASK_STATE_DIR, &hash.to_string()]) -} - -pub fn task_cache_tmp_filename(hash: &InputHash) -> String { - join(&[TASK_STATE_DIR, &hash.to_string(), "task.json.tmp"]) -} - -pub fn task_cache_filename(hash: &InputHash) -> String { - join(&[TASK_STATE_DIR, &hash.to_string(), "task.json"]) -} - -pub fn task_log_filename(hash: &InputHash) -> String { - join(&[TASK_STATE_DIR, &hash.to_string(), "task.log"]) -} - -pub fn task_lock_filename(hash: &InputHash) -> String { - join(&[TASK_STATE_DIR, &hash.to_string(), "task.lock"]) -} - -pub fn layer_dir(hash: &LayerHash) -> String { - join(&[LAYER_STATE_DIR, &hash.to_string()]) -} - -pub fn depend_tmp_dir(hash: &ArchiveHash) -> String { - join(&[DEPENDS_TMP_DIR, &hash.to_string()]) -} - -pub fn depend_dir(hash: &ArchiveHash) -> String { - join(&[DEPENDS_TMP_DIR, &hash.to_string()]) -} - -pub fn depend_lock_filename(hash: &ArchiveHash) -> String { - join(&[DEPENDS_TMP_DIR, &format!("{}.lock", hash.to_string())]) -} - -pub fn archive_tmp_filename(hash: &InputHash) -> String { - join(&[OUTPUT_STATE_DIR, &format!("{}.tar.tmp", hash)]) -} - -pub fn archive_filename(hash: &ArchiveHash) -> String { - join(&[OUTPUT_STATE_DIR, &format!("{}.tar", hash)]) -} - -pub fn pinned_archive_filename(name: &str) -> String { - join(&[PIN_DIR, &format!("{}.tar", name)]) -} diff --git a/crates/runner/src/tar.rs b/crates/runner/src/tar.rs deleted file mode 100644 index 10cac92..0000000 --- a/crates/runner/src/tar.rs +++ /dev/null @@ -1,105 +0,0 @@ -use std::{ - io::{self, Read, Write}, - os::unix::prelude::CommandExt, - path::Path, - process::{self, Command, Stdio}, -}; - -use nix::{ - mount::{self, MsFlags}, - sched::CloneFlags, - sys::wait, -}; - -use common::{error::*, string_hash::ArchiveHash}; - -use super::{ - ns, - util::{checkable::Checkable, fs}, -}; -use crate::paths; - -pub fn pack<W: Write, P: AsRef<Path>>( - rootfs_hash: &ArchiveHash, - archive: &mut W, - source: P, -) -> Result<()> { - let rootfs = paths::depend_dir(rootfs_hash); - let _rootfs_mount = fs::mount(&rootfs, &rootfs, None, MsFlags::MS_BIND, None) - .with_context(|| format!("Failed to bind mount rootfs to {:?}", rootfs))?; - mount::mount::<str, str, str, str>( - None, - &rootfs, - None, - MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, - None, - ) - .context("Failed to mount container rootfs read-only")?; - - let (mut piper, pipew) = fs::pipe()?; - - let exec_tar = || -> Result<()> { - // We are in our own mount namespace, so mounting into the shared rootfs is fine - let dev_target = paths::join(&[&rootfs, "dev"]); - mount::mount::<_, _, str, str>( - Some(paths::DEV_DIR), - dev_target.as_str(), - None, - MsFlags::MS_BIND | MsFlags::MS_REC, - None, - )?; - let mount_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]); - mount::mount::<_, _, str, str>( - Some(source.as_ref()), - mount_target.as_str(), - None, - MsFlags::MS_BIND, - None, - )?; - - ns::pivot_root(&rootfs); - - let err = Command::new("tar") - .args(&[ - "-c", - "--sort=name", - "--numeric-owner", - "--owner=0", - "--group=0", - "--mtime=@0", - ".", - ]) - .stdin(Stdio::null()) - .stdout(pipew) - .current_dir(paths::TASK_BUILDDIR) - .env_clear() - .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin") - .exec(); - eprintln!("{}", err); - process::exit(127); - }; - - let pid = unsafe { ns::spawn(CloneFlags::CLONE_NEWNS, || exec_tar().unwrap()) } - .context("Failed to run tar")?; - - let result = io::copy(&mut piper, archive).context("Failed to write TAR archive"); - - wait::waitpid(pid, None)? - .check() - .context("tar did not exit successfully")?; - - result?; - Ok(()) -} - -pub fn unpack<R: Read, P: AsRef<Path>>(archive: R, dest: P) -> Result<()> { - fs::mkdir(&dest)?; - - let mut ar = tar::Archive::new(archive); - ar.set_preserve_permissions(true); - ar.set_preserve_mtime(true); - ar.set_unpack_xattrs(true); - ar.set_overwrite(false); - - ar.unpack(dest).context("Failed to unpack TAR archive") -} diff --git a/crates/runner/src/task.rs b/crates/runner/src/task.rs deleted file mode 100644 index 19a7484..0000000 --- a/crates/runner/src/task.rs +++ /dev/null @@ -1,620 +0,0 @@ -use std::{ - collections::{BTreeMap, HashMap, HashSet}, - io::{self, BufWriter}, - os::unix::prelude::CommandExt, - path::{Path, PathBuf}, - process::{self, Command, Stdio}, - time::Instant, -}; - -use capctl::prctl; -use nix::{ - mount::{self, MsFlags}, - sched::{unshare, CloneFlags}, - sys::wait, - unistd::{self, Gid, Uid}, -}; -use serde::Serialize; -use tee_readwrite::{TeeReader, TeeWriter}; - -use common::{error::*, string_hash::*, types::*}; -use walkdir::WalkDir; - -use super::{ - jobserver::Jobserver, - ns, tar, - util::{checkable::Checkable, cjson, fs}, -}; -use crate::{ - paths, - util::{stack::Stack, unix}, -}; - -const BUILD_UID: Uid = Uid::from_raw(1000); -const BUILD_GID: Gid = Gid::from_raw(1000); - -type InputHasher = blake3::Hasher; -type DependencyHasher = blake3::Hasher; -type LayerHasher = blake3::Hasher; -type ArchiveHasher = blake3::Hasher; - -type DependMap = BTreeMap<String, Vec<ArchiveHash>>; - -fn dependency_hash(dep: &Dependency) -> DependencyHash { - DependencyHash(StringHash( - cjson::digest::<DependencyHasher, _>(dep).unwrap().into(), - )) -} - -fn input_hash(task: &Task) -> InputHash { - #[derive(Debug, Serialize)] - struct HashInput<'a> { - pub command: &'a str, - pub workdir: &'a str, - pub rootfs: &'a ArchiveHash, - pub inherit: &'a [LayerHash], - pub depends: HashMap<DependencyHash, &'a Dependency>, - pub outputs: &'a HashMap<String, String>, - } - let input = HashInput { - command: &task.command, - workdir: &task.workdir, - rootfs: &task.rootfs, - inherit: &task.inherit, - depends: task - .depends - .iter() - .map(|dep| (dependency_hash(dep), dep)) - .collect(), - outputs: &task.outputs, - }; - - InputHash(StringHash( - cjson::digest::<InputHasher, _>(&input).unwrap().into(), - )) -} - -fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> { - // Remove metadata first to ensure task invalidation - fs::ensure_removed(&paths::task_cache_filename(input_hash))?; - - let task_state_dir = paths::task_state_dir(input_hash); - - let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); - fs::ensure_removed(&task_layer_dir)?; - fs::mkdir(&task_layer_dir)?; - fs::fixup_permissions(&task_layer_dir)?; - - let task_tmp_dir = paths::task_tmp_dir(input_hash); - - let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]); - fs::mkdir(&taskdir)?; - let runfile = paths::join(&[&taskdir, paths::TASK_RUN]); - std::fs::write(&runfile, &task.command) - .with_context(|| format!("Failed to write {}", runfile))?; - - let mount_target = paths::join(&[&task_tmp_dir, &task.workdir]); - let mount = if task.inherit.is_empty() { - fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None) - .with_context(|| format!("Failed to bind mount to {:?}", mount_target))? - } else { - let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]); - fs::ensure_removed(&task_work_dir)?; - fs::mkdir(&task_work_dir)?; - fs::fixup_permissions(&task_work_dir)?; - - let lower = task - .inherit - .iter() - .rev() - .map(paths::layer_dir) - .collect::<Vec<_>>() - .join(":"); - let options = format!( - "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}", - lower = lower, - upper = task_layer_dir, - work = task_work_dir - ); - fs::mount( - "overlay", - &mount_target, - Some("overlay"), - MsFlags::empty(), - Some(&options), - ) - .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))? - }; - - Ok(mount) -} - -fn get_contents<P1: AsRef<Path>, P2: AsRef<Path>>( - path: P1, - prefix: P2, -) -> Result<(HashSet<PathBuf>, HashSet<PathBuf>)> { - let mut dirs = HashSet::new(); - let mut files = HashSet::new(); - - let root: PathBuf = Path::new("/").join(prefix.as_ref()); - - for result in WalkDir::new(path.as_ref()).min_depth(1).into_iter() { - let entry = result - .with_context(|| format!("Failed to list contents of directory {:?}", path.as_ref()))?; - let is_dir = entry.file_type().is_dir(); - let entry_path = root.join(entry.into_path().strip_prefix(path.as_ref()).unwrap()); - if is_dir { - dirs.insert(entry_path); - } else { - files.insert(entry_path); - } - } - - dirs.insert(root); - - Ok((dirs, files)) -} - -fn check_conflicts( - dirs1: &HashSet<PathBuf>, - files1: &HashSet<PathBuf>, - dirs2: &HashSet<PathBuf>, - files2: &HashSet<PathBuf>, -) -> Result<()> { - let mut conflicts = Vec::new(); - - conflicts.extend(files1.intersection(files2)); - conflicts.extend(dirs1.intersection(files2)); - conflicts.extend(files1.intersection(dirs2)); - - if !conflicts.is_empty() { - let mut conflict_strings: Box<[_]> = conflicts - .into_iter() - .map(|path| path.to_string_lossy().to_string()) - .collect(); - conflict_strings.sort(); - return Err(Error::new(format!( - "Found the following file conflicts in dependencies:\n{}", - conflict_strings.join("\n") - ))); - } - - Ok(()) -} - -fn init_task_rootfs(input_hash: &InputHash, task: &Task) -> Result<Stack<fs::Mount>> { - let task_tmp_dir = paths::task_tmp_dir(input_hash); - let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]); - let rootfs = paths::depend_dir(&task.rootfs); - - let depends = unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?; - - let mut mounts = Stack::new(); - - mounts.push( - fs::mount(rootfs, &mount_target, None, MsFlags::MS_BIND, None) - .with_context(|| format!("Failed to bind mount rootfs to {:?}", mount_target))?, - ); - mount::mount::<str, str, str, str>( - None, - &mount_target, - None, - MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, - None, - ) - .context("Failed to mount container rootfs read-only")?; - - let (mut dirs, mut files) = get_contents(&mount_target, "")?; - - for (path, dep_hashes) in depends { - assert!(!dep_hashes.is_empty()); - - if !path.is_empty() && !path.starts_with('/') { - return Err(Error::new(format!( - "Dependency path {:?} must be absolute", - path - ))); - } - - let dep_target = mount_target.clone() + &path; - let dep_paths: Box<[_]> = dep_hashes.iter().map(paths::depend_dir).collect(); - - for dep in dep_paths.iter() { - let (dep_dirs, dep_files) = get_contents(dep, &path)?; - check_conflicts(&dirs, &files, &dep_dirs, &dep_files)?; - dirs.extend(dep_dirs); - files.extend(dep_files); - } - - let options = format!( - "xino=off,index=off,metacopy=off,lowerdir={lower}:{base}", - lower = dep_paths.join(":"), - base = dep_target, - ); - - mounts.push( - fs::mount( - "overlay", - dep_target.as_str(), - Some("overlay"), - MsFlags::MS_RDONLY, - Some(&options), - ) - .with_context(|| format!("Failed to mount overlay to {:?}", dep_target))?, - ); - } - - Ok(mounts) -} - -fn cleanup_task(input_hash: &InputHash) -> Result<()> { - let task_tmp_dir = paths::task_tmp_dir(input_hash); - - fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?; - - let task_state_dir = paths::task_state_dir(input_hash); - let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]); - fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?; - - Ok(()) -} - -fn get_archive_filename(task: &Task, hash: &ArchiveHash) -> String { - if let Some(pinned_name) = task.pins.get(hash) { - paths::pinned_archive_filename(pinned_name) - } else { - paths::archive_filename(hash) - } -} - -fn unpack_dependency(task: &Task, hash: &ArchiveHash) -> Result<()> { - let _lock = unix::lock(paths::depend_lock_filename(hash), true, true); - - let filename = get_archive_filename(task, hash); - - let dest = paths::depend_dir(hash); - if Path::new(&dest).is_dir() { - return Ok(()); - } - - (|| -> Result<()> { - let tmp_dest = paths::depend_tmp_dir(hash); - fs::ensure_removed(&tmp_dest)?; - - let file = fs::open(&filename)?; - let hasher = ArchiveHasher::new(); - let buffered_hasher = BufWriter::with_capacity(16 * 1024 * 1024, hasher); - let mut reader = TeeReader::new(file, buffered_hasher, false); - - tar::unpack(&mut reader, &tmp_dest)?; - - // Read file to end to get the correct hash - io::copy(&mut reader, &mut io::sink())?; - - let (_, buffered_hasher) = reader.into_inner(); - let hasher = buffered_hasher.into_inner()?; - - let actual_hash = ArchiveHash(StringHash(hasher.finalize().into())); - - if &actual_hash != hash { - return Err(Error::new(format!( - "Incorrect file hash for {:?} (expected: {}, actual: {})", - filename, hash, actual_hash, - ))); - } - - fs::rename(&tmp_dest, &dest)?; - - Ok(()) - })() - .with_context(|| format!("Failed to unpack {:?}", filename)) -} - -fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<DependMap> { - let task_tmp_dir = paths::task_tmp_dir(input_hash); - - unpack_dependency(task, &task.rootfs)?; - - let mut ret = DependMap::new(); - - for dep in &task.depends { - match dep { - Dependency::Fetch { - name, target_dir, .. - } => { - let path = paths::join(&[&task_tmp_dir, target_dir]); - fs::mkdir(&path)?; - fs::copy( - paths::join(&[paths::DOWNLOADS_DIR, name]), - paths::join(&[&path, name]), - )?; - } - Dependency::Task { output, path } => { - unpack_dependency(task, output)?; - ret.entry(path.clone()).or_default().push(*output); - } - } - } - - Ok(ret) -} - -fn collect_output(input_hash: &InputHash, task: &Task, path: &str) -> Result<Option<ArchiveHash>> { - let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]); - if !Path::new(&source).is_dir() { - return Ok(None); - } - - let filename = paths::archive_tmp_filename(input_hash); - - let hash = (|| -> Result<ArchiveHash> { - let file = fs::create(&filename)?; - let hasher = ArchiveHasher::new(); - let writer = TeeWriter::new(file, hasher); - let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer); - - super::tar::pack(&task.rootfs, &mut buffered_writer, &source)?; - - let writer = buffered_writer.into_inner()?; - let (file, hasher) = writer.into_inner(); - file.sync_all()?; - drop(file); - - Ok(ArchiveHash(StringHash(hasher.finalize().into()))) - })() - .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?; - - fs::rename(filename, paths::archive_filename(&hash))?; - - Ok(Some(hash)) -} - -fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> { - let mut ret = HashMap::new(); - - for (name, path) in &task.outputs { - if let Some(hash) = collect_output(input_hash, task, path)? { - ret.insert(name.clone(), hash); - } - } - - Ok(ret) -} - -fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> { - let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?; - let _rootfs_mounts = - init_task_rootfs(input_hash, task).context("Failed to initialize task rootfs")?; - - let task_tmp_dir = paths::task_tmp_dir(input_hash); - let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]); - - let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]); - let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]); - - let log_filename = paths::task_log_filename(input_hash); - - let mut exec_cmd = || -> Result<()> { - let log = fs::create(&log_filename)?; - - let log_stdout = log - .try_clone() - .context("Failed to duplicate log file descriptor")?; - let log_stderr = log - .try_clone() - .context("Failed to duplicate log file descriptor")?; - - mount::mount::<_, _, str, str>( - Some(paths::DEV_DIR), - paths::join(&[&rootfs, "dev"]).as_str(), - None, - MsFlags::MS_BIND | MsFlags::MS_REC, - None, - ) - .expect("Failed to bind mount /dev directory"); - mount::mount::<_, _, str, str>( - Some(builddir_source.as_str()), - builddir_target.as_str(), - None, - MsFlags::MS_BIND | MsFlags::MS_REC, - None, - ) - .expect("Failed to bind mount build directory"); - - ns::pivot_root(&rootfs); - mount::mount::<str, _, str, str>( - None, - "/", - None, - MsFlags::MS_PRIVATE | MsFlags::MS_REC, - None, - ) - .context("Failed to set MS_PRIVATE for container root")?; - ns::container_mounts().context("Failed to set up container mounts")?; - - unistd::sethostname("rebel-builder").context("Failed to set hostname")?; - - prctl::set_no_new_privs().context("set_no_new_privs()")?; - - unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS) - .context("Failed to create user namespace")?; - ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0)); - - jobserver - .set_cloexec(false) - .context("Failed to unset O_CLOEXEC on jobserver pipe")?; - - let err = Command::new("sh") - .args(&["-ex", &paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])]) - .stdin(Stdio::null()) - .stdout(log_stdout) - .stderr(log_stderr) - .env_clear() - .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin") - .env("HOME", "/build") - .env("INPUT_HASH", input_hash.to_string()) - .env("MAKEFLAGS", jobserver.to_makeflags()) - .exec(); - eprintln!("{}", err); - process::exit(127); - }; - - let pid = unsafe { - ns::spawn( - CloneFlags::CLONE_NEWNS - | CloneFlags::CLONE_NEWPID - | CloneFlags::CLONE_NEWIPC - | CloneFlags::CLONE_NEWNET - | CloneFlags::CLONE_NEWUTS, - || exec_cmd().unwrap(), - ) - } - .context("Failed to run task container")?; - - let status = wait::waitpid(pid, None)?; - - if let Err(err) = status.check() { - return Err(Error::new(format!( - "Task failed: {}\nOutput: {}", - err, log_filename - ))); - } - - Ok(()) -} - -fn hash_layer(input_hash: &InputHash, task: &Task) -> Result<Option<LayerHash>> { - let task_state_dir = paths::task_state_dir(input_hash); - let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); - - (|| -> Result<Option<LayerHash>> { - if fs::is_dir_empty(&task_layer_dir)? { - return Ok(None); - } - - let hasher = LayerHasher::new(); - let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher); - - tar::pack(&task.rootfs, &mut buffered_writer, &task_layer_dir)?; - - let hasher = buffered_writer.into_inner()?; - Ok(Some(LayerHash(StringHash(hasher.finalize().into())))) - })() - .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir)) -} - -fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> { - let task_state_dir = paths::task_state_dir(input_hash); - let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); - - if let Some(hash) = hash { - let layer_dir = paths::layer_dir(hash); - - let err = match std::fs::rename(&task_layer_dir, &layer_dir) { - Ok(_) => return Ok(()), - Err(err) => err, - }; - - if !matches!( - err.raw_os_error(), - Some(libc::EEXIST) | Some(libc::ENOTEMPTY) - ) { - return Err(err).with_context(|| { - format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir) - }); - } - } - - fs::ensure_removed(&task_layer_dir) -} - -fn run_and_hash_task( - input_hash: &InputHash, - task: &Task, - jobserver: &mut Jobserver, -) -> Result<TaskOutput> { - run_task(input_hash, task, jobserver)?; - - let outputs = collect_outputs(input_hash, task)?; - - let layer = hash_layer(input_hash, task)?; - move_layer(input_hash, &layer)?; - - Ok(TaskOutput { - input_hash: Some(*input_hash), - layer, - outputs, - }) -} - -fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> { - let filename = paths::task_cache_filename(input_hash); - let file = fs::open(&filename)?; - - serde_json::from_reader(file) - .with_context(|| format!("Failed to read task cache data from {}", filename)) -} - -fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> { - fs::mkdir(&paths::task_state_dir(input_hash))?; - - let tmp_filename = paths::task_cache_tmp_filename(input_hash); - let filename = paths::task_cache_filename(input_hash); - - cjson::to_file(&tmp_filename, output) - .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?; - - fs::rename(tmp_filename, filename)?; - - Ok(()) -} - -pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result<TaskOutput> { - let input_hash = input_hash(&task); - - let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true) - .context("Failed to get task lock")?; - - let cached_output = load_cached(&input_hash); - if !task.force_run { - if let Ok(task_output) = cached_output { - return Ok(task_output); - } - } - - let token = jobserver.wait(); - - let start_time = Instant::now(); - println!("Starting task {} ({})", task.label, input_hash); - - let task_ret = run_and_hash_task(&input_hash, &task, jobserver); - let cleanup_ret = cleanup_task(&input_hash); - - jobserver.post(token); - - let task_output = task_ret?; - cleanup_ret.context("Failed to clean up after task")?; - - save_cached(&input_hash, &task_output)?; - - let duration = Instant::now().duration_since(start_time); - println!( - "Finished task {} ({}) in {}", - task.label, - input_hash, - duration.as_secs_f32() - ); - - if let Ok(cached_output) = cached_output { - if cached_output.outputs != task_output.outputs { - println!( - "WARNING: Output hashes for task {} do not match cached result\n Cached output: {}\n New output: {}", - task.label, - cjson::to_string(&cached_output.outputs).unwrap(), - cjson::to_string(&task_output.outputs).unwrap(), - ); - } - } - - Ok(task_output) -} diff --git a/crates/runner/src/util/checkable.rs b/crates/runner/src/util/checkable.rs deleted file mode 100644 index 8528d29..0000000 --- a/crates/runner/src/util/checkable.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::{ - io::{Error, ErrorKind, Result}, - process::ExitStatus, -}; - -use nix::sys::wait; - -pub trait Checkable { - fn check(&self) -> Result<()>; -} - -impl Checkable for ExitStatus { - fn check(&self) -> Result<()> { - if self.success() { - Ok(()) - } else { - Err(Error::new( - ErrorKind::Other, - format!("Process exited with {}", self), - )) - } - } -} - -impl Checkable for wait::WaitStatus { - fn check(&self) -> Result<()> { - let message = match self { - wait::WaitStatus::Exited(_, 0) => return Ok(()), - wait::WaitStatus::Exited(_, code) => format!("Process exited with exit code: {}", code), - wait::WaitStatus::Signaled(_, signal, _) => { - format!("Process exited with signal: {}", signal) - } - _ => format!("Process in unexpected status: {:?}", self), - }; - Err(Error::new(ErrorKind::Other, message)) - } -} diff --git a/crates/runner/src/util/cjson.rs b/crates/runner/src/util/cjson.rs deleted file mode 100644 index e3840ce..0000000 --- a/crates/runner/src/util/cjson.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::{ - fs::File, - io::{self, Write}, - path::Path, -}; - -use digest::{self, Digest}; -use olpc_cjson::CanonicalFormatter; -use serde::Serialize; -use serde_json::error::Result; - -pub fn new_serializer<W: Write>(writer: W) -> serde_json::Serializer<W, CanonicalFormatter> { - serde_json::Serializer::with_formatter(writer, CanonicalFormatter::new()) -} - -pub fn to_writer<W: Write, T: ?Sized + Serialize>(writer: W, value: &T) -> Result<()> { - let mut ser = new_serializer(writer); - value.serialize(&mut ser) -} - -pub fn to_file<P: AsRef<Path>, T: ?Sized + Serialize>(path: P, value: &T) -> io::Result<()> { - let file = File::create(path)?; - to_writer(&file, value)?; - file.sync_all() -} - -pub fn to_string<T: ?Sized + Serialize>(value: &T) -> Result<String> { - let mut ret = Vec::new(); - to_writer(&mut ret, value)?; - Ok(String::from_utf8(ret).unwrap()) -} - -pub fn digest<D: Digest + Write, T: ?Sized + Serialize>(value: &T) -> Result<digest::Output<D>> { - let mut digest = <D as Digest>::new(); - to_writer(&mut digest, value)?; - Ok(digest.finalize()) -} diff --git a/crates/runner/src/util/clone.rs b/crates/runner/src/util/clone.rs deleted file mode 100644 index 0af9e4d..0000000 --- a/crates/runner/src/util/clone.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::{mem, process}; - -use nix::{ - errno, sched, - unistd::{self, Pid}, -}; - -#[repr(C)] -#[derive(Debug, Default)] -struct CloneArgs { - flags: u64, - pidfd: u64, - child_tid: u64, - parent_tid: u64, - exit_signal: u64, - stack: u64, - stack_size: u64, - tls: u64, -} - -pub unsafe fn clone(flags: sched::CloneFlags) -> nix::Result<unistd::ForkResult> { - let mut args = CloneArgs { - flags: flags.bits() as u64, - exit_signal: libc::SIGCHLD as u64, - ..CloneArgs::default() - }; - let size = mem::size_of_val(&args) as libc::size_t; - - let pid = libc::syscall(libc::SYS_clone3, &mut args, size); - if pid < 0 { - Err(errno::Errno::last()) - } else if pid == 0 { - Ok(unistd::ForkResult::Child) - } else { - Ok(unistd::ForkResult::Parent { - child: Pid::from_raw(pid as libc::pid_t), - }) - } -} - -pub unsafe fn spawn<F>(flags: Option<sched::CloneFlags>, f: F) -> nix::Result<Pid> -where - F: FnOnce(), -{ - let res = if let Some(flags) = flags { - clone(flags) - } else { - unistd::fork() - }; - match res? { - unistd::ForkResult::Parent { child } => Ok(child), - unistd::ForkResult::Child => { - f(); - process::exit(0) - } - } -} diff --git a/crates/runner/src/util/fs.rs b/crates/runner/src/util/fs.rs deleted file mode 100644 index 9e16648..0000000 --- a/crates/runner/src/util/fs.rs +++ /dev/null @@ -1,127 +0,0 @@ -use std::{ - fs::{self, File}, - io, - os::unix::prelude::*, - path::{Path, PathBuf}, -}; - -use nix::{ - fcntl::OFlag, - mount::{self, MsFlags}, - unistd, -}; - -use common::error::*; - -pub fn open<P: AsRef<Path>>(path: P) -> Result<fs::File> { - fs::File::open(path.as_ref()) - .with_context(|| format!("Failed to open file {:?} for reading", path.as_ref())) -} - -pub fn create<P: AsRef<Path>>(path: P) -> Result<fs::File> { - fs::File::create(path.as_ref()) - .with_context(|| format!("Failed to open file {:?} for writing", path.as_ref())) -} - -pub fn rename<P1: AsRef<Path>, P2: AsRef<Path>>(from: P1, to: P2) -> Result<()> { - fs::rename(from.as_ref(), to.as_ref()) - .with_context(|| format!("Failed to rename {:?} to {:?}", from.as_ref(), to.as_ref())) -} - -// Unlike fs::copy, this doesn't preserve file flags -pub fn copy<P1: AsRef<Path>, P2: AsRef<Path>>(from: P1, to: P2) -> Result<()> { - (|| -> Result<()> { - let mut src = open(from.as_ref())?; - let mut dest = create(to.as_ref())?; - io::copy(&mut src, &mut dest)?; - dest.sync_all()?; - Ok(()) - })() - .with_context(|| format!("Failed to copy {:?} to {:?}", from.as_ref(), to.as_ref())) -} - -pub fn mkdir<P: AsRef<Path>>(path: P) -> Result<()> { - let mut builder = fs::DirBuilder::new(); - builder.recursive(true); - builder - .create(path.as_ref()) - .with_context(|| format!("Failed to create directory {:?}", path.as_ref())) -} - -pub fn ensure_removed<P: AsRef<Path>>(path: P) -> Result<()> { - let result = if path.as_ref().is_dir() { - fs::remove_dir_all(path.as_ref()) - } else { - fs::remove_file(path.as_ref()) - }; - result - .or_else(|err| match err.kind() { - io::ErrorKind::NotFound => Ok(()), - _ => Err(err), - }) - .with_context(|| format!("Failed to delete {:?}", path.as_ref())) -} - -pub fn is_dir_empty<P: AsRef<Path>>(path: P) -> Result<bool> { - Ok(fs::read_dir(path)?.next().is_none()) -} - -/// Fixes up weirdness of set-group-ID or bsdgroups -pub fn fixup_permissions<P: AsRef<Path>>(path: P) -> Result<()> { - let path = path.as_ref(); - let gid = unistd::getegid(); - - let metadata = path - .metadata() - .with_context(|| format!("Failed to get metadata of {:?}", path))?; - - if metadata.gid() != gid.as_raw() { - unistd::chown(path, None, Some(gid)) - .with_context(|| format!("Failed to set group of {:?}", path))?; - } - - let mut perms = metadata.permissions(); - let mode = perms.mode(); - if (mode & 0o777) != mode { - perms.set_mode(mode & 0o777); - std::fs::set_permissions(path, perms) - .with_context(|| format!("Failed to set mode of {:?}", path))?; - } - - Ok(()) -} - -#[must_use] -pub struct Mount(PathBuf); - -impl Drop for Mount { - fn drop(&mut self) { - mount::umount(&self.0) - .with_context(|| format!("Failed to unmount {:?}", self.0)) - .unwrap(); - } -} - -pub fn mount<P1: AsRef<Path>, P2: AsRef<Path>>( - source: P1, - target: P2, - fstype: Option<&str>, - flags: MsFlags, - data: Option<&str>, -) -> Result<Mount> { - mkdir(target.as_ref()).with_context(|| format!("Failed to create {:?}", target.as_ref()))?; - - let canon_target = target - .as_ref() - .canonicalize() - .with_context(|| format!("Failed to get absolute path for {:?}", target.as_ref()))?; - mount::mount(Some(source.as_ref()), &canon_target, fstype, flags, data) - .with_context(|| format!("Failed to mount {:?}", canon_target))?; - Ok(Mount(canon_target)) -} - -pub fn pipe() -> Result<(File, File)> { - unistd::pipe2(OFlag::O_CLOEXEC) - .context("pipe2()") - .map(|(piper, pipew)| unsafe { (File::from_raw_fd(piper), File::from_raw_fd(pipew)) }) -} diff --git a/crates/runner/src/util/mod.rs b/crates/runner/src/util/mod.rs deleted file mode 100644 index 0fbe3b5..0000000 --- a/crates/runner/src/util/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub mod checkable; -pub mod cjson; -pub mod clone; -pub mod fs; -pub mod stack; -pub mod steal; -pub mod unix; diff --git a/crates/runner/src/util/stack.rs b/crates/runner/src/util/stack.rs deleted file mode 100644 index 15d5daf..0000000 --- a/crates/runner/src/util/stack.rs +++ /dev/null @@ -1,25 +0,0 @@ -use std::mem; - -/// Simple inefficient datastructure with guaranteed drop order -#[derive(Debug)] -pub enum Stack<T> { - Cons(Box<(T, Stack<T>)>), - Empty, -} - -impl<T> Default for Stack<T> { - fn default() -> Self { - Self::Empty - } -} - -impl<T> Stack<T> { - pub fn new() -> Self { - Self::Empty - } - - pub fn push(&mut self, value: T) { - let tmp = mem::take(self); - *self = Stack::Cons(Box::new((value, tmp))); - } -} diff --git a/crates/runner/src/util/steal.rs b/crates/runner/src/util/steal.rs deleted file mode 100644 index 91b2cdf..0000000 --- a/crates/runner/src/util/steal.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::ops::{Deref, DerefMut}; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub struct Steal<T>(pub Option<T>); - -impl<T> Steal<T> { - pub fn new(value: T) -> Steal<T> { - Steal(Some(value)) - } - - pub fn steal(&mut self) -> T { - self.0 - .take() - .expect("Attempted to steal already stoken value") - } -} - -impl<T> From<T> for Steal<T> { - fn from(value: T) -> Self { - Steal::new(value) - } -} - -impl<T> Deref for Steal<T> { - type Target = T; - - fn deref(&self) -> &Self::Target { - self.0 - .as_ref() - .expect("Attempted to dereference stolen value") - } -} - -impl<T> DerefMut for Steal<T> { - fn deref_mut(&mut self) -> &mut Self::Target { - self.0 - .as_mut() - .expect("Attempted to dereference stolen value") - } -} diff --git a/crates/runner/src/util/unix.rs b/crates/runner/src/util/unix.rs deleted file mode 100644 index 710138c..0000000 --- a/crates/runner/src/util/unix.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::{fs::File, os::unix::prelude::*, path::Path}; - -use nix::{ - fcntl::{self, FcntlArg, FdFlag, OFlag}, - sched, - unistd::Pid, -}; - -use common::error::*; - -use super::fs; - -pub fn set_blocking(fd: RawFd, blocking: bool) -> Result<()> { - let flags = unsafe { - OFlag::from_bits_unchecked(fcntl::fcntl(fd, FcntlArg::F_GETFL).context("fcntl(F_GETFL)")?) - }; - - let new_flags = if blocking { - flags & !OFlag::O_NONBLOCK - } else { - flags | OFlag::O_NONBLOCK - }; - - if new_flags != flags { - fcntl::fcntl(fd, FcntlArg::F_SETFL(new_flags)).context("fcntl(F_SETFL)")?; - } - - Ok(()) -} - -pub fn set_cloexec(fd: RawFd, cloexec: bool) -> Result<()> { - let flags = unsafe { - FdFlag::from_bits_unchecked(fcntl::fcntl(fd, FcntlArg::F_GETFD).context("fcntl(F_GETFD)")?) - }; - - let new_flags = if cloexec { - flags | FdFlag::FD_CLOEXEC - } else { - flags & !FdFlag::FD_CLOEXEC - }; - - if new_flags != flags { - fcntl::fcntl(fd, FcntlArg::F_SETFD(new_flags)).context("fcntl(F_SETFD)")?; - } - - Ok(()) -} - -pub fn nproc() -> Result<usize> { - const MAXCPU: usize = sched::CpuSet::count(); - - let affinity = sched::sched_getaffinity(Pid::from_raw(0)).context("sched_getaffinity()")?; - - let mut count = 0; - - for cpu in 0..MAXCPU { - if affinity.is_set(cpu).unwrap() { - count += 1; - } - } - - Ok(count) -} - -pub fn lock<P: AsRef<Path>>(path: P, exclusive: bool, blocking: bool) -> Result<File> { - use fcntl::FlockArg::*; - - if let Some(parent) = path.as_ref().parent() { - fs::mkdir(parent)?; - } - - let arg = match (exclusive, blocking) { - (true, true) => LockExclusive, - (true, false) => LockExclusiveNonblock, - (false, true) => LockShared, - (false, false) => LockSharedNonblock, - }; - - let file = fs::create(path.as_ref())?; - fcntl::flock(file.as_raw_fd(), arg) - .with_context(|| format!("flock failed on {:?}", path.as_ref()))?; - - Ok(file) -} |