use std::{
    collections::HashMap,
    io::BufWriter,
    os::unix::prelude::CommandExt,
    path::Path,
    process::{self, Command, Stdio},
    time::Instant,
};

use capctl::prctl;
use nix::{
    mount::{self, MsFlags},
    sched::{unshare, CloneFlags},
    sys::wait,
    unistd::{self, Gid, Uid},
};
use serde::Serialize;
use tee_readwrite::TeeWriter;

use common::{error::*, string_hash::*, types::*};

use super::{
    jobserver::Jobserver,
    ns, tar,
    util::{checkable::Checkable, cjson, fs},
};
use crate::{paths, util::unix};

/// UID/GID the task command runs as inside the user namespace
const BUILD_UID: Uid = Uid::from_raw(1000);
const BUILD_GID: Gid = Gid::from_raw(1000);

type InputHasher = blake3::Hasher;
type DependencyHasher = blake3::Hasher;
type LayerHasher = blake3::Hasher;
type ArchiveHasher = blake3::Hasher;

/// Hashes a single dependency, giving a stable key for the input hash
fn dependency_hash(dep: &Dependency) -> DependencyHash {
    DependencyHash(StringHash(
        cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
    ))
}

/// Computes the task input hash over the command, inherited layers,
/// dependencies, and declared outputs
fn input_hash(task: &Task) -> InputHash {
    #[derive(Debug, Serialize)]
    struct HashInput<'a> {
        pub command: &'a str,
        pub inherit: &'a [LayerHash],
        pub depends: HashMap<DependencyHash, &'a Dependency>,
        pub outputs: &'a HashMap<String, String>,
    }
    let input = HashInput {
        command: &task.command,
        inherit: &task.inherit,
        depends: task
            .depends
            .iter()
            .map(|dep| (dependency_hash(dep), dep))
            .collect(),
        outputs: &task.outputs,
    };

    InputHash(StringHash(
        cjson::digest::<InputHasher, _>(&input).unwrap().into(),
    ))
}

/// Prepares the task state and tmp directories and mounts the task workdir:
/// a plain bind mount of the fresh layer directory when no layers are
/// inherited, an overlayfs stack with the inherited layers as lowerdirs
/// otherwise
fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
    // Remove metadata first to ensure task invalidation
    fs::ensure_removed(&paths::task_cache_filename(input_hash))?;

    let task_state_dir = paths::task_state_dir(input_hash);

    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
    fs::ensure_removed(&task_layer_dir)?;
    fs::mkdir(&task_layer_dir)?;
    fs::fixup_permissions(&task_layer_dir)?;

    let task_tmp_dir = paths::task_tmp_dir(input_hash);

    let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
    fs::mkdir(&taskdir)?;

    let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
    std::fs::write(&runfile, &task.command)
        .with_context(|| format!("Failed to write {}", runfile))?;

    let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_WORKDIR]);
    let mount = if task.inherit.is_empty() {
        fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
            .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
    } else {
        let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
        fs::ensure_removed(&task_work_dir)?;
        fs::mkdir(&task_work_dir)?;
        fs::fixup_permissions(&task_work_dir)?;

        // Lowerdirs are listed top-to-bottom, so the inherited layers are
        // stacked in reverse order
        let lower = task
            .inherit
            .iter()
            .rev()
            .map(paths::layer_dir)
            .collect::<Vec<_>>()
            .join(":");
        let options = format!(
            "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
            lower = lower,
            upper = task_layer_dir,
            work = task_work_dir
        );
        fs::mount(
            "overlay",
            &mount_target,
            Some("overlay"),
            MsFlags::empty(),
            Some(&options),
        )
        .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
    };

    Ok(mount)
}

/// Mounts the task rootfs as a lowerdir-only (and thus read-only) overlay of
/// the unpacked dependencies over the base rootfs
fn init_task_rootfs(input_hash: &InputHash) -> Result<fs::Mount> {
    let task_tmp_dir = paths::task_tmp_dir(input_hash);
    let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
    let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);

    let lower = [&depends_dir, paths::ROOTFS_DIR].join(":");
    let options = format!(
        "xino=off,index=off,metacopy=off,lowerdir={lower}",
        lower = lower,
    );

    let mount = fs::mount(
        "overlay",
        &mount_target,
        Some("overlay"),
        MsFlags::empty(),
        Some(&options),
    )
    .with_context(|| format!("Failed to mount rootfs overlay to {:?}", mount_target))?;

    Ok(mount)
}

/// Removes the temporary task directories that are not part of the cached
/// result
fn cleanup_task(input_hash: &InputHash) -> Result<()> {
    let task_tmp_dir = paths::task_tmp_dir(input_hash);
    fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?;

    let task_state_dir = paths::task_state_dir(input_hash);
    let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
    fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?;

    Ok(())
}

