diff options
Diffstat (limited to 'crates/rebel-runner/src/task.rs')
-rw-r--r-- | crates/rebel-runner/src/task.rs | 638 |
1 files changed, 638 insertions, 0 deletions
// (cgit diff scaffolding: new file crates/rebel-runner/src/task.rs, index 0000000..5bb253a,
//  hunk @@ -0,0 +1,638 @@ — the full file content follows, reformatted from the collapsed dump.)

//! Task execution for the rebel runner.
//!
//! A [`Task`] is run inside an unprivileged container built from a read-only
//! rootfs archive plus overlayfs layers for its ancestors and dependencies.
//! Results (output archives and a filesystem layer) are content-addressed by
//! hash and cached on disk, keyed by a hash over all task inputs.

use std::{
    collections::{BTreeMap, HashMap, HashSet},
    io::{self, BufWriter},
    os::unix::prelude::CommandExt,
    path::{Path, PathBuf},
    process::{self, Command, Stdio},
    time::Instant,
};

use capctl::prctl;
use nix::{
    mount::{self, MsFlags},
    sched::{unshare, CloneFlags},
    sys::{resource, time::TimeVal, wait},
    unistd::{self, Gid, Uid},
};
use serde::Serialize;
use tee_readwrite::{TeeReader, TeeWriter};

use rebel_common::{error::*, string_hash::*, types::*};
use walkdir::WalkDir;

use super::{
    jobserver::Jobserver,
    ns, tar,
    util::{checkable::Checkable, cjson, fs},
};
use crate::{
    paths,
    util::{stack::Stack, unix},
};

// UID/GID the build runs as inside the container's user namespace
// (mapped to root inside via ns::setup_userns below).
const BUILD_UID: Uid = Uid::from_raw(1000);
const BUILD_GID: Gid = Gid::from_raw(1000);

// All hash domains currently use BLAKE3; separate aliases keep the
// call sites self-documenting and allow changing one independently.
type InputHasher = blake3::Hasher;
type DependencyHasher = blake3::Hasher;
type LayerHasher = blake3::Hasher;
type ArchiveHasher = blake3::Hasher;

// Maps an install path (as given by Dependency::Task { path, .. }) to the
// archive hashes of all task outputs to be overlaid there.
type DependMap = BTreeMap<String, Vec<ArchiveHash>>;

/// Hashes a single dependency via its canonical-JSON encoding.
///
/// `unwrap()` relies on serialization of `Dependency` being infallible.
fn dependency_hash(dep: &Dependency) -> DependencyHash {
    DependencyHash(StringHash(
        cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
    ))
}

/// Computes the cache key for a task.
///
/// Every field that can influence the build result is folded in: the command,
/// workdir, rootfs archive, ancestor layers, dependencies (keyed by their own
/// hash so ordering does not matter), and the output map. Fields like
/// `task.label`, `task.pins` and `task.force_run` are deliberately excluded —
/// they do not affect the produced artifacts.
fn input_hash(task: &Task) -> InputHash {
    #[derive(Debug, Serialize)]
    struct HashInput<'a> {
        pub command: &'a str,
        pub workdir: &'a str,
        pub rootfs: &'a ArchiveHash,
        pub ancestors: &'a [LayerHash],
        pub depends: HashMap<DependencyHash, &'a Dependency>,
        pub outputs: &'a HashMap<String, String>,
    }
    let input = HashInput {
        command: &task.command,
        workdir: &task.workdir,
        rootfs: &task.rootfs,
        ancestors: &task.ancestors,
        depends: task
            .depends
            .iter()
            .map(|dep| (dependency_hash(dep), dep))
            .collect(),
        outputs: &task.outputs,
    };

    InputHash(StringHash(
        cjson::digest::<InputHasher, _>(&input).unwrap().into(),
    ))
}

