use std::{ collections::HashMap, io::BufWriter, path::{Path, PathBuf}, process, time::Instant, }; use indoc::formatdoc; use nix::mount::MsFlags; use tee_readwrite::TeeWriter; use crate::{ paths, runner, types::*, util::{cjson, error::*, fs}, }; use super::{spec, tar}; pub const BUILD_UID: u32 = 1000; pub const BUILD_GID: u32 = 1000; fn input_hash(task: &runner::TaskInput) -> InputHash { InputHash(StringHash( cjson::digest::(task).unwrap().into(), )) } fn init_task(input_hash: &InputHash, task: &runner::Task) -> Result { // Remove metadata first to ensure task invalidation fs::ensure_removed(&paths::task_cache_filename(input_hash))?; let task_state_dir = paths::task_state_dir(input_hash); fs::ensure_removed(&task_state_dir)?; fs::mkdir(&task_state_dir)?; let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); fs::mkdir(&task_layer_dir)?; fs::fixup_permissions(&task_layer_dir)?; let task_tmp_dir = paths::task_tmp_dir(input_hash); let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_WORKDIR]); let mount = if task.input.inherit.is_empty() { fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None) .with_context(|| format!("Failed to bind mount to {:?}", mount_target))? } else { let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]); fs::mkdir(&task_work_dir)?; fs::fixup_permissions(&task_work_dir)?; let lower = task .input .inherit .iter() .rev() .map(paths::layer_dir) .collect::>() .join(":"); let options = format!( "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}", lower = lower, upper = task_layer_dir, work = task_work_dir ); fs::mount( "overlay", &mount_target, Some("overlay"), MsFlags::empty(), Some(&options), ) .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))? }; Ok(mount) } fn init_task_rootfs(input_hash: &InputHash) -> Result { let task_tmp_dir = paths::task_tmp_dir(input_hash); let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]); let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]); let lower = [&depends_dir, paths::ROOTFS_DIR].join(":"); let options = format!( "xino=off,index=off,metacopy=off,lowerdir={lower}", lower = lower, ); let mount = fs::mount( "overlay", &mount_target, Some("overlay"), MsFlags::empty(), Some(&options), ) .with_context(|| format!("Failed to mount rootfs overlay to {:?}", mount_target))?; Ok(mount) } fn cleanup_task(input_hash: &InputHash) -> Result<()> { let task_tmp_dir = paths::task_tmp_dir(input_hash); fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?; let task_state_dir = paths::task_state_dir(input_hash); let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]); fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?; Ok(()) } fn unpack_dependency, P2: AsRef>(filename: P1, dest: P2) -> Result<()> { let file = fs::open(filename.as_ref())?; tar::unpack(file, dest.as_ref()).with_context(|| { format!( "Failed to unpack {:?} to {:?}", filename.as_ref(), dest.as_ref() ) }) } fn unpack_dependencies(input_hash: &InputHash, task: &runner::Task) -> Result<()> { let task_tmp_dir = paths::task_tmp_dir(input_hash); let downloads_dir = paths::join(&[&task_tmp_dir, paths::TASK_DLDIR]); let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]); fs::mkdir(&downloads_dir)?; fs::mkdir(&depends_dir)?; for dep in task.input.depends.values() { match dep { Dependency::Fetch { name, .. } => { fs::copy( paths::join(&[paths::DOWNLOADS_DIR, name]), paths::join(&[&downloads_dir, name]), )?; } Dependency::Task { output, path } => { unpack_dependency( paths::archive_filename(output), paths::join(&[&depends_dir, path]), )?; } } } Ok(()) } fn collect_output(input_hash: &InputHash, path: &str) -> Result> { let source: PathBuf = [&paths::task_tmp_dir(input_hash), paths::TASK_DESTDIR, path] .iter() .collect(); if !source.is_dir() { return Ok(None); } let filename = paths::archive_tmp_filename(input_hash); let hash = (|| -> Result { let file = fs::create(&filename)?; let hasher = ArchiveHasher::new(); let writer = TeeWriter::new(file, hasher); let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer); super::tar::pack(input_hash, &mut buffered_writer, &source)?; let writer = buffered_writer.into_inner()?; let (file, hasher) = writer.into_inner(); file.sync_all()?; drop(file); Ok(ArchiveHash(StringHash(hasher.finalize().into()))) })() .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?; fs::rename(filename, paths::archive_filename(&hash))?; Ok(Some(hash)) } fn collect_outputs( input_hash: &InputHash, task: &runner::Task, ) -> Result> { let mut ret = HashMap::new(); for (name, path) in &task.input.outputs { if let Some(hash) = collect_output(input_hash, path)? { ret.insert(name.clone(), hash); } } Ok(ret) } fn run_task(input_hash: &InputHash, task: &runner::Task) -> Result<()> { let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?; unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?; let _rootfs_mount = init_task_rootfs(input_hash).context("Failed to initialize task rootfs")?; let task_tmp_dir = paths::task_tmp_dir(input_hash); let command = formatdoc! {" INPUT_HASH={input_hash} {command}", input_hash = input_hash, command = task.input.command, }; spec::generate_spec(&[ "unshare", "--user", &format!("--map-user={}", BUILD_UID), &format!("--map-group={}", BUILD_GID), "sh", "-exc", &command, ]) .save(paths::join(&[&task_tmp_dir, "config.json"])) .map_err(Error::new) .context("Failed to save runtime config")?; let log_filename = paths::task_log_filename(input_hash); let log = fs::create(&log_filename)?; let log_stdout = log .try_clone() .context("Failed to duplicate log file descriptor")?; let log_stderr = log .try_clone() .context("Failed to duplicate log file descriptor")?; let status = process::Command::new("crun") .arg("--root") .arg(paths::TASK_TMP_CONTAINERS_ROOT_SUBDIR) .arg("run") .arg(input_hash.to_string()) .current_dir(task_tmp_dir) .stdin(process::Stdio::null()) .stdout(log_stdout) .stderr(log_stderr) .status() .context("Failed to start container runtime")?; log.sync_all().context("Failed to write log output")?; if !status.success() { return Err(Error::new(format!( "Task failed: {}\nOutput: {}", status, log_filename ))); } Ok(()) } fn hash_layer(input_hash: &InputHash) -> Result> { let task_state_dir = paths::task_state_dir(input_hash); let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); (|| -> Result> { if fs::is_dir_empty(&task_layer_dir)? { return Ok(None); } let hasher = LayerHasher::new(); let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher); tar::pack(input_hash, &mut buffered_writer, &task_layer_dir)?; let hasher = buffered_writer.into_inner()?; Ok(Some(LayerHash(StringHash(hasher.finalize().into())))) })() .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir)) } fn move_layer(input_hash: &InputHash, hash: &Option) -> Result<()> { let task_state_dir = paths::task_state_dir(input_hash); let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); if let Some(hash) = hash { let layer_dir = paths::layer_dir(hash); let err = match std::fs::rename(&task_layer_dir, &layer_dir) { Ok(_) => return Ok(()), Err(err) => err, }; if !matches!( err.raw_os_error(), Some(libc::EEXIST) | Some(libc::ENOTEMPTY) ) { return Err(err).with_context(|| { format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir) }); } } fs::ensure_removed(&task_layer_dir) } fn run_and_hash_task(input_hash: &InputHash, task: &runner::Task) -> Result { run_task(input_hash, task)?; let outputs = collect_outputs(input_hash, task)?; let layer = hash_layer(input_hash)?; move_layer(input_hash, &layer)?; Ok(runner::TaskOutput { input_hash: *input_hash, layer, outputs, }) } fn load_cached(input_hash: &InputHash) -> Result { let filename = paths::task_cache_filename(input_hash); let file = fs::open(&filename)?; serde_json::from_reader(file) .with_context(|| format!("Failed to read task cache data from {}", filename)) } fn save_cached(input_hash: &InputHash, output: &runner::TaskOutput) -> Result<()> { fs::mkdir(&paths::task_state_dir(input_hash))?; let tmp_filename = paths::task_cache_tmp_filename(input_hash); let filename = paths::task_cache_filename(input_hash); cjson::to_file(&tmp_filename, output) .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?; fs::rename(tmp_filename, filename)?; Ok(()) } pub fn handle(task: runner::Task) -> Result { let input_hash = input_hash(&task.input); if let Ok(task_output) = load_cached(&input_hash) { return Ok(task_output); } let start_time = Instant::now(); println!("Starting task {} ({})", task.label, input_hash); let task_ret = run_and_hash_task(&input_hash, &task); let cleanup_ret = cleanup_task(&input_hash); let task_output = task_ret?; cleanup_ret.context("Failed to clean up after task")?; save_cached(&input_hash, &task_output)?; let duration = Instant::now().duration_since(start_time); println!( "Finished task {} ({}) in {}", task.label, input_hash, duration.as_secs_f32() ); Ok(task_output) }