fn unpack_dependency<P1: AsRef<Path>, P2: AsRef<Path>>(filename: P1, dest: P2) -> Result<()> {
    let file = fs::open(filename.as_ref())?;
    tar::unpack(file, dest.as_ref()).with_context(|| {
        format!(
            "Failed to unpack {:?} to {:?}",
            filename.as_ref(),
            dest.as_ref()
        )
    })
}

/// Copies fetched downloads and unpacks task output archives into the task
/// tmp dir
fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<()> {
    let task_tmp_dir = paths::task_tmp_dir(input_hash);
    let downloads_dir = paths::join(&[&task_tmp_dir, paths::TASK_DLDIR]);
    let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);

    fs::mkdir(&downloads_dir)?;
    fs::mkdir(&depends_dir)?;

    for dep in &task.depends {
        match dep {
            Dependency::Fetch { name, .. } => {
                fs::copy(
                    paths::join(&[paths::DOWNLOADS_DIR, name]),
                    paths::join(&[&downloads_dir, name]),
                )?;
            }
            Dependency::Task { output, path } => {
                unpack_dependency(
                    paths::archive_filename(output),
                    paths::join(&[&depends_dir, path]),
                )?;
            }
        }
    }

    Ok(())
}

/// Packs one output directory into a tar archive, hashing the archive on the
/// fly through a TeeWriter; returns None if the output directory is missing
fn collect_output(input_hash: &InputHash, path: &str) -> Result<Option<ArchiveHash>> {
    let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]);
    if !Path::new(&source).is_dir() {
        return Ok(None);
    }

    let filename = paths::archive_tmp_filename(input_hash);

    let hash = (|| -> Result<ArchiveHash> {
        let file = fs::create(&filename)?;
        let hasher = ArchiveHasher::new();
        let writer = TeeWriter::new(file, hasher);
        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);

        super::tar::pack(&mut buffered_writer, &source)?;

        let writer = buffered_writer.into_inner()?;
        let (file, hasher) = writer.into_inner();
        file.sync_all()?;
        drop(file);

        Ok(ArchiveHash(StringHash(hasher.finalize().into())))
    })()
    .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;

    fs::rename(filename, paths::archive_filename(&hash))?;

    Ok(Some(hash))
}

fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
    let mut ret = HashMap::new();

    for (name, path) in &task.outputs {
        if let Some(hash) = collect_output(input_hash, path)?
        {
            ret.insert(name.clone(), hash);
        }
    }

    Ok(ret)
}

/// Runs the task command in a container: fresh mount, PID, IPC, network and
/// UTS namespaces, a pivoted task rootfs, and an unprivileged user namespace
fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> {
    let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
    unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;
    let _rootfs_mount = init_task_rootfs(input_hash).context("Failed to initialize task rootfs")?;

    let task_tmp_dir = paths::task_tmp_dir(input_hash);
    let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);

    let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
    let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);

    let log_filename = paths::task_log_filename(input_hash);

    let mut exec_cmd = || -> Result<()> {
        let log = fs::create(&log_filename)?;
        let log_stdout = log
            .try_clone()
            .context("Failed to duplicate log file descriptor")?;
        let log_stderr = log
            .try_clone()
            .context("Failed to duplicate log file descriptor")?;

        // Bind /dev and the build directory into the rootfs before pivoting
        mount::mount::<_, _, str, str>(
            Some(paths::join(&[paths::ROOTFS_DIR, "dev"]).as_str()),
            paths::join(&[&rootfs, "dev"]).as_str(),
            None,
            MsFlags::MS_BIND | MsFlags::MS_REC,
            None,
        )
        .expect("Failed to bind mount /dev directory");
        mount::mount::<_, _, str, str>(
            Some(builddir_source.as_str()),
            builddir_target.as_str(),
            None,
            MsFlags::MS_BIND | MsFlags::MS_REC,
            None,
        )
        .expect("Failed to bind mount build directory");

        ns::pivot_root(&rootfs);
        mount::mount::<str, str, str, str>(
            None,
            "/",
            None,
            MsFlags::MS_PRIVATE | MsFlags::MS_REC,
            None,
        )
        .context("Failed to set MS_PRIVATE for container root")?;
        ns::container_mounts().context("Failed to set up container mounts")?;

        unistd::sethostname("rebel-builder").context("Failed to set hostname")?;

        prctl::set_no_new_privs().context("set_no_new_privs()")?;

        // Drop to an unprivileged UID/GID inside a nested user namespace
        unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
            .context("Failed to create user namespace")?;
        ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));

        jobserver
            .set_cloexec(false)
            .context("Failed to unset O_CLOEXEC on jobserver pipe")?;

        let err = Command::new("sh")
            .args(&[
                "-ex",
                &paths::abs(&paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])),
            ])
            .stdin(Stdio::null())
            .stdout(log_stdout)
            .stderr(log_stderr)
            .current_dir(paths::TASK_WORKDIR)
            .env_clear()
            .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
            .env("HOME", "/build")
            .env("INPUT_HASH", input_hash.to_string())
            .env("MAKEFLAGS", jobserver.to_makeflags())
            .exec();

        // exec() only returns on error
        eprintln!("{}", err);
        process::exit(127);
    };

    let pid = unsafe {
        ns::spawn(
            CloneFlags::CLONE_NEWNS
                | CloneFlags::CLONE_NEWPID
                | CloneFlags::CLONE_NEWIPC
                | CloneFlags::CLONE_NEWNET
                | CloneFlags::CLONE_NEWUTS,
            || exec_cmd().unwrap(),
        )
    }
    .context("Failed to run task container")?;

    let status = wait::waitpid(pid, None)?;
    if let Err(err) = status.check() {
        return Err(Error::new(format!(
            "Task failed: {}\nOutput: {}",
            err, log_filename
        )));
    }

    Ok(())
}