/// Prepares the task's working directory mount and run script.
///
/// Creates a fresh layer directory, writes the task command to the run file,
/// and mounts the workdir: a plain bind mount when the task has no ancestors,
/// otherwise an overlayfs with the ancestor layers (most recent first) as
/// lowerdirs and the fresh layer directory as upperdir.
///
/// Returns the mount guard; dropping it presumably unmounts (see fs::Mount).
fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
    // Remove metadata first to ensure task invalidation
    fs::ensure_removed(paths::task_cache_filename(input_hash))?;

    let task_state_dir = paths::task_state_dir(input_hash);

    // Fresh upper layer for everything the task writes below its workdir.
    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
    fs::ensure_removed(&task_layer_dir)?;
    fs::mkdir(&task_layer_dir)?;
    fs::fixup_permissions(&task_layer_dir)?;

    let task_tmp_dir = paths::task_tmp_dir(input_hash);

    // Write the task command to the run file executed by `sh -ex` later.
    let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
    fs::mkdir(&taskdir)?;
    let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
    std::fs::write(&runfile, &task.command)
        .with_context(|| format!("Failed to write {}", runfile))?;

    let mount_target = paths::join(&[&task_tmp_dir, &task.workdir]);
    let mount = if task.ancestors.is_empty() {
        // No ancestor layers: the new layer directory IS the workdir.
        fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
            .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
    } else {
        // Overlay: ancestors as lowerdirs, new layer as upperdir.
        let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
        fs::ensure_removed(&task_work_dir)?;
        fs::mkdir(&task_work_dir)?;
        fs::fixup_permissions(&task_work_dir)?;

        // overlayfs lists lowerdirs top-to-bottom, so reverse the ancestor
        // order (presumably stored oldest-first — TODO confirm).
        let lower = task
            .ancestors
            .iter()
            .rev()
            .map(paths::layer_dir)
            .collect::<Vec<_>>()
            .join(":");
        // xino/index/metacopy are disabled so the upperdir stays a plain
        // directory tree that can be hashed and reused as a lowerdir later.
        let options = format!(
            "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
            lower = lower,
            upper = task_layer_dir,
            work = task_work_dir
        );
        fs::mount(
            "overlay",
            &mount_target,
            Some("overlay"),
            MsFlags::empty(),
            Some(&options),
        )
        .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
    };

    Ok(mount)
}

/// Recursively lists `path`, returning its directories and non-directories
/// as absolute paths rooted at `/<prefix>`.
///
/// The root itself is included in the directory set so overlapping
/// dependency roots are tolerated by `check_conflicts`.
fn get_contents<P1: AsRef<Path>, P2: AsRef<Path>>(
    path: P1,
    prefix: P2,
) -> Result<(HashSet<PathBuf>, HashSet<PathBuf>)> {
    let mut dirs = HashSet::new();
    let mut files = HashSet::new();

    let root: PathBuf = Path::new("/").join(prefix.as_ref());

    for result in WalkDir::new(path.as_ref()).min_depth(1).into_iter() {
        let entry = result
            .with_context(|| format!("Failed to list contents of directory {:?}", path.as_ref()))?;
        let is_dir = entry.file_type().is_dir();
        // Rebase the entry path from the scanned directory onto `root`;
        // strip_prefix can't fail for entries produced by this walk.
        let entry_path = root.join(entry.into_path().strip_prefix(path.as_ref()).unwrap());
        if is_dir {
            dirs.insert(entry_path);
        } else {
            files.insert(entry_path);
        }
    }

    dirs.insert(root);

    Ok((dirs, files))
}

/// Fails if the two content sets collide: file/file, dir/file or file/dir.
/// Directory/directory overlap is fine (overlayfs merges directories).
fn check_conflicts(
    dirs1: &HashSet<PathBuf>,
    files1: &HashSet<PathBuf>,
    dirs2: &HashSet<PathBuf>,
    files2: &HashSet<PathBuf>,
) -> Result<()> {
    let mut conflicts = Vec::new();

    conflicts.extend(files1.intersection(files2));
    conflicts.extend(dirs1.intersection(files2));
    conflicts.extend(files1.intersection(dirs2));

    if !conflicts.is_empty() {
        // Sort for a deterministic, readable error message.
        let mut conflict_strings: Box<[_]> = conflicts
            .into_iter()
            .map(|path| path.to_string_lossy().to_string())
            .collect();
        conflict_strings.sort();
        return Err(Error::new(format!(
            "Found the following file conflicts in dependencies:\n{}",
            conflict_strings.join("\n")
        )));
    }

    Ok(())
}

