Diffstat (limited to 'crates/rebel-runner/src/task.rs')
-rw-r--r--  crates/rebel-runner/src/task.rs  638
1 file changed, 638 insertions, 0 deletions
diff --git a/crates/rebel-runner/src/task.rs b/crates/rebel-runner/src/task.rs
new file mode 100644
index 0000000..5bb253a
--- /dev/null
+++ b/crates/rebel-runner/src/task.rs
@@ -0,0 +1,638 @@
+use std::{
+ collections::{BTreeMap, HashMap, HashSet},
+ io::{self, BufWriter},
+ os::unix::prelude::CommandExt,
+ path::{Path, PathBuf},
+ process::{self, Command, Stdio},
+ time::Instant,
+};
+
+use capctl::prctl;
+use nix::{
+ mount::{self, MsFlags},
+ sched::{unshare, CloneFlags},
+ sys::{resource, time::TimeVal, wait},
+ unistd::{self, Gid, Uid},
+};
+use serde::Serialize;
+use tee_readwrite::{TeeReader, TeeWriter};
+
+use rebel_common::{error::*, string_hash::*, types::*};
+use walkdir::WalkDir;
+
+use super::{
+ jobserver::Jobserver,
+ ns, tar,
+ util::{checkable::Checkable, cjson, fs},
+};
+use crate::{
+ paths,
+ util::{stack::Stack, unix},
+};
+
+const BUILD_UID: Uid = Uid::from_raw(1000);
+const BUILD_GID: Gid = Gid::from_raw(1000);
+
+type InputHasher = blake3::Hasher;
+type DependencyHasher = blake3::Hasher;
+type LayerHasher = blake3::Hasher;
+type ArchiveHasher = blake3::Hasher;
+
+type DependMap = BTreeMap<String, Vec<ArchiveHash>>;
+
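+/// Hashes a [`Dependency`] by digesting its canonical JSON encoding
+/// with BLAKE3, so equal dependencies always map to the same hash.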
+fn dependency_hash(dep: &Dependency) -> DependencyHash {
+ DependencyHash(StringHash(
+ cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
+ ))
+}
+
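+/// Computes the input hash that identifies a task.
+///
+/// The hash covers everything that influences the result: the command,
+/// workdir, rootfs, ancestor layers, dependencies and declared outputs.
+/// It serves as the cache key for task results.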
+fn input_hash(task: &Task) -> InputHash {
+ #[derive(Debug, Serialize)]
+ struct HashInput<'a> {
+ pub command: &'a str,
+ pub workdir: &'a str,
+ pub rootfs: &'a ArchiveHash,
+ pub ancestors: &'a [LayerHash],
+ pub depends: HashMap<DependencyHash, &'a Dependency>,
+ pub outputs: &'a HashMap<String, String>,
+ }
+ let input = HashInput {
+ command: &task.command,
+ workdir: &task.workdir,
+ rootfs: &task.rootfs,
+ ancestors: &task.ancestors,
+ depends: task
+ .depends
+ .iter()
+ .map(|dep| (dependency_hash(dep), dep))
+ .collect(),
+ outputs: &task.outputs,
+ };
+
+ InputHash(StringHash(
+ cjson::digest::<InputHasher, _>(&input).unwrap().into(),
+ ))
+}
+
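+/// Sets up the working directory of a task.
+///
+/// A fresh layer directory captures all files created by the task; it is
+/// bind-mounted on the workdir when there are no ancestors, or used as the
+/// upperdir of an overlayfs stacked on the ancestor layers otherwise.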
+fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
+ // Remove metadata first to ensure task invalidation
+ fs::ensure_removed(paths::task_cache_filename(input_hash))?;
+
+ let task_state_dir = paths::task_state_dir(input_hash);
+
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+ fs::ensure_removed(&task_layer_dir)?;
+ fs::mkdir(&task_layer_dir)?;
+ fs::fixup_permissions(&task_layer_dir)?;
+
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+ let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
+ fs::mkdir(&taskdir)?;
+ let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
+ std::fs::write(&runfile, &task.command)
+ .with_context(|| format!("Failed to write {}", runfile))?;
+
+ let mount_target = paths::join(&[&task_tmp_dir, &task.workdir]);
+ let mount = if task.ancestors.is_empty() {
+ fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
+ .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
+ } else {
+ let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+ fs::ensure_removed(&task_work_dir)?;
+ fs::mkdir(&task_work_dir)?;
+ fs::fixup_permissions(&task_work_dir)?;
+
+ let lower = task
+ .ancestors
+ .iter()
+ .rev()
+ .map(paths::layer_dir)
+ .collect::<Vec<_>>()
+ .join(":");
+ let options = format!(
+ "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
+ lower = lower,
+ upper = task_layer_dir,
+ work = task_work_dir
+ );
+ fs::mount(
+ "overlay",
+ &mount_target,
+ Some("overlay"),
+ MsFlags::empty(),
+ Some(&options),
+ )
+ .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
+ };
+
+ Ok(mount)
+}
+
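+/// Recursively lists a directory, returning its directories and
+/// non-directory entries as two sets of paths rebased onto `prefix`.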
+fn get_contents<P1: AsRef<Path>, P2: AsRef<Path>>(
+ path: P1,
+ prefix: P2,
+) -> Result<(HashSet<PathBuf>, HashSet<PathBuf>)> {
+ let mut dirs = HashSet::new();
+ let mut files = HashSet::new();
+
+ let root: PathBuf = Path::new("/").join(prefix.as_ref());
+
+ for result in WalkDir::new(path.as_ref()).min_depth(1).into_iter() {
+ let entry = result
+ .with_context(|| format!("Failed to list contents of directory {:?}", path.as_ref()))?;
+ let is_dir = entry.file_type().is_dir();
+ let entry_path = root.join(entry.into_path().strip_prefix(path.as_ref()).unwrap());
+ if is_dir {
+ dirs.insert(entry_path);
+ } else {
+ files.insert(entry_path);
+ }
+ }
+
+ dirs.insert(root);
+
+ Ok((dirs, files))
+}
+
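+/// Checks two directory trees for conflicting entries.
+///
+/// Directories may be shared between trees; a conflict is a non-directory
+/// path occurring in both, or a non-directory entry in one tree colliding
+/// with a directory in the other.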
+fn check_conflicts(
+ dirs1: &HashSet<PathBuf>,
+ files1: &HashSet<PathBuf>,
+ dirs2: &HashSet<PathBuf>,
+ files2: &HashSet<PathBuf>,
+) -> Result<()> {
+ let mut conflicts = Vec::new();
+
+ conflicts.extend(files1.intersection(files2));
+ conflicts.extend(dirs1.intersection(files2));
+ conflicts.extend(files1.intersection(dirs2));
+
+ if !conflicts.is_empty() {
+ let mut conflict_strings: Box<[_]> = conflicts
+ .into_iter()
+ .map(|path| path.to_string_lossy().to_string())
+ .collect();
+ conflict_strings.sort();
+ return Err(Error::new(format!(
+ "Found the following file conflicts in dependencies:\n{}",
+ conflict_strings.join("\n")
+ )));
+ }
+
+ Ok(())
+}
+
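+/// Sets up the container root filesystem of a task.
+///
+/// The rootfs is bind-mounted read-only; every dependency is then stacked
+/// onto its target path with a read-only overlayfs after verifying that it
+/// does not conflict with the rootfs contents or other dependencies.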
+fn init_task_rootfs(input_hash: &InputHash, task: &Task) -> Result<Stack<fs::Mount>> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+ let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+ let rootfs = paths::depend_dir(&task.rootfs);
+
+ let depends = unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;
+
+ let mut mounts = Stack::new();
+
+ mounts.push(
+ fs::mount(rootfs, &mount_target, None, MsFlags::MS_BIND, None)
+ .with_context(|| format!("Failed to bind mount rootfs to {:?}", mount_target))?,
+ );
+ mount::mount::<str, str, str, str>(
+ None,
+ &mount_target,
+ None,
+ MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
+ None,
+ )
+ .context("Failed to mount container rootfs read-only")?;
+
+ let (mut dirs, mut files) = get_contents(&mount_target, "")?;
+
+ for (path, dep_hashes) in depends {
+ assert!(!dep_hashes.is_empty());
+
+ if !path.is_empty() && !path.starts_with('/') {
+ return Err(Error::new(format!(
+ "Dependency path {:?} must be absolute",
+ path
+ )));
+ }
+
+ let dep_target = mount_target.clone() + &path;
+ let dep_paths: Box<[_]> = dep_hashes.iter().map(paths::depend_dir).collect();
+
+ for dep in dep_paths.iter() {
+ let (dep_dirs, dep_files) = get_contents(dep, &path)?;
+ check_conflicts(&dirs, &files, &dep_dirs, &dep_files)?;
+ dirs.extend(dep_dirs);
+ files.extend(dep_files);
+ }
+
+ let options = format!(
+ "xino=off,index=off,metacopy=off,lowerdir={lower}:{base}",
+ lower = dep_paths.join(":"),
+ base = dep_target,
+ );
+
+ mounts.push(
+ fs::mount(
+ "overlay",
+ dep_target.as_str(),
+ Some("overlay"),
+ MsFlags::MS_RDONLY,
+ Some(&options),
+ )
+ .with_context(|| format!("Failed to mount overlay to {:?}", dep_target))?,
+ );
+ }
+
+ Ok(mounts)
+}
+
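+/// Removes the temporary directories of a task after it has run.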
+fn cleanup_task(input_hash: &InputHash) -> Result<()> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+ fs::ensure_removed(task_tmp_dir).context("Failed to remove task tmp dir")?;
+
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+ fs::ensure_removed(task_work_dir).context("Failed to remove overlayfs workdir")?;
+
+ Ok(())
+}
+
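+/// Returns the archive path for `hash`, preferring the pinned archive
+/// name when the task pins this hash.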
+fn get_archive_filename(task: &Task, hash: &ArchiveHash) -> String {
+ if let Some(pinned_name) = task.pins.get(hash) {
+ paths::pinned_archive_filename(pinned_name)
+ } else {
+ paths::archive_filename(hash)
+ }
+}
+
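+/// Unpacks a dependency archive into its depend directory, doing nothing
+/// if it has been unpacked before.
+///
+/// The archive is hashed with BLAKE3 while being unpacked and the result
+/// is compared against the expected hash; only then is the temporary
+/// directory renamed to its final name, so a corrupt or partially unpacked
+/// dependency never becomes visible under the final path.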
+fn unpack_dependency(task: &Task, hash: &ArchiveHash) -> Result<()> {
+ let _lock = unix::lock(paths::depend_lock_filename(hash), true, true)
+ .context("Failed to get dependency lock")?;
+
+ let filename = get_archive_filename(task, hash);
+
+ let dest = paths::depend_dir(hash);
+ if Path::new(&dest).is_dir() {
+ return Ok(());
+ }
+
+ (|| -> Result<()> {
+ let tmp_dest = paths::depend_tmp_dir(hash);
+ fs::ensure_removed(&tmp_dest)?;
+
+ let file = fs::open(&filename)?;
+ let hasher = ArchiveHasher::new();
+ let buffered_hasher = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
+ let mut reader = TeeReader::new(file, buffered_hasher, false);
+
+ tar::unpack(&mut reader, &tmp_dest)?;
+
+ // Read file to end to get the correct hash
+ io::copy(&mut reader, &mut io::sink())?;
+
+ let (_, buffered_hasher) = reader.into_inner();
+ let hasher = buffered_hasher.into_inner()?;
+
+ let actual_hash = ArchiveHash(StringHash(hasher.finalize().into()));
+
+ if &actual_hash != hash {
+ return Err(Error::new(format!(
+ "Incorrect file hash for {:?} (expected: {}, actual: {})",
+ filename, hash, actual_hash,
+ )));
+ }
+
+ fs::rename(&tmp_dest, &dest)?;
+
+ Ok(())
+ })()
+ .with_context(|| format!("Failed to unpack {:?}", filename))
+}
+
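+/// Unpacks the rootfs and all dependencies of a task.
+///
+/// Fetch dependencies are copied from the downloads directory into the
+/// task tmp dir; task dependencies are unpacked and collected into a map
+/// from install path to archive hashes.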
+fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<DependMap> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+ unpack_dependency(task, &task.rootfs)?;
+
+ let mut ret = DependMap::new();
+
+ for dep in &task.depends {
+ match dep {
+ Dependency::Fetch {
+ name, target_dir, ..
+ } => {
+ let path = paths::join(&[&task_tmp_dir, target_dir]);
+ fs::mkdir(&path)?;
+ fs::copy(
+ paths::join(&[paths::DOWNLOADS_DIR, name]),
+ paths::join(&[&path, name]),
+ )?;
+ }
+ Dependency::Task { output, path } => {
+ unpack_dependency(task, output)?;
+ ret.entry(path.clone()).or_default().push(*output);
+ }
+ }
+ }
+
+ Ok(ret)
+}
+
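+/// Packs a single output directory into a content-addressed archive,
+/// returning `None` when the output path does not exist.
+///
+/// The archive is hashed while it is written and only renamed to its
+/// final, hash-derived filename after it has been synced to disk.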
+fn collect_output(input_hash: &InputHash, task: &Task, path: &str) -> Result<Option<ArchiveHash>> {
+ let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]);
+ if !Path::new(&source).is_dir() {
+ return Ok(None);
+ }
+
+ let filename = paths::archive_tmp_filename(input_hash);
+
+ let hash = (|| -> Result<ArchiveHash> {
+ let file = fs::create(&filename)?;
+ let hasher = ArchiveHasher::new();
+ let writer = TeeWriter::new(file, hasher);
+ let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);
+
+ tar::pack(&task.rootfs, &mut buffered_writer, &source)?;
+
+ let writer = buffered_writer.into_inner()?;
+ let (file, hasher) = writer.into_inner();
+ file.sync_all()?;
+ drop(file);
+
+ Ok(ArchiveHash(StringHash(hasher.finalize().into())))
+ })()
+ .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;
+
+ fs::rename(filename, paths::archive_filename(&hash))?;
+
+ Ok(Some(hash))
+}
+
+fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
+ let mut ret = HashMap::new();
+
+ for (name, path) in &task.outputs {
+ if let Some(hash) = collect_output(input_hash, task, path)? {
+ ret.insert(name.clone(), hash);
+ }
+ }
+
+ Ok(ret)
+}
+
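+/// Executes the task command in an isolated container.
+///
+/// The child runs in fresh mount, PID, IPC, network and UTS namespaces,
+/// pivots into the prepared rootfs, sets up a user namespace for the
+/// unprivileged build user and runs the task script with `sh -ex`,
+/// capturing stdout and stderr in the task log file.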
+fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> {
+ let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
+ let _rootfs_mounts =
+ init_task_rootfs(input_hash, task).context("Failed to initialize task rootfs")?;
+
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+ let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+
+ let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
+ let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);
+
+ let log_filename = paths::task_log_filename(input_hash);
+
+ let mut exec_cmd = || -> Result<()> {
+ let log = fs::create(&log_filename)?;
+
+ let log_stdout = log
+ .try_clone()
+ .context("Failed to duplicate log file descriptor")?;
+ let log_stderr = log
+ .try_clone()
+ .context("Failed to duplicate log file descriptor")?;
+
+ mount::mount::<_, _, str, str>(
+ Some(paths::DEV_DIR),
+ paths::join(&[&rootfs, "dev"]).as_str(),
+ None,
+ MsFlags::MS_BIND | MsFlags::MS_REC,
+ None,
+ )
+ .expect("Failed to bind mount /dev directory");
+ mount::mount::<_, _, str, str>(
+ Some(builddir_source.as_str()),
+ builddir_target.as_str(),
+ None,
+ MsFlags::MS_BIND | MsFlags::MS_REC,
+ None,
+ )
+ .expect("Failed to bind mount build directory");
+
+ ns::pivot_root(&rootfs);
+ mount::mount::<str, _, str, str>(
+ None,
+ "/",
+ None,
+ MsFlags::MS_PRIVATE | MsFlags::MS_REC,
+ None,
+ )
+ .context("Failed to set MS_PRIVATE for container root")?;
+ ns::container_mounts().context("Failed to set up container mounts")?;
+
+ unistd::sethostname("rebel-builder").context("Failed to set hostname")?;
+
+ prctl::set_no_new_privs().context("set_no_new_privs()")?;
+
+ unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
+ .context("Failed to create user namespace")?;
+ ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));
+
+ jobserver
+ .set_cloexec(false)
+ .context("Failed to unset O_CLOEXEC on jobserver pipe")?;
+
+ let err = Command::new("sh")
+ .args(["-ex", &paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])])
+ .stdin(Stdio::null())
+ .stdout(log_stdout)
+ .stderr(log_stderr)
+ .env_clear()
+ .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
+ .env("HOME", "/build")
+ .env("MAKEFLAGS", jobserver.to_makeflags())
+ .exec();
+ eprintln!("{}", err);
+ process::exit(127);
+ };
+
+ let pid = unsafe {
+ ns::spawn(
+ CloneFlags::CLONE_NEWNS
+ | CloneFlags::CLONE_NEWPID
+ | CloneFlags::CLONE_NEWIPC
+ | CloneFlags::CLONE_NEWNET
+ | CloneFlags::CLONE_NEWUTS,
+ || exec_cmd().unwrap(),
+ )
+ }
+ .context("Failed to run task container")?;
+
+ let status = wait::waitpid(pid, None)?;
+
+ if let Err(err) = status.check() {
+ return Err(Error::new(format!(
+ "Task failed: {}\nOutput: {}",
+ err, log_filename
+ )));
+ }
+
+ Ok(())
+}
+
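+/// Hashes the layer produced by a task by packing it into a tar stream
+/// that is digested on the fly instead of being written out; returns
+/// `None` for an empty layer.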
+fn hash_layer(input_hash: &InputHash, task: &Task) -> Result<Option<LayerHash>> {
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+ (|| -> Result<Option<LayerHash>> {
+ if fs::is_dir_empty(&task_layer_dir)? {
+ return Ok(None);
+ }
+
+ let hasher = LayerHasher::new();
+ let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
+
+ tar::pack(&task.rootfs, &mut buffered_writer, &task_layer_dir)?;
+
+ let hasher = buffered_writer.into_inner()?;
+ Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
+ })()
+ .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
+}
+
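+/// Moves a task's layer directory to its content-addressed location.
+///
+/// A rename that fails with `EEXIST` or `ENOTEMPTY` means an identical
+/// layer is already stored; in that case (and for empty layers) the
+/// directory is simply removed.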
+fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+ if let Some(hash) = hash {
+ let layer_dir = paths::layer_dir(hash);
+
+ let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
+ Ok(_) => return Ok(()),
+ Err(err) => err,
+ };
+
+ if !matches!(
+ err.raw_os_error(),
+ Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
+ ) {
+ return Err(err).with_context(|| {
+ format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
+ });
+ }
+ }
+
+ fs::ensure_removed(&task_layer_dir)
+}
+
+fn run_and_hash_task(
+ input_hash: &InputHash,
+ task: &Task,
+ jobserver: &mut Jobserver,
+) -> Result<TaskOutput> {
+ run_task(input_hash, task, jobserver)?;
+
+ let outputs = collect_outputs(input_hash, task)?;
+
+ let layer = hash_layer(input_hash, task)?;
+ move_layer(input_hash, &layer)?;
+
+ Ok(TaskOutput {
+ input_hash: Some(*input_hash),
+ layer,
+ outputs,
+ })
+}
+
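+/// Loads a previously cached task result, if one exists.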
+fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
+ let filename = paths::task_cache_filename(input_hash);
+ let file = fs::open(&filename)?;
+
+ serde_json::from_reader(file)
+ .with_context(|| format!("Failed to read task cache data from {}", filename))
+}
+
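+/// Persists a task result atomically by writing it to a temporary file
+/// and renaming it into place.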
+fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
+ fs::mkdir(paths::task_state_dir(input_hash))?;
+
+ let tmp_filename = paths::task_cache_tmp_filename(input_hash);
+ let filename = paths::task_cache_filename(input_hash);
+
+ cjson::to_file(&tmp_filename, output)
+ .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
+
+ fs::rename(tmp_filename, filename)?;
+
+ Ok(())
+}
+
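+/// Conversion of a [`TimeVal`] to fractional seconds, mirroring
+/// [`std::time::Duration::as_secs_f32`].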
+trait AsSecsF32 {
+ fn as_secs_f32(&self) -> f32;
+}
+
+impl AsSecsF32 for TimeVal {
+ fn as_secs_f32(&self) -> f32 {
+ self.tv_sec() as f32 + 1e-6 * (self.tv_usec() as f32)
+ }
+}
+
+fn get_usage(total: f32) -> String {
+ let usage = resource::getrusage(resource::UsageWho::RUSAGE_CHILDREN).expect("getrusage()");
+
+ let user = usage.user_time().as_secs_f32();
+ let system = usage.system_time().as_secs_f32();
+ let cpu = (100.0 * (user + system) / total).round();
+
+ format!("{user:.2}s user {system:.2}s system {cpu:.0}% cpu")
+}
+
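+/// Entry point for running a single task.
+///
+/// Takes the per-task lock and returns the cached result unless
+/// `force_run` is set; otherwise it acquires a jobserver token, runs the
+/// task, cleans up, caches the new result, and warns when a forced rerun
+/// produced output hashes that differ from the cached ones.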
+pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result<TaskOutput> {
+ let input_hash = input_hash(&task);
+
+ let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true)
+ .context("Failed to get task lock")?;
+
+ let cached_output = load_cached(&input_hash);
+ if !task.force_run {
+ if let Ok(task_output) = cached_output {
+ return Ok(task_output);
+ }
+ }
+
+ let token = jobserver.wait();
+
+ let start_time = Instant::now();
+ println!("Starting task {} ({})", task.label, input_hash);
+
+ let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
+ let cleanup_ret = cleanup_task(&input_hash);
+
+ jobserver.post(token);
+
+ let task_output = task_ret?;
+ cleanup_ret.context("Failed to clean up after task")?;
+
+ save_cached(&input_hash, &task_output)?;
+
+ let duration = Instant::now().duration_since(start_time).as_secs_f32();
+ let usage = get_usage(duration);
+ println!(
+ "Finished task {} ({}) in {:.2}s ({})",
+ task.label, input_hash, duration, usage,
+ );
+
+ if let Ok(cached_output) = cached_output {
+ if cached_output.outputs != task_output.outputs {
+ println!(
+ "WARNING: Output hashes for task {} do not match cached result\n Cached output: {}\n New output: {}",
+ task.label,
+ cjson::to_string(&cached_output.outputs).unwrap(),
+ cjson::to_string(&task_output.outputs).unwrap(),
+ );
+ }
+ }
+
+ Ok(task_output)
+}