Diffstat (limited to 'crates/runner/src/task.rs')
-rw-r--r-- | crates/runner/src/task.rs | 620 |
1 file changed, 0 insertions, 620 deletions
diff --git a/crates/runner/src/task.rs b/crates/runner/src/task.rs
deleted file mode 100644
index 19a7484..0000000
--- a/crates/runner/src/task.rs
+++ /dev/null
@@ -1,620 +0,0 @@
-use std::{
-    collections::{BTreeMap, HashMap, HashSet},
-    io::{self, BufWriter},
-    os::unix::prelude::CommandExt,
-    path::{Path, PathBuf},
-    process::{self, Command, Stdio},
-    time::Instant,
-};
-
-use capctl::prctl;
-use nix::{
-    mount::{self, MsFlags},
-    sched::{unshare, CloneFlags},
-    sys::wait,
-    unistd::{self, Gid, Uid},
-};
-use serde::Serialize;
-use tee_readwrite::{TeeReader, TeeWriter};
-
-use common::{error::*, string_hash::*, types::*};
-use walkdir::WalkDir;
-
-use super::{
-    jobserver::Jobserver,
-    ns, tar,
-    util::{checkable::Checkable, cjson, fs},
-};
-use crate::{
-    paths,
-    util::{stack::Stack, unix},
-};
-
-const BUILD_UID: Uid = Uid::from_raw(1000);
-const BUILD_GID: Gid = Gid::from_raw(1000);
-
-type InputHasher = blake3::Hasher;
-type DependencyHasher = blake3::Hasher;
-type LayerHasher = blake3::Hasher;
-type ArchiveHasher = blake3::Hasher;
-
-type DependMap = BTreeMap<String, Vec<ArchiveHash>>;
-
-fn dependency_hash(dep: &Dependency) -> DependencyHash {
-    DependencyHash(StringHash(
-        cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
-    ))
-}
-
-fn input_hash(task: &Task) -> InputHash {
-    #[derive(Debug, Serialize)]
-    struct HashInput<'a> {
-        pub command: &'a str,
-        pub workdir: &'a str,
-        pub rootfs: &'a ArchiveHash,
-        pub inherit: &'a [LayerHash],
-        pub depends: HashMap<DependencyHash, &'a Dependency>,
-        pub outputs: &'a HashMap<String, String>,
-    }
-    let input = HashInput {
-        command: &task.command,
-        workdir: &task.workdir,
-        rootfs: &task.rootfs,
-        inherit: &task.inherit,
-        depends: task
-            .depends
-            .iter()
-            .map(|dep| (dependency_hash(dep), dep))
-            .collect(),
-        outputs: &task.outputs,
-    };
-
-    InputHash(StringHash(
-        cjson::digest::<InputHasher, _>(&input).unwrap().into(),
-    ))
-}
-
-fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
-    // Remove metadata first to ensure task invalidation
-    fs::ensure_removed(&paths::task_cache_filename(input_hash))?;
-
-    let task_state_dir = paths::task_state_dir(input_hash);
-
-    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
-    fs::ensure_removed(&task_layer_dir)?;
-    fs::mkdir(&task_layer_dir)?;
-    fs::fixup_permissions(&task_layer_dir)?;
-
-    let task_tmp_dir = paths::task_tmp_dir(input_hash);
-
-    let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
-    fs::mkdir(&taskdir)?;
-    let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
-    std::fs::write(&runfile, &task.command)
-        .with_context(|| format!("Failed to write {}", runfile))?;
-
-    let mount_target = paths::join(&[&task_tmp_dir, &task.workdir]);
-    let mount = if task.inherit.is_empty() {
-        fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
-            .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
-    } else {
-        let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
-        fs::ensure_removed(&task_work_dir)?;
-        fs::mkdir(&task_work_dir)?;
-        fs::fixup_permissions(&task_work_dir)?;
-
-        let lower = task
-            .inherit
-            .iter()
-            .rev()
-            .map(paths::layer_dir)
-            .collect::<Vec<_>>()
-            .join(":");
-        let options = format!(
-            "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
-            lower = lower,
-            upper = task_layer_dir,
-            work = task_work_dir
-        );
-        fs::mount(
-            "overlay",
-            &mount_target,
-            Some("overlay"),
-            MsFlags::empty(),
-            Some(&options),
-        )
-        .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
-    };
-
-    Ok(mount)
-}
-
-fn get_contents<P1: AsRef<Path>, P2: AsRef<Path>>(
-    path: P1,
-    prefix: P2,
-) -> Result<(HashSet<PathBuf>, HashSet<PathBuf>)> {
-    let mut dirs = HashSet::new();
-    let mut files = HashSet::new();
-
-    let root: PathBuf = Path::new("/").join(prefix.as_ref());
-
-    for result in WalkDir::new(path.as_ref()).min_depth(1).into_iter() {
-        let entry = result
-            .with_context(|| format!("Failed to list contents of directory {:?}", path.as_ref()))?;
-        let is_dir = entry.file_type().is_dir();
-        let entry_path = root.join(entry.into_path().strip_prefix(path.as_ref()).unwrap());
-        if is_dir {
-            dirs.insert(entry_path);
-        } else {
-            files.insert(entry_path);
-        }
-    }
-
-    dirs.insert(root);
-
-    Ok((dirs, files))
-}
-
-fn check_conflicts(
-    dirs1: &HashSet<PathBuf>,
-    files1: &HashSet<PathBuf>,
-    dirs2: &HashSet<PathBuf>,
-    files2: &HashSet<PathBuf>,
-) -> Result<()> {
-    let mut conflicts = Vec::new();
-
-    conflicts.extend(files1.intersection(files2));
-    conflicts.extend(dirs1.intersection(files2));
-    conflicts.extend(files1.intersection(dirs2));
-
-    if !conflicts.is_empty() {
-        let mut conflict_strings: Box<[_]> = conflicts
-            .into_iter()
-            .map(|path| path.to_string_lossy().to_string())
-            .collect();
-        conflict_strings.sort();
-        return Err(Error::new(format!(
-            "Found the following file conflicts in dependencies:\n{}",
-            conflict_strings.join("\n")
-        )));
-    }
-
-    Ok(())
-}
-
-fn init_task_rootfs(input_hash: &InputHash, task: &Task) -> Result<Stack<fs::Mount>> {
-    let task_tmp_dir = paths::task_tmp_dir(input_hash);
-    let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
-    let rootfs = paths::depend_dir(&task.rootfs);
-
-    let depends = unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;
-
-    let mut mounts = Stack::new();
-
-    mounts.push(
-        fs::mount(rootfs, &mount_target, None, MsFlags::MS_BIND, None)
-            .with_context(|| format!("Failed to bind mount rootfs to {:?}", mount_target))?,
-    );
-    mount::mount::<str, str, str, str>(
-        None,
-        &mount_target,
-        None,
-        MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
-        None,
-    )
-    .context("Failed to mount container rootfs read-only")?;
-
-    let (mut dirs, mut files) = get_contents(&mount_target, "")?;
-
-    for (path, dep_hashes) in depends {
-        assert!(!dep_hashes.is_empty());
-
-        if !path.is_empty() && !path.starts_with('/') {
-            return Err(Error::new(format!(
-                "Dependency path {:?} must be absolute",
-                path
-            )));
-        }
-
-        let dep_target = mount_target.clone() + &path;
-        let dep_paths: Box<[_]> = dep_hashes.iter().map(paths::depend_dir).collect();
-
-        for dep in dep_paths.iter() {
-            let (dep_dirs, dep_files) = get_contents(dep, &path)?;
-            check_conflicts(&dirs, &files, &dep_dirs, &dep_files)?;
-            dirs.extend(dep_dirs);
-            files.extend(dep_files);
-        }
-
-        let options = format!(
-            "xino=off,index=off,metacopy=off,lowerdir={lower}:{base}",
-            lower = dep_paths.join(":"),
-            base = dep_target,
-        );
-
-        mounts.push(
-            fs::mount(
-                "overlay",
-                dep_target.as_str(),
-                Some("overlay"),
-                MsFlags::MS_RDONLY,
-                Some(&options),
-            )
-            .with_context(|| format!("Failed to mount overlay to {:?}", dep_target))?,
-        );
-    }
-
-    Ok(mounts)
-}
-
-fn cleanup_task(input_hash: &InputHash) -> Result<()> {
-    let task_tmp_dir = paths::task_tmp_dir(input_hash);
-
-    fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?;
-
-    let task_state_dir = paths::task_state_dir(input_hash);
-    let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
-    fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?;
-
-    Ok(())
-}
-
-fn get_archive_filename(task: &Task, hash: &ArchiveHash) -> String {
-    if let Some(pinned_name) = task.pins.get(hash) {
-        paths::pinned_archive_filename(pinned_name)
-    } else {
-        paths::archive_filename(hash)
-    }
-}
-
-fn unpack_dependency(task: &Task, hash: &ArchiveHash) -> Result<()> {
-    let _lock = unix::lock(paths::depend_lock_filename(hash), true, true);
-
-    let filename = get_archive_filename(task, hash);
-
-    let dest = paths::depend_dir(hash);
-    if Path::new(&dest).is_dir() {
-        return Ok(());
-    }
-
-    (|| -> Result<()> {
-        let tmp_dest = paths::depend_tmp_dir(hash);
-        fs::ensure_removed(&tmp_dest)?;
-
-        let file = fs::open(&filename)?;
-        let hasher = ArchiveHasher::new();
-        let buffered_hasher = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
-        let mut reader = TeeReader::new(file, buffered_hasher, false);
-
-        tar::unpack(&mut reader, &tmp_dest)?;
-
-        // Read file to end to get the correct hash
-        io::copy(&mut reader, &mut io::sink())?;
-
-        let (_, buffered_hasher) = reader.into_inner();
-        let hasher = buffered_hasher.into_inner()?;
-
-        let actual_hash = ArchiveHash(StringHash(hasher.finalize().into()));
-
-        if &actual_hash != hash {
-            return Err(Error::new(format!(
-                "Incorrect file hash for {:?} (expected: {}, actual: {})",
-                filename, hash, actual_hash,
-            )));
-        }
-
-        fs::rename(&tmp_dest, &dest)?;
-
-        Ok(())
-    })()
-    .with_context(|| format!("Failed to unpack {:?}", filename))
-}
-
-fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<DependMap> {
-    let task_tmp_dir = paths::task_tmp_dir(input_hash);
-
-    unpack_dependency(task, &task.rootfs)?;
-
-    let mut ret = DependMap::new();
-
-    for dep in &task.depends {
-        match dep {
-            Dependency::Fetch {
-                name, target_dir, ..
-            } => {
-                let path = paths::join(&[&task_tmp_dir, target_dir]);
-                fs::mkdir(&path)?;
-                fs::copy(
-                    paths::join(&[paths::DOWNLOADS_DIR, name]),
-                    paths::join(&[&path, name]),
-                )?;
-            }
-            Dependency::Task { output, path } => {
-                unpack_dependency(task, output)?;
-                ret.entry(path.clone()).or_default().push(*output);
-            }
-        }
-    }
-
-    Ok(ret)
-}
-
-fn collect_output(input_hash: &InputHash, task: &Task, path: &str) -> Result<Option<ArchiveHash>> {
-    let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]);
-    if !Path::new(&source).is_dir() {
-        return Ok(None);
-    }
-
-    let filename = paths::archive_tmp_filename(input_hash);
-
-    let hash = (|| -> Result<ArchiveHash> {
-        let file = fs::create(&filename)?;
-        let hasher = ArchiveHasher::new();
-        let writer = TeeWriter::new(file, hasher);
-        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);
-
-        super::tar::pack(&task.rootfs, &mut buffered_writer, &source)?;
-
-        let writer = buffered_writer.into_inner()?;
-        let (file, hasher) = writer.into_inner();
-        file.sync_all()?;
-        drop(file);
-
-        Ok(ArchiveHash(StringHash(hasher.finalize().into())))
-    })()
-    .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;
-
-    fs::rename(filename, paths::archive_filename(&hash))?;
-
-    Ok(Some(hash))
-}
-
-fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
-    let mut ret = HashMap::new();
-
-    for (name, path) in &task.outputs {
-        if let Some(hash) = collect_output(input_hash, task, path)? {
-            ret.insert(name.clone(), hash);
-        }
-    }
-
-    Ok(ret)
-}
-
-fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> {
-    let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
-    let _rootfs_mounts =
-        init_task_rootfs(input_hash, task).context("Failed to initialize task rootfs")?;
-
-    let task_tmp_dir = paths::task_tmp_dir(input_hash);
-    let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
-
-    let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
-    let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);
-
-    let log_filename = paths::task_log_filename(input_hash);
-
-    let mut exec_cmd = || -> Result<()> {
-        let log = fs::create(&log_filename)?;
-
-        let log_stdout = log
-            .try_clone()
-            .context("Failed to duplicate log file descriptor")?;
-        let log_stderr = log
-            .try_clone()
-            .context("Failed to duplicate log file descriptor")?;
-
-        mount::mount::<_, _, str, str>(
-            Some(paths::DEV_DIR),
-            paths::join(&[&rootfs, "dev"]).as_str(),
-            None,
-            MsFlags::MS_BIND | MsFlags::MS_REC,
-            None,
-        )
-        .expect("Failed to bind mount /dev directory");
-        mount::mount::<_, _, str, str>(
-            Some(builddir_source.as_str()),
-            builddir_target.as_str(),
-            None,
-            MsFlags::MS_BIND | MsFlags::MS_REC,
-            None,
-        )
-        .expect("Failed to bind mount build directory");
-
-        ns::pivot_root(&rootfs);
-        mount::mount::<str, _, str, str>(
-            None,
-            "/",
-            None,
-            MsFlags::MS_PRIVATE | MsFlags::MS_REC,
-            None,
-        )
-        .context("Failed to set MS_PRIVATE for container root")?;
-        ns::container_mounts().context("Failed to set up container mounts")?;
-
-        unistd::sethostname("rebel-builder").context("Failed to set hostname")?;
-
-        prctl::set_no_new_privs().context("set_no_new_privs()")?;
-
-        unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
-            .context("Failed to create user namespace")?;
-        ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));
-
-        jobserver
-            .set_cloexec(false)
-            .context("Failed to unset O_CLOEXEC on jobserver pipe")?;
-
-        let err = Command::new("sh")
-            .args(&["-ex", &paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])])
-            .stdin(Stdio::null())
-            .stdout(log_stdout)
-            .stderr(log_stderr)
-            .env_clear()
-            .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
-            .env("HOME", "/build")
-            .env("INPUT_HASH", input_hash.to_string())
-            .env("MAKEFLAGS", jobserver.to_makeflags())
-            .exec();
-        eprintln!("{}", err);
-        process::exit(127);
-    };
-
-    let pid = unsafe {
-        ns::spawn(
-            CloneFlags::CLONE_NEWNS
-                | CloneFlags::CLONE_NEWPID
-                | CloneFlags::CLONE_NEWIPC
-                | CloneFlags::CLONE_NEWNET
-                | CloneFlags::CLONE_NEWUTS,
-            || exec_cmd().unwrap(),
-        )
-    }
-    .context("Failed to run task container")?;
-
-    let status = wait::waitpid(pid, None)?;
-
-    if let Err(err) = status.check() {
-        return Err(Error::new(format!(
-            "Task failed: {}\nOutput: {}",
-            err, log_filename
-        )));
-    }
-
-    Ok(())
-}
-
-fn hash_layer(input_hash: &InputHash, task: &Task) -> Result<Option<LayerHash>> {
-    let task_state_dir = paths::task_state_dir(input_hash);
-    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
-
-    (|| -> Result<Option<LayerHash>> {
-        if fs::is_dir_empty(&task_layer_dir)? {
-            return Ok(None);
-        }
-
-        let hasher = LayerHasher::new();
-        let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
-
-        tar::pack(&task.rootfs, &mut buffered_writer, &task_layer_dir)?;
-
-        let hasher = buffered_writer.into_inner()?;
-        Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
-    })()
-    .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
-}
-
-fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
-    let task_state_dir = paths::task_state_dir(input_hash);
-    let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
-
-    if let Some(hash) = hash {
-        let layer_dir = paths::layer_dir(hash);
-
-        let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
-            Ok(_) => return Ok(()),
-            Err(err) => err,
-        };
-
-        if !matches!(
-            err.raw_os_error(),
-            Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
-        ) {
-            return Err(err).with_context(|| {
-                format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
-            });
-        }
-    }
-
-    fs::ensure_removed(&task_layer_dir)
-}
-
-fn run_and_hash_task(
-    input_hash: &InputHash,
-    task: &Task,
-    jobserver: &mut Jobserver,
-) -> Result<TaskOutput> {
-    run_task(input_hash, task, jobserver)?;
-
-    let outputs = collect_outputs(input_hash, task)?;
-
-    let layer = hash_layer(input_hash, task)?;
-    move_layer(input_hash, &layer)?;
-
-    Ok(TaskOutput {
-        input_hash: Some(*input_hash),
-        layer,
-        outputs,
-    })
-}
-
-fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
-    let filename = paths::task_cache_filename(input_hash);
-    let file = fs::open(&filename)?;
-
-    serde_json::from_reader(file)
-        .with_context(|| format!("Failed to read task cache data from {}", filename))
-}
-
-fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
-    fs::mkdir(&paths::task_state_dir(input_hash))?;
-
-    let tmp_filename = paths::task_cache_tmp_filename(input_hash);
-    let filename = paths::task_cache_filename(input_hash);
-
-    cjson::to_file(&tmp_filename, output)
-        .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
-
-    fs::rename(tmp_filename, filename)?;
-
-    Ok(())
-}
-
-pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result<TaskOutput> {
-    let input_hash = input_hash(&task);
-
-    let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true)
-        .context("Failed to get task lock")?;
-
-    let cached_output = load_cached(&input_hash);
-    if !task.force_run {
-        if let Ok(task_output) = cached_output {
-            return Ok(task_output);
-        }
-    }
-
-    let token = jobserver.wait();
-
-    let start_time = Instant::now();
-    println!("Starting task {} ({})", task.label, input_hash);
-
-    let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
-    let cleanup_ret = cleanup_task(&input_hash);
-
-    jobserver.post(token);
-
-    let task_output = task_ret?;
-    cleanup_ret.context("Failed to clean up after task")?;
-
-    save_cached(&input_hash, &task_output)?;
-
-    let duration = Instant::now().duration_since(start_time);
-    println!(
-        "Finished task {} ({}) in {}",
-        task.label,
-        input_hash,
-        duration.as_secs_f32()
-    );
-
-    if let Ok(cached_output) = cached_output {
-        if cached_output.outputs != task_output.outputs {
-            println!(
-                "WARNING: Output hashes for task {} do not match cached result\n Cached output: {}\n New output: {}",
-                task.label,
-                cjson::to_string(&cached_output.outputs).unwrap(),
-                cjson::to_string(&task_output.outputs).unwrap(),
-            );
-        }
-    }
-
-    Ok(task_output)
-}