/// Assembles the container rootfs: binds the rootfs archive read-only, then
/// overlays each task dependency at its requested path, rejecting file
/// conflicts between dependencies and with the base rootfs.
///
/// Returns the stack of mount guards; they are unmounted in reverse order
/// when the stack is dropped (presumably — see util::stack::Stack).
fn init_task_rootfs(input_hash: &InputHash, task: &Task) -> Result<Stack<fs::Mount>> {
    let task_tmp_dir = paths::task_tmp_dir(input_hash);
    let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
    let rootfs = paths::depend_dir(&task.rootfs);

    let depends = unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;

    let mut mounts = Stack::new();

    mounts.push(
        fs::mount(rootfs, &mount_target, None, MsFlags::MS_BIND, None)
            .with_context(|| format!("Failed to bind mount rootfs to {:?}", mount_target))?,
    );

    // A bind mount ignores MS_RDONLY on creation; a remount is required to
    // actually make the rootfs read-only.
    mount::mount::<str, str, str, str>(
        None,
        &mount_target,
        None,
        MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
        None,
    )
    .context("Failed to mount container rootfs read-only")?;

    // Running inventory of everything visible in the rootfs so far, used to
    // detect conflicts as dependency contents are merged in.
    let (mut dirs, mut files) = get_contents(&mount_target, "")?;

    for (path, dep_hashes) in depends {
        assert!(!dep_hashes.is_empty());

        if !path.is_empty() && !path.starts_with('/') {
            return Err(Error::new(format!(
                "Dependency path {:?} must be absolute",
                path
            )));
        }

        // String concatenation is safe here: `path` is empty or absolute.
        let dep_target = mount_target.clone() + &path;
        let dep_paths: Box<[_]> = dep_hashes.iter().map(paths::depend_dir).collect();

        for dep in dep_paths.iter() {
            let (dep_dirs, dep_files) = get_contents(dep, &path)?;
            check_conflicts(&dirs, &files, &dep_dirs, &dep_files)?;
            dirs.extend(dep_dirs);
            files.extend(dep_files);
        }

        // Read-only overlay: the already-mounted target acts as the bottom
        // lowerdir; no upperdir, so nothing here is writable.
        let options = format!(
            "xino=off,index=off,metacopy=off,lowerdir={lower}:{base}",
            lower = dep_paths.join(":"),
            base = dep_target,
        );

        mounts.push(
            fs::mount(
                "overlay",
                dep_target.as_str(),
                Some("overlay"),
                MsFlags::MS_RDONLY,
                Some(&options),
            )
            .with_context(|| format!("Failed to mount overlay to {:?}", dep_target))?,
        );
    }

    Ok(mounts)
}

/// Removes per-run temporary state: the task tmp dir and the overlayfs
/// workdir. The layer dir is left in place for hashing/moving.
fn cleanup_task(input_hash: &InputHash) -> Result<()> {
    let task_tmp_dir = paths::task_tmp_dir(input_hash);

    fs::ensure_removed(task_tmp_dir).context("Failed to remove task tmp dir")?;

    let task_state_dir = paths::task_state_dir(input_hash);
    let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
    fs::ensure_removed(task_work_dir).context("Failed to remove overlayfs workdir")?;

    Ok(())
}

/// Resolves the on-disk archive path for `hash`, preferring a pinned
/// (named) archive when the task pins this hash.
fn get_archive_filename(task: &Task, hash: &ArchiveHash) -> String {
    if let Some(pinned_name) = task.pins.get(hash) {
        paths::pinned_archive_filename(pinned_name)
    } else {
        paths::archive_filename(hash)
    }
}

/// Unpacks the archive `hash` into its dependency directory, verifying the
/// content hash; a no-op if the directory already exists.
///
/// The unpack goes to a temp dir first and is renamed into place only after
/// the hash check, so a partially-unpacked or corrupt archive is never
/// visible under the final path.
fn unpack_dependency(task: &Task, hash: &ArchiveHash) -> Result<()> {
    // Serialize concurrent unpacks of the same archive.
    // NOTE(review): the lock result is bound but never checked, unlike the
    // task lock in handle() — a failed lock is silently ignored; confirm
    // whether that is intended best-effort behavior.
    let _lock = unix::lock(paths::depend_lock_filename(hash), true, true);

    let filename = get_archive_filename(task, hash);

    let dest = paths::depend_dir(hash);
    if Path::new(&dest).is_dir() {
        return Ok(());
    }

    (|| -> Result<()> {
        let tmp_dest = paths::depend_tmp_dir(hash);
        fs::ensure_removed(&tmp_dest)?;

        // Tee everything read by the unpacker into the hasher so the
        // archive is verified against `hash` as a side effect of unpacking.
        let file = fs::open(&filename)?;
        let hasher = ArchiveHasher::new();
        let buffered_hasher = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
        let mut reader = TeeReader::new(file, buffered_hasher, false);

        tar::unpack(&mut reader, &tmp_dest)?;

        // Read file to end to get the correct hash
        io::copy(&mut reader, &mut io::sink())?;

        let (_, buffered_hasher) = reader.into_inner();
        let hasher = buffered_hasher.into_inner()?;

        let actual_hash = ArchiveHash(StringHash(hasher.finalize().into()));

        if &actual_hash != hash {
            return Err(Error::new(format!(
                "Incorrect file hash for {:?} (expected: {}, actual: {})",
                filename, hash, actual_hash,
            )));
        }

        // Atomically publish the verified tree.
        fs::rename(&tmp_dest, &dest)?;

        Ok(())
    })()
    .with_context(|| format!("Failed to unpack {:?}", filename))
}

