From 34ac18d20c13a78914d447fee83204811a27b1e4 Mon Sep 17 00:00:00 2001
From: Matthias Schiffer
Date: Mon, 25 Oct 2021 00:19:45 +0200
Subject: Move runner into separate crate

---
 crates/runner/src/task.rs | 460 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 460 insertions(+)
 create mode 100644 crates/runner/src/task.rs

(limited to 'crates/runner/src/task.rs')

diff --git a/crates/runner/src/task.rs b/crates/runner/src/task.rs
new file mode 100644
index 0000000..296c1a0
--- /dev/null
+++ b/crates/runner/src/task.rs
@@ -0,0 +1,460 @@
+use std::{
+    collections::HashMap,
+    io::BufWriter,
+    os::unix::prelude::CommandExt,
+    path::{Path, PathBuf},
+    process::{self, Command, Stdio},
+    time::Instant,
+};
+
+use capctl::prctl;
+use nix::{
+    mount::{self, MsFlags},
+    sched::{unshare, CloneFlags},
+    sys::wait,
+    unistd::{self, Gid, Uid},
+};
+use serde::Serialize;
+use tee_readwrite::TeeWriter;
+
+use common::{error::*, string_hash::*, types::*};
+
+use super::{
+    jobserver::Jobserver,
+    ns, tar,
+    util::{cjson, fs, Checkable},
+};
+use crate::paths;
+
+const BUILD_UID: Uid = Uid::from_raw(1000);
+const BUILD_GID: Gid = Gid::from_raw(1000);
+
+type InputHasher = blake3::Hasher;
+type DependencyHasher = blake3::Hasher;
+type LayerHasher = blake3::Hasher;
+type ArchiveHasher = blake3::Hasher;
+
+fn dependency_hash(dep: &Dependency) -> DependencyHash {
+    DependencyHash(StringHash(
+        cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
+    ))
+}
+
+fn input_hash(task: &Task) -> InputHash {
+    #[derive(Debug, Serialize)]
+    struct HashInput<'a> {
+        pub command: &'a str,
+        pub inherit: &'a [LayerHash],
+        pub depends: HashMap<DependencyHash, &'a Dependency>,
+        pub outputs: &'a HashMap<String, String>,
+    }
+    let input = HashInput {
+        command: &task.command,
+        inherit: &task.inherit,
+        depends: task
+            .depends
+            .iter()
+            .map(|dep| (dependency_hash(dep), dep))
+            .collect(),
+        outputs: &task.outputs,
+    };
+
+    InputHash(StringHash(
+        cjson::digest::<InputHasher, _>(&input).unwrap().into(),
+    ))
+}
+
+fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
+    // Remove metadata first to ensure task invalidation
+    fs::ensure_removed(&paths::task_cache_filename(input_hash))?;
+
+    let task_state_dir = paths::task_state_dir(input_hash);
+    fs::ensure_removed(&task_state_dir)?;
+    fs::mkdir(&task_state_dir)?;
+
+    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+    fs::mkdir(&task_layer_dir)?;
+    fs::fixup_permissions(&task_layer_dir)?;
+
+    let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+    let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
+    fs::mkdir(&taskdir)?;
+    let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
+    std::fs::write(&runfile, &task.command)
+        .with_context(|| format!("Failed to write {}", runfile))?;
+
+    let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_WORKDIR]);
+    let mount = if task.inherit.is_empty() {
+        fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
+            .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
+    } else {
+        let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+        fs::mkdir(&task_work_dir)?;
+        fs::fixup_permissions(&task_work_dir)?;
+
+        let lower = task
+            .inherit
+            .iter()
+            .rev()
+            .map(paths::layer_dir)
+            .collect::<Vec<_>>()
+            .join(":");
+        let options = format!(
+            "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
+            lower = lower,
+            upper = task_layer_dir,
+            work = task_work_dir
+        );
+        fs::mount(
+            "overlay",
+            &mount_target,
+            Some("overlay"),
+            MsFlags::empty(),
+            Some(&options),
+        )
+        .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
+    };
+
+    Ok(mount)
+}
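+
+// The work overlay above stacks the inherited layers below the task's own
+// layer directory; overlayfs treats the first lowerdir entry as the topmost
+// layer, which is why the inherit list is reversed before joining. The
+// rootfs overlay below has no upperdir at all, so it is mounted read-only:
+// a task can read its dependencies but never modify them. For a
+// hypothetical task, the resulting option string looks like
+//
+//   xino=off,index=off,metacopy=off,lowerdir=<tmp>/depends:<rootfs>
+//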
+fn init_task_rootfs(input_hash: &InputHash) -> Result<fs::Mount> {
+    let task_tmp_dir = paths::task_tmp_dir(input_hash);
+    let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
+    let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+
+    let lower = [&depends_dir, paths::ROOTFS_DIR].join(":");
+    let options = format!(
+        "xino=off,index=off,metacopy=off,lowerdir={lower}",
+        lower = lower,
+    );
+
+    let mount = fs::mount(
+        "overlay",
+        &mount_target,
+        Some("overlay"),
+        MsFlags::empty(),
+        Some(&options),
+    )
+    .with_context(|| format!("Failed to mount rootfs overlay to {:?}", mount_target))?;
+
+    Ok(mount)
+}
+
+fn cleanup_task(input_hash: &InputHash) -> Result<()> {
+    let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+    fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?;
+
+    let task_state_dir = paths::task_state_dir(input_hash);
+    let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+    fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?;
+
+    Ok(())
+}
+
+fn unpack_dependency<P1: AsRef<Path>, P2: AsRef<Path>>(filename: P1, dest: P2) -> Result<()> {
+    let file = fs::open(filename.as_ref())?;
+    tar::unpack(file, dest.as_ref()).with_context(|| {
+        format!(
+            "Failed to unpack {:?} to {:?}",
+            filename.as_ref(),
+            dest.as_ref()
+        )
+    })
+}
+
+fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<()> {
+    let task_tmp_dir = paths::task_tmp_dir(input_hash);
+    let downloads_dir = paths::join(&[&task_tmp_dir, paths::TASK_DLDIR]);
+    let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
+
+    fs::mkdir(&downloads_dir)?;
+    fs::mkdir(&depends_dir)?;
+
+    for dep in &task.depends {
+        match dep {
+            Dependency::Fetch { name, .. } => {
+                fs::copy(
+                    paths::join(&[paths::DOWNLOADS_DIR, name]),
+                    paths::join(&[&downloads_dir, name]),
+                )?;
+            }
+            Dependency::Task { output, path } => {
+                unpack_dependency(
+                    paths::archive_filename(output),
+                    paths::join(&[&depends_dir, path]),
+                )?;
+            }
+        }
+    }
+
+    Ok(())
+}
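+
+// Outputs are stored content-addressed: each declared output directory is
+// packed into a tar archive whose hash is computed on the fly (the
+// TeeWriter feeds the hasher as the file is written). The archive is
+// written under a temporary name and only renamed into place once it is
+// complete and synced, so the archive store never contains partially
+// written files.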
+fn collect_output(input_hash: &InputHash, path: &str) -> Result<Option<ArchiveHash>> {
+    let source: PathBuf = [&paths::task_tmp_dir(input_hash), paths::TASK_DESTDIR, path]
+        .iter()
+        .collect();
+    if !source.is_dir() {
+        return Ok(None);
+    }
+
+    let filename = paths::archive_tmp_filename(input_hash);
+
+    let hash = (|| -> Result<ArchiveHash> {
+        let file = fs::create(&filename)?;
+        let hasher = ArchiveHasher::new();
+        let writer = TeeWriter::new(file, hasher);
+        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);
+
+        super::tar::pack(&mut buffered_writer, &source)?;
+
+        let writer = buffered_writer.into_inner()?;
+        let (file, hasher) = writer.into_inner();
+        file.sync_all()?;
+        drop(file);
+
+        Ok(ArchiveHash(StringHash(hasher.finalize().into())))
+    })()
+    .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;
+
+    fs::rename(filename, paths::archive_filename(&hash))?;
+
+    Ok(Some(hash))
+}
+
+fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
+    let mut ret = HashMap::new();
+
+    for (name, path) in &task.outputs {
+        if let Some(hash) = collect_output(input_hash, path)? {
+            ret.insert(name.clone(), hash);
+        }
+    }
+
+    Ok(ret)
+}
+
+fn run_task(input_hash: &InputHash, task: &Task, mut jobserver: Jobserver) -> Result<()> {
+    let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
+    unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;
+    let _rootfs_mount = init_task_rootfs(input_hash).context("Failed to initialize task rootfs")?;
+
+    let task_tmp_dir = paths::task_tmp_dir(input_hash);
+    let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+
+    let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
+    let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);
+
+    let log_filename = paths::task_log_filename(input_hash);
+
+    let mut exec_cmd = || -> Result<()> {
+        let log = fs::create(&log_filename)?;
+
+        let log_stdout = log
+            .try_clone()
+            .context("Failed to duplicate log file descriptor")?;
+        let log_stderr = log
+            .try_clone()
+            .context("Failed to duplicate log file descriptor")?;
+
+        mount::mount::<_, _, str, str>(
+            Some(paths::join(&[paths::ROOTFS_DIR, "dev"]).as_str()),
+            paths::join(&[&rootfs, "dev"]).as_str(),
+            None,
+            MsFlags::MS_BIND | MsFlags::MS_REC,
+            None,
+        )
+        .expect("Failed to bind mount /dev directory");
+        mount::mount::<_, _, str, str>(
+            Some(builddir_source.as_str()),
+            builddir_target.as_str(),
+            None,
+            MsFlags::MS_BIND | MsFlags::MS_REC,
+            None,
+        )
+        .expect("Failed to bind mount build directory");
+
+        ns::pivot_root(&rootfs);
+        mount::mount::<str, _, str, str>(
+            None,
+            "/",
+            None,
+            MsFlags::MS_PRIVATE | MsFlags::MS_REC,
+            None,
+        )
+        .context("Failed to set MS_PRIVATE for container root")?;
+        ns::container_mounts().context("Failed to set up container mounts")?;
+
+        unistd::sethostname("rebel-builder").context("Failed to set hostname")?;
+
+        prctl::set_no_new_privs().context("set_no_new_privs()")?;
+
+        unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
+            .context("Failed to create user namespace")?;
+        ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));
+
+        jobserver
+            .set_cloexec(false)
+            .context("Failed to unset O_CLOEXEC on jobserver pipe")?;
+
+        let err = Command::new("sh")
+            .args(&[
+                "-ex",
+                &paths::abs(&paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])),
+            ])
+            .stdin(Stdio::null())
+            .stdout(log_stdout)
+            .stderr(log_stderr)
+            .current_dir(paths::TASK_WORKDIR)
+            .env_clear()
+            .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
+            .env("HOME", "/build")
+            .env("INPUT_HASH", input_hash.to_string())
+            .env("MAKEFLAGS", jobserver.to_makeflags())
+            .exec();
+        eprintln!("{}", err);
+        process::exit(127);
+    };
+
+    let (pid, ()) = unsafe {
+        ns::spawn(
+            CloneFlags::CLONE_NEWNS
+                | CloneFlags::CLONE_NEWPID
+                | CloneFlags::CLONE_NEWIPC
+                | CloneFlags::CLONE_NEWNET
+                | CloneFlags::CLONE_NEWUTS,
+            (),
+            |()| exec_cmd().unwrap(),
+        )
+    }
+    .context("Failed to run task container")?;
+
+    let status = wait::waitpid(pid, None)?;
+
+    if let Err(err) = status.check() {
+        return Err(Error::new(format!(
+            "Task failed: {}\nOutput: {}",
+            err, log_filename
+        )));
+    }
+
+    Ok(())
+}
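+
+// A task's writable layer is hashed by streaming the packed tar straight
+// into the hasher, without writing an archive to disk; an empty layer
+// yields None, so tasks that produce no files do not create (or
+// reference) a layer at all. Stable hashes rely on tar::pack producing
+// deterministic output for identical directory trees.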
+fn hash_layer(input_hash: &InputHash) -> Result<Option<LayerHash>> {
+    let task_state_dir = paths::task_state_dir(input_hash);
+    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+    (|| -> Result<Option<LayerHash>> {
+        if fs::is_dir_empty(&task_layer_dir)? {
+            return Ok(None);
+        }
+
+        let hasher = LayerHasher::new();
+        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
+
+        tar::pack(&mut buffered_writer, &task_layer_dir)?;
+
+        let hasher = buffered_writer.into_inner()?;
+        Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
+    })()
+    .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
+}
+
+fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
+    let task_state_dir = paths::task_state_dir(input_hash);
+    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+    if let Some(hash) = hash {
+        let layer_dir = paths::layer_dir(hash);
+
+        let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
+            Ok(_) => return Ok(()),
+            Err(err) => err,
+        };
+
+        if !matches!(
+            err.raw_os_error(),
+            Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
+        ) {
+            return Err(err).with_context(|| {
+                format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
+            });
+        }
+    }
+
+    fs::ensure_removed(&task_layer_dir)
+}
+
+fn run_and_hash_task(
+    input_hash: &InputHash,
+    task: &Task,
+    jobserver: Jobserver,
+) -> Result<TaskOutput> {
+    run_task(input_hash, task, jobserver)?;
+
+    let outputs = collect_outputs(input_hash, task)?;
+
+    let layer = hash_layer(input_hash)?;
+    move_layer(input_hash, &layer)?;
+
+    Ok(TaskOutput {
+        input_hash: *input_hash,
+        layer,
+        outputs,
+    })
+}
+
+fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
+    let filename = paths::task_cache_filename(input_hash);
+    let file = fs::open(&filename)?;
+
+    serde_json::from_reader(file)
+        .with_context(|| format!("Failed to read task cache data from {}", filename))
+}
+
+fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
+    fs::mkdir(&paths::task_state_dir(input_hash))?;
+
+    let tmp_filename = paths::task_cache_tmp_filename(input_hash);
+    let filename = paths::task_cache_filename(input_hash);
+
+    cjson::to_file(&tmp_filename, output)
+        .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
+
+    fs::rename(tmp_filename, filename)?;
+
+    Ok(())
+}
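+
+/// Runs a task, returning a cached result if one exists.
+///
+/// The input hash covers the command, the inherited layers, all
+/// dependencies and the declared outputs, so a change to any of them
+/// yields a new hash and invalidates the cache entry. A task failure
+/// takes precedence over a cleanup failure, as it is the more useful
+/// diagnostic.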
+pub fn handle(task: Task, jobserver: Jobserver) -> Result<TaskOutput> {
+    let input_hash = input_hash(&task);
+
+    if let Ok(task_output) = load_cached(&input_hash) {
+        return Ok(task_output);
+    }
+
+    let start_time = Instant::now();
+    println!("Starting task {} ({})", task.label, input_hash);
+
+    let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
+    let cleanup_ret = cleanup_task(&input_hash);
+
+    let task_output = task_ret?;
+    cleanup_ret.context("Failed to clean up after task")?;
+
+    save_cached(&input_hash, &task_output)?;
+
+    let duration = Instant::now().duration_since(start_time);
+    println!(
+        "Finished task {} ({}) in {}",
+        task.label,
+        input_hash,
+        duration.as_secs_f32()
+    );
+
+    Ok(task_output)
+}
--
cgit v1.2.3