use std::{
    collections::{BTreeMap, HashMap, HashSet},
    io::{self, BufWriter},
    os::unix::prelude::CommandExt,
    path::{Path, PathBuf},
    process::{self, Command, Stdio},
    time::Instant,
};

use capctl::prctl;
use nix::{
    mount::{self, MsFlags},
    sched::{unshare, CloneFlags},
    sys::wait,
    unistd::{self, Gid, Uid},
};
use serde::Serialize;
use tee_readwrite::{TeeReader, TeeWriter};
use walkdir::WalkDir;

use common::{error::*, string_hash::*, types::*};

use super::{
    jobserver::Jobserver,
    ns, tar,
    util::{checkable::Checkable, cjson, fs},
};
use crate::{
    paths,
    util::{stack::Stack, unix},
};

const BUILD_UID: Uid = Uid::from_raw(1000);
const BUILD_GID: Gid = Gid::from_raw(1000);

type InputHasher = blake3::Hasher;
type DependencyHasher = blake3::Hasher;
type LayerHasher = blake3::Hasher;
type ArchiveHasher = blake3::Hasher;

type DependMap = BTreeMap<String, Vec<ArchiveHash>>;

fn dependency_hash(dep: &Dependency) -> DependencyHash {
    DependencyHash(StringHash(
        cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
    ))
}

fn input_hash(task: &Task) -> InputHash {
    #[derive(Debug, Serialize)]
    struct HashInput<'a> {
        pub command: &'a str,
        pub workdir: &'a str,
        pub rootfs: &'a ArchiveHash,
        pub inherit: &'a [LayerHash],
        pub depends: HashMap<DependencyHash, &'a Dependency>,
        pub outputs: &'a HashMap<String, String>,
    }
    let input = HashInput {
        command: &task.command,
        workdir: &task.workdir,
        rootfs: &task.rootfs,
        inherit: &task.inherit,
        depends: task
            .depends
            .iter()
            .map(|dep| (dependency_hash(dep), dep))
            .collect(),
        outputs: &task.outputs,
    };

    InputHash(StringHash(
        cjson::digest::<InputHasher, _>(&input).unwrap().into(),
    ))
}

fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
    // Remove metadata first to ensure task invalidation
    fs::ensure_removed(&paths::task_cache_filename(input_hash))?;

    let task_state_dir = paths::task_state_dir(input_hash);

    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
    fs::ensure_removed(&task_layer_dir)?;
    fs::mkdir(&task_layer_dir)?;
    fs::fixup_permissions(&task_layer_dir)?;

    let task_tmp_dir = paths::task_tmp_dir(input_hash);

    let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
    fs::mkdir(&taskdir)?;
    let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
    std::fs::write(&runfile, &task.command)
        .with_context(|| format!("Failed to write {}", runfile))?;

    let mount_target = paths::join(&[&task_tmp_dir, &task.workdir]);
    let mount = if task.inherit.is_empty() {
        fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
            .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
    } else {
        let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
        fs::ensure_removed(&task_work_dir)?;
        fs::mkdir(&task_work_dir)?;
        fs::fixup_permissions(&task_work_dir)?;

        let lower = task
            .inherit
            .iter()
            .rev()
            .map(paths::layer_dir)
            .collect::<Vec<_>>()
            .join(":");
        let options = format!(
            "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
            lower = lower,
            upper = task_layer_dir,
            work = task_work_dir
        );
        fs::mount(
            "overlay",
            &mount_target,
            Some("overlay"),
            MsFlags::empty(),
            Some(&options),
        )
        .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
    };

    Ok(mount)
}
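/// Recursively lists the contents of `path`, returning the contained
/// directories and files as two sets of absolute paths rebased onto `prefix`.
///
/// The prefix directory itself is included in the directory set. The sets are
/// used to detect file conflicts between overlaid dependencies.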
fn get_contents<P1: AsRef<Path>, P2: AsRef<Path>>(
    path: P1,
    prefix: P2,
) -> Result<(HashSet<PathBuf>, HashSet<PathBuf>)> {
    let mut dirs = HashSet::new();
    let mut files = HashSet::new();

    let root: PathBuf = Path::new("/").join(prefix.as_ref());

    for result in WalkDir::new(path.as_ref()).min_depth(1).into_iter() {
        let entry = result
            .with_context(|| format!("Failed to list contents of directory {:?}", path.as_ref()))?;
        let is_dir = entry.file_type().is_dir();
        let entry_path = root.join(entry.into_path().strip_prefix(path.as_ref()).unwrap());
        if is_dir {
            dirs.insert(entry_path);
        } else {
            files.insert(entry_path);
        }
    }

    dirs.insert(root);

    Ok((dirs, files))
}

fn check_conflicts(
    dirs1: &HashSet<PathBuf>,
    files1: &HashSet<PathBuf>,
    dirs2: &HashSet<PathBuf>,
    files2: &HashSet<PathBuf>,
) -> Result<()> {
    let mut conflicts = Vec::new();

    conflicts.extend(files1.intersection(files2));
    conflicts.extend(dirs1.intersection(files2));
    conflicts.extend(files1.intersection(dirs2));

    if !conflicts.is_empty() {
        let mut conflict_strings: Box<[_]> = conflicts
            .into_iter()
            .map(|path| path.to_string_lossy().to_string())
            .collect();
        conflict_strings.sort();
        return Err(Error::new(format!(
            "Found the following file conflicts in dependencies:\n{}",
            conflict_strings.join("\n")
        )));
    }

    Ok(())
}

fn init_task_rootfs(input_hash: &InputHash, task: &Task) -> Result<Stack<fs::Mount>> {
    let task_tmp_dir = paths::task_tmp_dir(input_hash);
    let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
    let rootfs = paths::depend_dir(&task.rootfs);

    let depends = unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;

    let mut mounts = Stack::new();

    mounts.push(
        fs::mount(rootfs, &mount_target, None, MsFlags::MS_BIND, None)
            .with_context(|| format!("Failed to bind mount rootfs to {:?}", mount_target))?,
    );
    mount::mount::<str, str, str, str>(
        None,
        &mount_target,
        None,
        MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
        None,
    )
    .context("Failed to mount container rootfs read-only")?;

    let (mut dirs, mut files) = get_contents(&mount_target, "")?;

    for (path, dep_hashes) in depends {
        assert!(!dep_hashes.is_empty());

        if !path.is_empty() && !path.starts_with('/') {
            return Err(Error::new(format!(
                "Dependency path {:?} must be absolute",
                path
            )));
        }

        let dep_target = mount_target.clone() + &path;
        let dep_paths: Box<[_]> = dep_hashes.iter().map(paths::depend_dir).collect();

        for dep in dep_paths.iter() {
            let (dep_dirs, dep_files) = get_contents(dep, &path)?;
            check_conflicts(&dirs, &files, &dep_dirs, &dep_files)?;
            dirs.extend(dep_dirs);
            files.extend(dep_files);
        }

        let options = format!(
            "xino=off,index=off,metacopy=off,lowerdir={lower}:{base}",
            lower = dep_paths.join(":"),
            base = dep_target,
        );
        mounts.push(
            fs::mount(
                "overlay",
                dep_target.as_str(),
                Some("overlay"),
                MsFlags::MS_RDONLY,
                Some(&options),
            )
            .with_context(|| format!("Failed to mount overlay to {:?}", dep_target))?,
        );
    }

    Ok(mounts)
}

fn cleanup_task(input_hash: &InputHash) -> Result<()> {
    let task_tmp_dir = paths::task_tmp_dir(input_hash);
    fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?;

    let task_state_dir = paths::task_state_dir(input_hash);
    let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
    fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?;

    Ok(())
}

fn get_archive_filename(task: &Task, hash: &ArchiveHash) -> String {
    if let Some(pinned_name) = task.pins.get(hash) {
        paths::pinned_archive_filename(pinned_name)
    } else {
        paths::archive_filename(hash)
    }
}
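/// Unpacks a dependency archive into its depend directory, verifying the
/// archive hash while unpacking.
///
/// Unpacking is skipped if the destination directory already exists; a lock
/// file guards against concurrent unpacks of the same archive.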
fn unpack_dependency(task: &Task, hash: &ArchiveHash) -> Result<()> {
    let _lock = unix::lock(paths::depend_lock_filename(hash), true, true)
        .context("Failed to get dependency lock")?;

    let filename = get_archive_filename(task, hash);

    let dest = paths::depend_dir(hash);
    if Path::new(&dest).is_dir() {
        return Ok(());
    }

    (|| -> Result<()> {
        let tmp_dest = paths::depend_tmp_dir(hash);
        fs::ensure_removed(&tmp_dest)?;

        let file = fs::open(&filename)?;
        let hasher = ArchiveHasher::new();
        let buffered_hasher = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
        let mut reader = TeeReader::new(file, buffered_hasher, false);

        tar::unpack(&mut reader, &tmp_dest)?;

        // Read file to end to get the correct hash
        io::copy(&mut reader, &mut io::sink())?;

        let (_, buffered_hasher) = reader.into_inner();
        let hasher = buffered_hasher.into_inner()?;

        let actual_hash = ArchiveHash(StringHash(hasher.finalize().into()));

        if &actual_hash != hash {
            return Err(Error::new(format!(
                "Incorrect file hash for {:?} (expected: {}, actual: {})",
                filename, hash, actual_hash,
            )));
        }

        fs::rename(&tmp_dest, &dest)?;

        Ok(())
    })()
    .with_context(|| format!("Failed to unpack {:?}", filename))
}

fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<DependMap> {
    let task_tmp_dir = paths::task_tmp_dir(input_hash);

    unpack_dependency(task, &task.rootfs)?;

    let mut ret = DependMap::new();

    for dep in &task.depends {
        match dep {
            Dependency::Fetch {
                name, target_dir, ..
            } => {
                let path = paths::join(&[&task_tmp_dir, target_dir]);
                fs::mkdir(&path)?;
                fs::copy(
                    paths::join(&[paths::DOWNLOADS_DIR, name]),
                    paths::join(&[&path, name]),
                )?;
            }
            Dependency::Task { output, path } => {
                unpack_dependency(task, output)?;
                ret.entry(path.clone()).or_default().push(*output);
            }
        }
    }

    Ok(ret)
}

fn collect_output(input_hash: &InputHash, task: &Task, path: &str) -> Result<Option<ArchiveHash>> {
    let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]);
    if !Path::new(&source).is_dir() {
        return Ok(None);
    }

    let filename = paths::archive_tmp_filename(input_hash);

    let hash = (|| -> Result<ArchiveHash> {
        let file = fs::create(&filename)?;
        let hasher = ArchiveHasher::new();
        let writer = TeeWriter::new(file, hasher);
        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);

        super::tar::pack(&task.rootfs, &mut buffered_writer, &source)?;

        let writer = buffered_writer.into_inner()?;
        let (file, hasher) = writer.into_inner();
        file.sync_all()?;
        drop(file);

        Ok(ArchiveHash(StringHash(hasher.finalize().into())))
    })()
    .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;

    fs::rename(filename, paths::archive_filename(&hash))?;

    Ok(Some(hash))
}

fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
    let mut ret = HashMap::new();

    for (name, path) in &task.outputs {
        if let Some(hash) = collect_output(input_hash, task, path)? {
            ret.insert(name.clone(), hash);
        }
    }

    Ok(ret)
}
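/// Runs the task command in a container.
///
/// The container is started with fresh mount, PID, IPC, network, and UTS
/// namespaces; the task rootfs is entered via pivot_root(), and the command
/// is executed for the build user in a nested user namespace, with stdout
/// and stderr redirected to the task log file.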
fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> {
    let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
    let _rootfs_mounts =
        init_task_rootfs(input_hash, task).context("Failed to initialize task rootfs")?;

    let task_tmp_dir = paths::task_tmp_dir(input_hash);
    let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);

    let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
    let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);

    let log_filename = paths::task_log_filename(input_hash);

    let mut exec_cmd = || -> Result<()> {
        let log = fs::create(&log_filename)?;
        let log_stdout = log
            .try_clone()
            .context("Failed to duplicate log file descriptor")?;
        let log_stderr = log
            .try_clone()
            .context("Failed to duplicate log file descriptor")?;

        mount::mount::<_, _, str, str>(
            Some(paths::DEV_DIR),
            paths::join(&[&rootfs, "dev"]).as_str(),
            None,
            MsFlags::MS_BIND | MsFlags::MS_REC,
            None,
        )
        .expect("Failed to bind mount /dev directory");
        mount::mount::<_, _, str, str>(
            Some(builddir_source.as_str()),
            builddir_target.as_str(),
            None,
            MsFlags::MS_BIND | MsFlags::MS_REC,
            None,
        )
        .expect("Failed to bind mount build directory");

        ns::pivot_root(&rootfs);
        mount::mount::<str, str, str, str>(
            None,
            "/",
            None,
            MsFlags::MS_PRIVATE | MsFlags::MS_REC,
            None,
        )
        .context("Failed to set MS_PRIVATE for container root")?;
        ns::container_mounts().context("Failed to set up container mounts")?;

        unistd::sethostname("rebel-builder").context("Failed to set hostname")?;

        prctl::set_no_new_privs().context("set_no_new_privs()")?;

        unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
            .context("Failed to create user namespace")?;
        ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));

        jobserver
            .set_cloexec(false)
            .context("Failed to unset O_CLOEXEC on jobserver pipe")?;

        let err = Command::new("sh")
            .args(&["-ex", &paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])])
            .stdin(Stdio::null())
            .stdout(log_stdout)
            .stderr(log_stderr)
            .env_clear()
            .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
            .env("HOME", "/build")
            .env("INPUT_HASH", input_hash.to_string())
            .env("MAKEFLAGS", jobserver.to_makeflags())
            .exec();
        eprintln!("{}", err);
        process::exit(127);
    };

    let pid = unsafe {
        ns::spawn(
            CloneFlags::CLONE_NEWNS
                | CloneFlags::CLONE_NEWPID
                | CloneFlags::CLONE_NEWIPC
                | CloneFlags::CLONE_NEWNET
                | CloneFlags::CLONE_NEWUTS,
            || exec_cmd().unwrap(),
        )
    }
    .context("Failed to run task container")?;

    let status = wait::waitpid(pid, None)?;

    if let Err(err) = status.check() {
        return Err(Error::new(format!(
            "Task failed: {}\nOutput: {}",
            err, log_filename
        )));
    }

    Ok(())
}
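/// Hashes the task layer directory by streaming a tar archive of its
/// contents into the hasher, returning `None` for an empty layer.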
fn hash_layer(input_hash: &InputHash, task: &Task) -> Result<Option<LayerHash>> {
    let task_state_dir = paths::task_state_dir(input_hash);
    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);

    (|| -> Result<Option<LayerHash>> {
        if fs::is_dir_empty(&task_layer_dir)? {
            return Ok(None);
        }

        let hasher = LayerHasher::new();
        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);

        tar::pack(&task.rootfs, &mut buffered_writer, &task_layer_dir)?;

        let hasher = buffered_writer.into_inner()?;
        Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
    })()
    .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
}

fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
    let task_state_dir = paths::task_state_dir(input_hash);
    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);

    if let Some(hash) = hash {
        let layer_dir = paths::layer_dir(hash);

        let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
            Ok(_) => return Ok(()),
            Err(err) => err,
        };

        if !matches!(
            err.raw_os_error(),
            Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
        ) {
            return Err(err).with_context(|| {
                format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
            });
        }
    }

    fs::ensure_removed(&task_layer_dir)
}

fn run_and_hash_task(
    input_hash: &InputHash,
    task: &Task,
    jobserver: &mut Jobserver,
) -> Result<TaskOutput> {
    run_task(input_hash, task, jobserver)?;

    let outputs = collect_outputs(input_hash, task)?;

    let layer = hash_layer(input_hash, task)?;
    move_layer(input_hash, &layer)?;

    Ok(TaskOutput {
        input_hash: Some(*input_hash),
        layer,
        outputs,
    })
}

fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
    let filename = paths::task_cache_filename(input_hash);
    let file = fs::open(&filename)?;

    serde_json::from_reader(file)
        .with_context(|| format!("Failed to read task cache data from {}", filename))
}

fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
    fs::mkdir(&paths::task_state_dir(input_hash))?;

    let tmp_filename = paths::task_cache_tmp_filename(input_hash);
    let filename = paths::task_cache_filename(input_hash);

    cjson::to_file(&tmp_filename, output)
        .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
    fs::rename(tmp_filename, filename)?;

    Ok(())
}

pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result<TaskOutput> {
    let input_hash = input_hash(&task);

    let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true)
        .context("Failed to get task lock")?;

    if let Ok(task_output) = load_cached(&input_hash) {
        return Ok(task_output);
    }

    let token = jobserver.wait();
    let start_time = Instant::now();
    println!("Starting task {} ({})", task.label, input_hash);

    let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
    let cleanup_ret = cleanup_task(&input_hash);

    jobserver.post(token);

    let task_output = task_ret?;
    cleanup_ret.context("Failed to clean up after task")?;

    save_cached(&input_hash, &task_output)?;

    let duration = Instant::now().duration_since(start_time);
    println!(
        "Finished task {} ({}) in {}",
        task.label,
        input_hash,
        duration.as_secs_f32()
    );

    Ok(task_output)
}