/// Unpacks the rootfs and all task dependencies.
///
/// Fetch dependencies are copied from the downloads dir into the task tmp
/// tree; task dependencies are unpacked into the shared dependency store and
/// collected into a path → archive-hash map for overlay mounting.
fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<DependMap> {
    let task_tmp_dir = paths::task_tmp_dir(input_hash);

    unpack_dependency(task, &task.rootfs)?;

    let mut ret = DependMap::new();

    for dep in &task.depends {
        match dep {
            Dependency::Fetch {
                name, target_dir, ..
            } => {
                let path = paths::join(&[&task_tmp_dir, target_dir]);
                fs::mkdir(&path)?;
                fs::copy(
                    paths::join(&[paths::DOWNLOADS_DIR, name]),
                    paths::join(&[&path, name]),
                )?;
            }
            Dependency::Task { output, path } => {
                unpack_dependency(task, output)?;
                ret.entry(path.clone()).or_default().push(*output);
            }
        }
    }

    Ok(ret)
}

/// Packs one output directory into a content-addressed archive.
///
/// Returns `Ok(None)` when the output directory does not exist (an output
/// the task did not produce). The archive is written to a temp file, synced,
/// then renamed to its hash-derived final name.
fn collect_output(input_hash: &InputHash, task: &Task, path: &str) -> Result<Option<ArchiveHash>> {
    let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]);
    if !Path::new(&source).is_dir() {
        return Ok(None);
    }

    let filename = paths::archive_tmp_filename(input_hash);

    let hash = (|| -> Result<ArchiveHash> {
        // Tee the tar stream into the hasher so the archive hash is computed
        // while writing, without a second pass.
        let file = fs::create(&filename)?;
        let hasher = ArchiveHasher::new();
        let writer = TeeWriter::new(file, hasher);
        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);

        super::tar::pack(&task.rootfs, &mut buffered_writer, &source)?;

        let writer = buffered_writer.into_inner()?;
        let (file, hasher) = writer.into_inner();
        // Ensure data is durable before the rename publishes the archive.
        file.sync_all()?;
        drop(file);

        Ok(ArchiveHash(StringHash(hasher.finalize().into())))
    })()
    .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;

    fs::rename(filename, paths::archive_filename(&hash))?;

    Ok(Some(hash))
}

/// Packs every declared task output, returning name → archive hash for the
/// outputs that were actually produced.
fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
    let mut ret = HashMap::new();

    for (name, path) in &task.outputs {
        if let Some(hash) = collect_output(input_hash, task, path)? {
            ret.insert(name.clone(), hash);
        }
    }

    Ok(ret)
}