/// Hashes the tar serialization of the layer directory without writing an
/// archive to disk; an empty layer hashes to None
fn hash_layer(input_hash: &InputHash) -> Result<Option<LayerHash>> {
    let task_state_dir = paths::task_state_dir(input_hash);
    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);

    (|| -> Result<Option<LayerHash>> {
        if fs::is_dir_empty(&task_layer_dir)?
        {
            return Ok(None);
        }

        let hasher = LayerHasher::new();
        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);

        tar::pack(&mut buffered_writer, &task_layer_dir)?;

        let hasher = buffered_writer.into_inner()?;
        Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
    })()
    .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
}

/// Moves the task layer into the content-addressed layer store; a layer that
/// already exists under the same hash is kept and the new copy is discarded
fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
    let task_state_dir = paths::task_state_dir(input_hash);
    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);

    if let Some(hash) = hash {
        let layer_dir = paths::layer_dir(hash);

        let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
            Ok(_) => return Ok(()),
            Err(err) => err,
        };

        if !matches!(
            err.raw_os_error(),
            Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
        ) {
            return Err(err).with_context(|| {
                format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
            });
        }
    }

    fs::ensure_removed(&task_layer_dir)
}

fn run_and_hash_task(
    input_hash: &InputHash,
    task: &Task,
    jobserver: &mut Jobserver,
) -> Result<TaskOutput> {
    run_task(input_hash, task, jobserver)?;

    let outputs = collect_outputs(input_hash, task)?;

    let layer = hash_layer(input_hash)?;
    move_layer(input_hash, &layer)?;

    Ok(TaskOutput {
        input_hash: *input_hash,
        layer,
        outputs,
    })
}

fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
    let filename = paths::task_cache_filename(input_hash);
    let file = fs::open(&filename)?;

    serde_json::from_reader(file)
        .with_context(|| format!("Failed to read task cache data from {}", filename))
}

/// Writes the cache entry through a temporary file and renames it into place
/// so the cache file appears atomically
fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
    fs::mkdir(&paths::task_state_dir(input_hash))?;

    let tmp_filename = paths::task_cache_tmp_filename(input_hash);
    let filename = paths::task_cache_filename(input_hash);

    cjson::to_file(&tmp_filename, output)
        .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
    fs::rename(tmp_filename, filename)?;

    Ok(())
}

/// Task entry point: tasks with the same input hash are serialized through a
/// lock file, cached results are returned directly, and a jobserver token is
/// held for the duration of the actual run
pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result<TaskOutput> {
    let input_hash = input_hash(&task);

    let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true)
        .context("Failed to get task lock")?;

    if let Ok(task_output) = load_cached(&input_hash) {
        return Ok(task_output);
    }

    let token = jobserver.wait();
    let start_time = Instant::now();
    println!("Starting task {} ({})", task.label, input_hash);

    let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
    let cleanup_ret = cleanup_task(&input_hash);

    jobserver.post(token);

    let task_output = task_ret?;
    cleanup_ret.context("Failed to clean up after task")?;

    save_cached(&input_hash, &task_output)?;

    let duration = Instant::now().duration_since(start_time);
    println!(
        "Finished task {} ({}) in {}",
        task.label,
        input_hash,
        duration.as_secs_f32()
    );

    Ok(task_output)
}