Diffstat (limited to 'crates/runner/src/task.rs')
-rw-r--r--  crates/runner/src/task.rs | 620
1 file changed, 0 insertions(+), 620 deletions(-)
diff --git a/crates/runner/src/task.rs b/crates/runner/src/task.rs
deleted file mode 100644
index 19a7484..0000000
--- a/crates/runner/src/task.rs
+++ /dev/null
@@ -1,620 +0,0 @@
-use std::{
- collections::{BTreeMap, HashMap, HashSet},
- io::{self, BufWriter},
- os::unix::prelude::CommandExt,
- path::{Path, PathBuf},
- process::{self, Command, Stdio},
- time::Instant,
-};
-
-use capctl::prctl;
-use nix::{
- mount::{self, MsFlags},
- sched::{unshare, CloneFlags},
- sys::wait,
- unistd::{self, Gid, Uid},
-};
-use serde::Serialize;
-use tee_readwrite::{TeeReader, TeeWriter};
-
-use common::{error::*, string_hash::*, types::*};
-use walkdir::WalkDir;
-
-use super::{
- jobserver::Jobserver,
- ns, tar,
- util::{checkable::Checkable, cjson, fs},
-};
-use crate::{
- paths,
- util::{stack::Stack, unix},
-};
-
-const BUILD_UID: Uid = Uid::from_raw(1000);
-const BUILD_GID: Gid = Gid::from_raw(1000);
-
-type InputHasher = blake3::Hasher;
-type DependencyHasher = blake3::Hasher;
-type LayerHasher = blake3::Hasher;
-type ArchiveHasher = blake3::Hasher;
-
-type DependMap = BTreeMap<String, Vec<ArchiveHash>>;
-
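-/// Hashes a single dependency using its canonical JSON (cjson) serialization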
-fn dependency_hash(dep: &Dependency) -> DependencyHash {
- DependencyHash(StringHash(
- cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
- ))
-}
-
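-/// Computes the task input hash over everything that can affect the result:
-/// command, workdir, rootfs, inherited layers, dependencies and declared outputs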
-fn input_hash(task: &Task) -> InputHash {
- #[derive(Debug, Serialize)]
- struct HashInput<'a> {
- pub command: &'a str,
- pub workdir: &'a str,
- pub rootfs: &'a ArchiveHash,
- pub inherit: &'a [LayerHash],
- pub depends: HashMap<DependencyHash, &'a Dependency>,
- pub outputs: &'a HashMap<String, String>,
- }
- let input = HashInput {
- command: &task.command,
- workdir: &task.workdir,
- rootfs: &task.rootfs,
- inherit: &task.inherit,
- depends: task
- .depends
- .iter()
- .map(|dep| (dependency_hash(dep), dep))
- .collect(),
- outputs: &task.outputs,
- };
-
- InputHash(StringHash(
- cjson::digest::<InputHasher, _>(&input).unwrap().into(),
- ))
-}
-
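-/// Sets up the task working directory and run file; the workdir is a plain bind
-/// mount of a fresh layer directory, or an overlayfs over the inherited layers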
-fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
- // Remove metadata first to ensure task invalidation
- fs::ensure_removed(&paths::task_cache_filename(input_hash))?;
-
- let task_state_dir = paths::task_state_dir(input_hash);
-
- let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
- fs::ensure_removed(&task_layer_dir)?;
- fs::mkdir(&task_layer_dir)?;
- fs::fixup_permissions(&task_layer_dir)?;
-
- let task_tmp_dir = paths::task_tmp_dir(input_hash);
-
- let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
- fs::mkdir(&taskdir)?;
- let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
- std::fs::write(&runfile, &task.command)
- .with_context(|| format!("Failed to write {}", runfile))?;
-
- let mount_target = paths::join(&[&task_tmp_dir, &task.workdir]);
- let mount = if task.inherit.is_empty() {
- fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
- .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
- } else {
- let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
- fs::ensure_removed(&task_work_dir)?;
- fs::mkdir(&task_work_dir)?;
- fs::fixup_permissions(&task_work_dir)?;
-
- let lower = task
- .inherit
- .iter()
- .rev()
- .map(paths::layer_dir)
- .collect::<Vec<_>>()
- .join(":");
- let options = format!(
- "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
- lower = lower,
- upper = task_layer_dir,
- work = task_work_dir
- );
- fs::mount(
- "overlay",
- &mount_target,
- Some("overlay"),
- MsFlags::empty(),
- Some(&options),
- )
- .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
- };
-
- Ok(mount)
-}
-
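-/// Walks a directory tree, returning its directories and files as absolute
-/// paths rebased onto the given prefix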
-fn get_contents<P1: AsRef<Path>, P2: AsRef<Path>>(
- path: P1,
- prefix: P2,
-) -> Result<(HashSet<PathBuf>, HashSet<PathBuf>)> {
- let mut dirs = HashSet::new();
- let mut files = HashSet::new();
-
- let root: PathBuf = Path::new("/").join(prefix.as_ref());
-
- for result in WalkDir::new(path.as_ref()).min_depth(1).into_iter() {
- let entry = result
- .with_context(|| format!("Failed to list contents of directory {:?}", path.as_ref()))?;
- let is_dir = entry.file_type().is_dir();
- let entry_path = root.join(entry.into_path().strip_prefix(path.as_ref()).unwrap());
- if is_dir {
- dirs.insert(entry_path);
- } else {
- files.insert(entry_path);
- }
- }
-
- dirs.insert(root);
-
- Ok((dirs, files))
-}
-
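-/// Fails with a list of all conflicting paths if the two content sets overlap;
-/// only directory/directory overlaps are allowed to merge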
-fn check_conflicts(
- dirs1: &HashSet<PathBuf>,
- files1: &HashSet<PathBuf>,
- dirs2: &HashSet<PathBuf>,
- files2: &HashSet<PathBuf>,
-) -> Result<()> {
- let mut conflicts = Vec::new();
-
- conflicts.extend(files1.intersection(files2));
- conflicts.extend(dirs1.intersection(files2));
- conflicts.extend(files1.intersection(dirs2));
-
- if !conflicts.is_empty() {
- let mut conflict_strings: Box<[_]> = conflicts
- .into_iter()
- .map(|path| path.to_string_lossy().to_string())
- .collect();
- conflict_strings.sort();
- return Err(Error::new(format!(
- "Found the following file conflicts in dependencies:\n{}",
- conflict_strings.join("\n")
- )));
- }
-
- Ok(())
-}
-
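-/// Bind mounts the container rootfs read-only and overlays each dependency
-/// onto its target path, rejecting file conflicts between dependencies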
-fn init_task_rootfs(input_hash: &InputHash, task: &Task) -> Result<Stack<fs::Mount>> {
- let task_tmp_dir = paths::task_tmp_dir(input_hash);
- let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
- let rootfs = paths::depend_dir(&task.rootfs);
-
- let depends = unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;
-
- let mut mounts = Stack::new();
-
- mounts.push(
- fs::mount(rootfs, &mount_target, None, MsFlags::MS_BIND, None)
- .with_context(|| format!("Failed to bind mount rootfs to {:?}", mount_target))?,
- );
- mount::mount::<str, str, str, str>(
- None,
- &mount_target,
- None,
- MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
- None,
- )
- .context("Failed to mount container rootfs read-only")?;
-
- let (mut dirs, mut files) = get_contents(&mount_target, "")?;
-
- for (path, dep_hashes) in depends {
- assert!(!dep_hashes.is_empty());
-
- if !path.is_empty() && !path.starts_with('/') {
- return Err(Error::new(format!(
- "Dependency path {:?} must be absolute",
- path
- )));
- }
-
- let dep_target = mount_target.clone() + &path;
- let dep_paths: Box<[_]> = dep_hashes.iter().map(paths::depend_dir).collect();
-
- for dep in dep_paths.iter() {
- let (dep_dirs, dep_files) = get_contents(dep, &path)?;
- check_conflicts(&dirs, &files, &dep_dirs, &dep_files)?;
- dirs.extend(dep_dirs);
- files.extend(dep_files);
- }
-
- let options = format!(
- "xino=off,index=off,metacopy=off,lowerdir={lower}:{base}",
- lower = dep_paths.join(":"),
- base = dep_target,
- );
-
- mounts.push(
- fs::mount(
- "overlay",
- dep_target.as_str(),
- Some("overlay"),
- MsFlags::MS_RDONLY,
- Some(&options),
- )
- .with_context(|| format!("Failed to mount overlay to {:?}", dep_target))?,
- );
- }
-
- Ok(mounts)
-}
-
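-/// Removes the temporary task directory and the overlayfs workdir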
-fn cleanup_task(input_hash: &InputHash) -> Result<()> {
- let task_tmp_dir = paths::task_tmp_dir(input_hash);
-
- fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?;
-
- let task_state_dir = paths::task_state_dir(input_hash);
- let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
- fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?;
-
- Ok(())
-}
-
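-/// Returns the archive path for a hash, preferring the pinned name if the task pins it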
-fn get_archive_filename(task: &Task, hash: &ArchiveHash) -> String {
- if let Some(pinned_name) = task.pins.get(hash) {
- paths::pinned_archive_filename(pinned_name)
- } else {
- paths::archive_filename(hash)
- }
-}
-
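-/// Unpacks a dependency archive into the depend directory, verifying its hash
-/// while extracting; a no-op if the dependency is already unpacked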
-fn unpack_dependency(task: &Task, hash: &ArchiveHash) -> Result<()> {
- let _lock = unix::lock(paths::depend_lock_filename(hash), true, true);
-
- let filename = get_archive_filename(task, hash);
-
- let dest = paths::depend_dir(hash);
- if Path::new(&dest).is_dir() {
- return Ok(());
- }
-
- (|| -> Result<()> {
- let tmp_dest = paths::depend_tmp_dir(hash);
- fs::ensure_removed(&tmp_dest)?;
-
- let file = fs::open(&filename)?;
- let hasher = ArchiveHasher::new();
- let buffered_hasher = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
- let mut reader = TeeReader::new(file, buffered_hasher, false);
-
- tar::unpack(&mut reader, &tmp_dest)?;
-
- // Read file to end to get the correct hash
- io::copy(&mut reader, &mut io::sink())?;
-
- let (_, buffered_hasher) = reader.into_inner();
- let hasher = buffered_hasher.into_inner()?;
-
- let actual_hash = ArchiveHash(StringHash(hasher.finalize().into()));
-
- if &actual_hash != hash {
- return Err(Error::new(format!(
- "Incorrect file hash for {:?} (expected: {}, actual: {})",
- filename, hash, actual_hash,
- )));
- }
-
- fs::rename(&tmp_dest, &dest)?;
-
- Ok(())
- })()
- .with_context(|| format!("Failed to unpack {:?}", filename))
-}
-
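-/// Unpacks the rootfs and all dependencies; fetched files are copied into the
-/// task tmp dir, and task outputs are returned grouped by their target path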
-fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<DependMap> {
- let task_tmp_dir = paths::task_tmp_dir(input_hash);
-
- unpack_dependency(task, &task.rootfs)?;
-
- let mut ret = DependMap::new();
-
- for dep in &task.depends {
- match dep {
- Dependency::Fetch {
- name, target_dir, ..
- } => {
- let path = paths::join(&[&task_tmp_dir, target_dir]);
- fs::mkdir(&path)?;
- fs::copy(
- paths::join(&[paths::DOWNLOADS_DIR, name]),
- paths::join(&[&path, name]),
- )?;
- }
- Dependency::Task { output, path } => {
- unpack_dependency(task, output)?;
- ret.entry(path.clone()).or_default().push(*output);
- }
- }
- }
-
- Ok(ret)
-}
-
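-/// Packs a single output directory into a content-addressed archive, returning
-/// its hash, or None if the task did not produce that output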
-fn collect_output(input_hash: &InputHash, task: &Task, path: &str) -> Result<Option<ArchiveHash>> {
- let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]);
- if !Path::new(&source).is_dir() {
- return Ok(None);
- }
-
- let filename = paths::archive_tmp_filename(input_hash);
-
- let hash = (|| -> Result<ArchiveHash> {
- let file = fs::create(&filename)?;
- let hasher = ArchiveHasher::new();
- let writer = TeeWriter::new(file, hasher);
- let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);
-
- tar::pack(&task.rootfs, &mut buffered_writer, &source)?;
-
- let writer = buffered_writer.into_inner()?;
- let (file, hasher) = writer.into_inner();
- file.sync_all()?;
- drop(file);
-
- Ok(ArchiveHash(StringHash(hasher.finalize().into())))
- })()
- .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;
-
- fs::rename(filename, paths::archive_filename(&hash))?;
-
- Ok(Some(hash))
-}
-
-fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
- let mut ret = HashMap::new();
-
- for (name, path) in &task.outputs {
- if let Some(hash) = collect_output(input_hash, task, path)? {
- ret.insert(name.clone(), hash);
- }
- }
-
- Ok(ret)
-}
-
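-/// Executes the task command in an isolated container: bind mounts /dev and the
-/// build directory, pivots into the rootfs, maps the unprivileged build user and
-/// runs the task run file with `sh -ex`, logging output to the task log file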
-fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> {
- let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
- let _rootfs_mounts =
- init_task_rootfs(input_hash, task).context("Failed to initialize task rootfs")?;
-
- let task_tmp_dir = paths::task_tmp_dir(input_hash);
- let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
-
- let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
- let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);
-
- let log_filename = paths::task_log_filename(input_hash);
-
- let mut exec_cmd = || -> Result<()> {
- let log = fs::create(&log_filename)?;
-
- let log_stdout = log
- .try_clone()
- .context("Failed to duplicate log file descriptor")?;
- let log_stderr = log
- .try_clone()
- .context("Failed to duplicate log file descriptor")?;
-
- mount::mount::<_, _, str, str>(
- Some(paths::DEV_DIR),
- paths::join(&[&rootfs, "dev"]).as_str(),
- None,
- MsFlags::MS_BIND | MsFlags::MS_REC,
- None,
- )
- .expect("Failed to bind mount /dev directory");
- mount::mount::<_, _, str, str>(
- Some(builddir_source.as_str()),
- builddir_target.as_str(),
- None,
- MsFlags::MS_BIND | MsFlags::MS_REC,
- None,
- )
- .expect("Failed to bind mount build directory");
-
- ns::pivot_root(&rootfs);
- mount::mount::<str, _, str, str>(
- None,
- "/",
- None,
- MsFlags::MS_PRIVATE | MsFlags::MS_REC,
- None,
- )
- .context("Failed to set MS_PRIVATE for container root")?;
- ns::container_mounts().context("Failed to set up container mounts")?;
-
- unistd::sethostname("rebel-builder").context("Failed to set hostname")?;
-
- prctl::set_no_new_privs().context("set_no_new_privs()")?;
-
- unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
- .context("Failed to create user namespace")?;
- ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));
-
- jobserver
- .set_cloexec(false)
- .context("Failed to unset O_CLOEXEC on jobserver pipe")?;
-
- let err = Command::new("sh")
- .args(&["-ex", &paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])])
- .stdin(Stdio::null())
- .stdout(log_stdout)
- .stderr(log_stderr)
- .env_clear()
- .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
- .env("HOME", "/build")
- .env("INPUT_HASH", input_hash.to_string())
- .env("MAKEFLAGS", jobserver.to_makeflags())
- .exec();
- eprintln!("{}", err);
- process::exit(127);
- };
-
- let pid = unsafe {
- ns::spawn(
- CloneFlags::CLONE_NEWNS
- | CloneFlags::CLONE_NEWPID
- | CloneFlags::CLONE_NEWIPC
- | CloneFlags::CLONE_NEWNET
- | CloneFlags::CLONE_NEWUTS,
- || exec_cmd().unwrap(),
- )
- }
- .context("Failed to run task container")?;
-
- let status = wait::waitpid(pid, None)?;
-
- if let Err(err) = status.check() {
- return Err(Error::new(format!(
- "Task failed: {}\nOutput: {}",
- err, log_filename
- )));
- }
-
- Ok(())
-}
-
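-/// Hashes the tar serialization of the task layer directory without storing
-/// the archive; returns None if the layer is empty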
-fn hash_layer(input_hash: &InputHash, task: &Task) -> Result<Option<LayerHash>> {
- let task_state_dir = paths::task_state_dir(input_hash);
- let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
-
- (|| -> Result<Option<LayerHash>> {
- if fs::is_dir_empty(&task_layer_dir)? {
- return Ok(None);
- }
-
- let hasher = LayerHasher::new();
- let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
-
- tar::pack(&task.rootfs, &mut buffered_writer, &task_layer_dir)?;
-
- let hasher = buffered_writer.into_inner()?;
- Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
- })()
- .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
-}
-
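-/// Moves the task layer into the content-addressed layer store; if a layer
-/// with the same hash already exists, the task's copy is discarded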
-fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
- let task_state_dir = paths::task_state_dir(input_hash);
- let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
-
- if let Some(hash) = hash {
- let layer_dir = paths::layer_dir(hash);
-
- let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
- Ok(_) => return Ok(()),
- Err(err) => err,
- };
-
- if !matches!(
- err.raw_os_error(),
- Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
- ) {
- return Err(err).with_context(|| {
- format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
- });
- }
- }
-
- fs::ensure_removed(&task_layer_dir)
-}
-
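-/// Runs the task, then collects its output archives and stores its filesystem layer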
-fn run_and_hash_task(
- input_hash: &InputHash,
- task: &Task,
- jobserver: &mut Jobserver,
-) -> Result<TaskOutput> {
- run_task(input_hash, task, jobserver)?;
-
- let outputs = collect_outputs(input_hash, task)?;
-
- let layer = hash_layer(input_hash, task)?;
- move_layer(input_hash, &layer)?;
-
- Ok(TaskOutput {
- input_hash: Some(*input_hash),
- layer,
- outputs,
- })
-}
-
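-/// Loads a previously cached task result for the given input hash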
-fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
- let filename = paths::task_cache_filename(input_hash);
- let file = fs::open(&filename)?;
-
- serde_json::from_reader(file)
- .with_context(|| format!("Failed to read task cache data from {}", filename))
-}
-
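-/// Atomically writes the task result to the cache via a temporary file and rename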
-fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
- fs::mkdir(&paths::task_state_dir(input_hash))?;
-
- let tmp_filename = paths::task_cache_tmp_filename(input_hash);
- let filename = paths::task_cache_filename(input_hash);
-
- cjson::to_file(&tmp_filename, output)
- .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
-
- fs::rename(tmp_filename, filename)?;
-
- Ok(())
-}
-
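-/// Runs a task to completion under its task lock and a jobserver token,
-/// returning the cached result if one exists (unless force_run is set); warns
-/// if a stale cache entry disagrees with the freshly computed outputs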
-pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result<TaskOutput> {
- let input_hash = input_hash(&task);
-
- let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true)
- .context("Failed to get task lock")?;
-
- let cached_output = load_cached(&input_hash);
- if !task.force_run {
- if let Ok(task_output) = cached_output {
- return Ok(task_output);
- }
- }
-
- let token = jobserver.wait();
-
- let start_time = Instant::now();
- println!("Starting task {} ({})", task.label, input_hash);
-
- let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
- let cleanup_ret = cleanup_task(&input_hash);
-
- jobserver.post(token);
-
- let task_output = task_ret?;
- cleanup_ret.context("Failed to clean up after task")?;
-
- save_cached(&input_hash, &task_output)?;
-
- let duration = Instant::now().duration_since(start_time);
- println!(
- "Finished task {} ({}) in {}",
- task.label,
- input_hash,
- duration.as_secs_f32()
- );
-
- if let Ok(cached_output) = cached_output {
- if cached_output.outputs != task_output.outputs {
- println!(
- "WARNING: Output hashes for task {} do not match cached result\n Cached output: {}\n New output: {}",
- task.label,
- cjson::to_string(&cached_output.outputs).unwrap(),
- cjson::to_string(&task_output.outputs).unwrap(),
- );
- }
- }
-
- Ok(task_output)
-}