/// Runs the task command inside a freshly assembled container.
///
/// Sets up the workdir and rootfs mounts, then spawns a child in new mount/
/// PID/IPC/net/UTS namespaces which bind-mounts /dev and the build dir,
/// pivots into the rootfs, drops into an unprivileged user namespace and
/// execs `sh -ex` on the run file, with stdout/stderr captured to the task
/// log. Fails with a pointer to the log if the child exits unsuccessfully.
fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> {
    // Guards held for the duration of the run; dropped (unmounted) on return.
    let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
    let _rootfs_mounts =
        init_task_rootfs(input_hash, task).context("Failed to initialize task rootfs")?;

    let task_tmp_dir = paths::task_tmp_dir(input_hash);
    let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);

    let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
    let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);

    let log_filename = paths::task_log_filename(input_hash);

    // Runs in the child after ns::spawn; must only return Err before exec —
    // on success control never comes back (exec or process::exit).
    let mut exec_cmd = || -> Result<()> {
        let log = fs::create(&log_filename)?;

        let log_stdout = log
            .try_clone()
            .context("Failed to duplicate log file descriptor")?;
        let log_stderr = log
            .try_clone()
            .context("Failed to duplicate log file descriptor")?;

        mount::mount::<_, _, str, str>(
            Some(paths::DEV_DIR),
            paths::join(&[&rootfs, "dev"]).as_str(),
            None,
            MsFlags::MS_BIND | MsFlags::MS_REC,
            None,
        )
        .expect("Failed to bind mount /dev directory");
        mount::mount::<_, _, str, str>(
            Some(builddir_source.as_str()),
            builddir_target.as_str(),
            None,
            MsFlags::MS_BIND | MsFlags::MS_REC,
            None,
        )
        .expect("Failed to bind mount build directory");

        // Make the assembled rootfs the container's / and set up the usual
        // pseudo-filesystem mounts inside it.
        ns::pivot_root(&rootfs);
        mount::mount::<str, _, str, str>(
            None,
            "/",
            None,
            MsFlags::MS_PRIVATE | MsFlags::MS_REC,
            None,
        )
        .context("Failed to set MS_PRIVATE for container root")?;
        ns::container_mounts().context("Failed to set up container mounts")?;

        unistd::sethostname("rebel-builder").context("Failed to set hostname")?;

        // Irreversibly forbid privilege escalation before dropping into the
        // build user namespace.
        prctl::set_no_new_privs().context("set_no_new_privs()")?;

        unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
            .context("Failed to create user namespace")?;
        // Map BUILD_UID/GID to root inside the new userns (presumably —
        // confirm against ns::setup_userns; its result, if any, is unused).
        ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));

        // The jobserver pipe must survive exec so MAKEFLAGS works in the
        // build.
        jobserver
            .set_cloexec(false)
            .context("Failed to unset O_CLOEXEC on jobserver pipe")?;

        // exec() only returns on failure; report and exit with the
        // conventional 127 "command not found / exec failed" status.
        let err = Command::new("sh")
            .args(["-ex", &paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])])
            .stdin(Stdio::null())
            .stdout(log_stdout)
            .stderr(log_stderr)
            .env_clear()
            .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
            .env("HOME", "/build")
            .env("MAKEFLAGS", jobserver.to_makeflags())
            .exec();
        eprintln!("{}", err);
        process::exit(127);
    };

    // SAFETY contract lives in ns::spawn; the closure unwraps so any setup
    // error aborts the child rather than continuing half-initialized.
    let pid = unsafe {
        ns::spawn(
            CloneFlags::CLONE_NEWNS
                | CloneFlags::CLONE_NEWPID
                | CloneFlags::CLONE_NEWIPC
                | CloneFlags::CLONE_NEWNET
                | CloneFlags::CLONE_NEWUTS,
            || exec_cmd().unwrap(),
        )
    }
    .context("Failed to run task container")?;

    let status = wait::waitpid(pid, None)?;

    if let Err(err) = status.check() {
        return Err(Error::new(format!(
            "Task failed: {}\nOutput: {}",
            err, log_filename
        )));
    }

    Ok(())
}

/// Hashes the task's layer directory (everything the task wrote), or
/// `Ok(None)` when the task wrote nothing.
fn hash_layer(input_hash: &InputHash, task: &Task) -> Result<Option<LayerHash>> {
    let task_state_dir = paths::task_state_dir(input_hash);
    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);

    (|| -> Result<Option<LayerHash>> {
        if fs::is_dir_empty(&task_layer_dir)? {
            return Ok(None);
        }

        // The tar stream is written straight into the hasher — the layer
        // hash is the hash of its (canonical) tar serialization.
        let hasher = LayerHasher::new();
        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);

        tar::pack(&task.rootfs, &mut buffered_writer, &task_layer_dir)?;

        let hasher = buffered_writer.into_inner()?;
        Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
    })()
    .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
}

/// Moves the task layer directory to its content-addressed location, or
/// removes it when `hash` is `None` (empty layer) or an identical layer
/// already exists (EEXIST/ENOTEMPTY from the rename are benign races).
fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
    let task_state_dir = paths::task_state_dir(input_hash);
    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);

    if let Some(hash) = hash {
        let layer_dir = paths::layer_dir(hash);

        let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
            Ok(_) => return Ok(()),
            Err(err) => err,
        };

        // Only "destination already exists" is tolerated — content-addressed
        // layers are interchangeable, so losing the race is fine.
        if !matches!(
            err.raw_os_error(),
            Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
        ) {
            return Err(err).with_context(|| {
                format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
            });
        }
    }

    fs::ensure_removed(&task_layer_dir)
}

/// Runs the task, then collects its outputs and layer into a `TaskOutput`.
fn run_and_hash_task(
    input_hash: &InputHash,
    task: &Task,
    jobserver: &mut Jobserver,
) -> Result<TaskOutput> {
    run_task(input_hash, task, jobserver)?;

    let outputs = collect_outputs(input_hash, task)?;

    let layer = hash_layer(input_hash, task)?;
    move_layer(input_hash, &layer)?;

    Ok(TaskOutput {
        input_hash: Some(*input_hash),
        layer,
        outputs,
    })
}

/// Loads a previously cached `TaskOutput` for this input hash, if any.
fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
    let filename = paths::task_cache_filename(input_hash);
    let file = fs::open(&filename)?;

    serde_json::from_reader(file)
        .with_context(|| format!("Failed to read task cache data from {}", filename))
}

/// Persists `output` to the task cache, via temp-file + rename so a crash
/// never leaves a truncated cache entry behind.
fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
    fs::mkdir(paths::task_state_dir(input_hash))?;

    let tmp_filename = paths::task_cache_tmp_filename(input_hash);
    let filename = paths::task_cache_filename(input_hash);

    cjson::to_file(&tmp_filename, output)
        .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;

    fs::rename(tmp_filename, filename)?;

    Ok(())
}

// Convenience conversion for nix's TimeVal; named after the std Duration
// method it mirrors.
trait AsSecsF32 {
    fn as_secs_f32(&self) -> f32;
}

impl AsSecsF32 for TimeVal {
    fn as_secs_f32(&self) -> f32 {
        self.tv_sec() as f32 + 1e-6 * (self.tv_usec() as f32)
    }
}

/// Formats a zsh-`time`-style resource usage summary for all reaped children
/// relative to `total` wall-clock seconds.
///
/// NOTE(review): RUSAGE_CHILDREN is process-wide, so with concurrent tasks
/// the numbers include other tasks' children — treat as approximate.
fn get_usage(total: f32) -> String {
    let usage = resource::getrusage(resource::UsageWho::RUSAGE_CHILDREN).expect("getrusage()");

    let user = usage.user_time().as_secs_f32();
    let system = usage.system_time().as_secs_f32();
    let cpu = (100.0 * (user + system) / total).round();

    format!("{user:.2}s user {system:.2}s system {cpu:.0}% cpu")
}

/// Entry point: executes `task` (or returns its cached result).
///
/// Takes the per-task lock, consults the cache (bypassed by `force_run`),
/// acquires a jobserver token for the duration of the run, executes and
/// caches the task, prints timing/usage, and warns when a forced re-run
/// produced different output hashes than the cached result.
pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result<TaskOutput> {
    let input_hash = input_hash(&task);

    let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true)
        .context("Failed to get task lock")?;

    // Kept (not consumed on the Err path) so a forced re-run can compare its
    // fresh outputs against the cached ones below.
    let cached_output = load_cached(&input_hash);
    if !task.force_run {
        if let Ok(task_output) = cached_output {
            return Ok(task_output);
        }
    }

    let token = jobserver.wait();

    let start_time = Instant::now();
    println!("Starting task {} ({})", task.label, input_hash);

    // Always run cleanup and return the jobserver token, even when the task
    // failed; only then propagate the task error.
    let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
    let cleanup_ret = cleanup_task(&input_hash);

    jobserver.post(token);

    let task_output = task_ret?;
    cleanup_ret.context("Failed to clean up after task")?;

    save_cached(&input_hash, &task_output)?;

    let duration = Instant::now().duration_since(start_time).as_secs_f32();
    let usage = get_usage(duration);
    println!(
        "Finished task {} ({}) in {:.2}s ({})",
        task.label, input_hash, duration, usage,
    );

    // Reproducibility check: a forced re-run with a cache hit should produce
    // identical output hashes.
    if let Ok(cached_output) = cached_output {
        if cached_output.outputs != task_output.outputs {
            println!(
                "WARNING: Output hashes for task {} do not match cached result\n Cached output: {}\n New output: {}",
                task.label,
                cjson::to_string(&cached_output.outputs).unwrap(),
                cjson::to_string(&task_output.outputs).unwrap(),
            );
        }
    }

    Ok(task_output)
}