author    Matthias Schiffer <mschiffer@universe-factory.net>  2021-10-25 00:19:45 +0200
committer Matthias Schiffer <mschiffer@universe-factory.net>  2021-10-25 00:19:45 +0200
commit    34ac18d20c13a78914d447fee83204811a27b1e4 (patch)
tree      56763d4ea46927105fcc6a71e03d5bd75a6947a6 /crates/runner/src/task.rs
parent    a1a185370da27f2cc3df84d3a8d7141f9ce7db16 (diff)
Move runner into separate crate
Diffstat (limited to 'crates/runner/src/task.rs')
-rw-r--r--  crates/runner/src/task.rs | 460
1 file changed, 460 insertions(+), 0 deletions(-)
diff --git a/crates/runner/src/task.rs b/crates/runner/src/task.rs
new file mode 100644
index 0000000..296c1a0
--- /dev/null
+++ b/crates/runner/src/task.rs
@@ -0,0 +1,460 @@
+use std::{
+ collections::HashMap,
+ io::BufWriter,
+ os::unix::prelude::CommandExt,
+ path::{Path, PathBuf},
+ process::{self, Command, Stdio},
+ time::Instant,
+};
+
+use capctl::prctl;
+use nix::{
+ mount::{self, MsFlags},
+ sched::{unshare, CloneFlags},
+ sys::wait,
+ unistd::{self, Gid, Uid},
+};
+use serde::Serialize;
+use tee_readwrite::TeeWriter;
+
+use common::{error::*, string_hash::*, types::*};
+
+use super::{
+ jobserver::Jobserver,
+ ns, tar,
+ util::{cjson, fs, Checkable},
+};
+use crate::paths;
+
+const BUILD_UID: Uid = Uid::from_raw(1000);
+const BUILD_GID: Gid = Gid::from_raw(1000);
+
+type InputHasher = blake3::Hasher;
+type DependencyHasher = blake3::Hasher;
+type LayerHasher = blake3::Hasher;
+type ArchiveHasher = blake3::Hasher;
+
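+/// Hashes a single dependency, serialized as canonical JSON via `cjson` so
+/// that the digest is deterministic.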
+fn dependency_hash(dep: &Dependency) -> DependencyHash {
+ DependencyHash(StringHash(
+ cjson::digest::<DependencyHasher, _>(dep).unwrap().into(),
+ ))
+}
+
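+/// Computes a task's input hash over everything that determines its result:
+/// the command, the inherited layers, the dependencies (keyed by their own
+/// hashes) and the declared outputs. Tasks with equal input hashes can share
+/// a cached result.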
+fn input_hash(task: &Task) -> InputHash {
+ #[derive(Debug, Serialize)]
+ struct HashInput<'a> {
+ pub command: &'a str,
+ pub inherit: &'a [LayerHash],
+ pub depends: HashMap<DependencyHash, &'a Dependency>,
+ pub outputs: &'a HashMap<String, String>,
+ }
+ let input = HashInput {
+ command: &task.command,
+ inherit: &task.inherit,
+ depends: task
+ .depends
+ .iter()
+ .map(|dep| (dependency_hash(dep), dep))
+ .collect(),
+ outputs: &task.outputs,
+ };
+
+ InputHash(StringHash(
+ cjson::digest::<InputHasher, _>(&input).unwrap().into(),
+ ))
+}
+
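+/// Sets up the state and tmp directories for a task and mounts its working
+/// directory: a plain bind mount of the fresh layer directory when nothing
+/// is inherited, otherwise an overlayfs with the inherited layers as
+/// lowerdirs and the task's layer directory as upperdir.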
+fn init_task(input_hash: &InputHash, task: &Task) -> Result<fs::Mount> {
+ // Remove the cached result first so that an interrupted run invalidates the task instead of leaving stale metadata behind
+ fs::ensure_removed(&paths::task_cache_filename(input_hash))?;
+
+ let task_state_dir = paths::task_state_dir(input_hash);
+ fs::ensure_removed(&task_state_dir)?;
+ fs::mkdir(&task_state_dir)?;
+
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+ fs::mkdir(&task_layer_dir)?;
+ fs::fixup_permissions(&task_layer_dir)?;
+
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+ let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]);
+ fs::mkdir(&taskdir)?;
+ let runfile = paths::join(&[&taskdir, paths::TASK_RUN]);
+ std::fs::write(&runfile, &task.command)
+ .with_context(|| format!("Failed to write {}", runfile))?;
+
+ let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_WORKDIR]);
+ let mount = if task.inherit.is_empty() {
+ fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None)
+ .with_context(|| format!("Failed to bind mount to {:?}", mount_target))?
+ } else {
+ let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+ fs::mkdir(&task_work_dir)?;
+ fs::fixup_permissions(&task_work_dir)?;
+
+ let lower = task
+ .inherit
+ .iter()
+ .rev()
+ .map(paths::layer_dir)
+ .collect::<Vec<_>>()
+ .join(":");
+ let options = format!(
+ "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}",
+ lower = lower,
+ upper = task_layer_dir,
+ work = task_work_dir
+ );
+ fs::mount(
+ "overlay",
+ &mount_target,
+ Some("overlay"),
+ MsFlags::empty(),
+ Some(&options),
+ )
+ .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))?
+ };
+
+ Ok(mount)
+}
+
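+/// Mounts the container rootfs as an overlay of the unpacked dependencies
+/// over the shared base rootfs. No upperdir is configured, so the overlay
+/// is effectively read-only.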
+fn init_task_rootfs(input_hash: &InputHash) -> Result<fs::Mount> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+ let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
+ let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+
+ let lower = [&depends_dir, paths::ROOTFS_DIR].join(":");
+ let options = format!(
+ "xino=off,index=off,metacopy=off,lowerdir={lower}",
+ lower = lower,
+ );
+
+ let mount = fs::mount(
+ "overlay",
+ &mount_target,
+ Some("overlay"),
+ MsFlags::empty(),
+ Some(&options),
+ )
+ .with_context(|| format!("Failed to mount rootfs overlay to {:?}", mount_target))?;
+
+ Ok(mount)
+}
+
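+/// Removes the task's tmp directory and the overlayfs workdir. The rest of
+/// the state directory (cache file, layer) is left to the caching and
+/// layer-moving steps.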
+fn cleanup_task(input_hash: &InputHash) -> Result<()> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+
+ fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?;
+
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
+ fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?;
+
+ Ok(())
+}
+
+fn unpack_dependency<P1: AsRef<Path>, P2: AsRef<Path>>(filename: P1, dest: P2) -> Result<()> {
+ let file = fs::open(filename.as_ref())?;
+ tar::unpack(file, dest.as_ref()).with_context(|| {
+ format!(
+ "Failed to unpack {:?} to {:?}",
+ filename.as_ref(),
+ dest.as_ref()
+ )
+ })
+}
+
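+/// Stages all dependencies inside the task's tmp directory: fetched files
+/// are copied into the download dir, task outputs are unpacked into the
+/// depends dir that later becomes a lowerdir of the container rootfs.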
+fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result<()> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+ let downloads_dir = paths::join(&[&task_tmp_dir, paths::TASK_DLDIR]);
+ let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
+
+ fs::mkdir(&downloads_dir)?;
+ fs::mkdir(&depends_dir)?;
+
+ for dep in &task.depends {
+ match dep {
+ Dependency::Fetch { name, .. } => {
+ fs::copy(
+ paths::join(&[paths::DOWNLOADS_DIR, name]),
+ paths::join(&[&downloads_dir, name]),
+ )?;
+ }
+ Dependency::Task { output, path } => {
+ unpack_dependency(
+ paths::archive_filename(output),
+ paths::join(&[&depends_dir, path]),
+ )?;
+ }
+ }
+ }
+
+ Ok(())
+}
+
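+/// Packs a single output directory into a tar archive, hashing the archive
+/// on the fly through a `TeeWriter`, then moves it into the archive store
+/// under its content hash. Returns `None` if the output path does not exist.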
+fn collect_output(input_hash: &InputHash, path: &str) -> Result<Option<ArchiveHash>> {
+ let source: PathBuf = [&paths::task_tmp_dir(input_hash), paths::TASK_DESTDIR, path]
+ .iter()
+ .collect();
+ if !source.is_dir() {
+ return Ok(None);
+ }
+
+ let filename = paths::archive_tmp_filename(input_hash);
+
+ let hash = (|| -> Result<ArchiveHash> {
+ let file = fs::create(&filename)?;
+ let hasher = ArchiveHasher::new();
+ let writer = TeeWriter::new(file, hasher);
+ let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);
+
+ tar::pack(&mut buffered_writer, &source)?;
+
+ let writer = buffered_writer.into_inner()?;
+ let (file, hasher) = writer.into_inner();
+ file.sync_all()?;
+ drop(file);
+
+ Ok(ArchiveHash(StringHash(hasher.finalize().into())))
+ })()
+ .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?;
+
+ fs::rename(filename, paths::archive_filename(&hash))?;
+
+ Ok(Some(hash))
+}
+
+fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result<HashMap<String, ArchiveHash>> {
+ let mut ret = HashMap::new();
+
+ for (name, path) in &task.outputs {
+ if let Some(hash) = collect_output(input_hash, path)? {
+ ret.insert(name.clone(), hash);
+ }
+ }
+
+ Ok(ret)
+}
+
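+/// Runs the task command in a container: prepares workdir, dependencies and
+/// rootfs, then spawns a child in fresh mount/PID/IPC/net/UTS namespaces.
+/// The child bind mounts /dev and the build directory, pivots into the
+/// rootfs, maps the unprivileged build user in a new user namespace and
+/// execs `sh` on the generated run script, logging stdout/stderr to the
+/// task log file.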
+fn run_task(input_hash: &InputHash, task: &Task, mut jobserver: Jobserver) -> Result<()> {
+ let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
+ unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;
+ let _rootfs_mount = init_task_rootfs(input_hash).context("Failed to initialize task rootfs")?;
+
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
+ let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
+
+ let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]);
+ let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);
+
+ let log_filename = paths::task_log_filename(input_hash);
+
+ let mut exec_cmd = || -> Result<()> {
+ let log = fs::create(&log_filename)?;
+
+ let log_stdout = log
+ .try_clone()
+ .context("Failed to duplicate log file descriptor")?;
+ let log_stderr = log
+ .try_clone()
+ .context("Failed to duplicate log file descriptor")?;
+
+ mount::mount::<_, _, str, str>(
+ Some(paths::join(&[paths::ROOTFS_DIR, "dev"]).as_str()),
+ paths::join(&[&rootfs, "dev"]).as_str(),
+ None,
+ MsFlags::MS_BIND | MsFlags::MS_REC,
+ None,
+ )
+ .expect("Failed to bind mount /dev directory");
+ mount::mount::<_, _, str, str>(
+ Some(builddir_source.as_str()),
+ builddir_target.as_str(),
+ None,
+ MsFlags::MS_BIND | MsFlags::MS_REC,
+ None,
+ )
+ .expect("Failed to bind mount build directory");
+
+ ns::pivot_root(&rootfs);
+ mount::mount::<str, _, str, str>(
+ None,
+ "/",
+ None,
+ MsFlags::MS_PRIVATE | MsFlags::MS_REC,
+ None,
+ )
+ .context("Failed to set MS_PRIVATE for container root")?;
+ ns::container_mounts().context("Failed to set up container mounts")?;
+
+ unistd::sethostname("rebel-builder").context("Failed to set hostname")?;
+
+ prctl::set_no_new_privs().context("set_no_new_privs()")?;
+
+ unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS)
+ .context("Failed to create user namespace")?;
+ ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0));
+
+ jobserver
+ .set_cloexec(false)
+ .context("Failed to unset O_CLOEXEC on jobserver pipe")?;
+
+ let err = Command::new("sh")
+ .args(&[
+ "-ex",
+ &paths::abs(&paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])),
+ ])
+ .stdin(Stdio::null())
+ .stdout(log_stdout)
+ .stderr(log_stderr)
+ .current_dir(paths::TASK_WORKDIR)
+ .env_clear()
+ .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
+ .env("HOME", "/build")
+ .env("INPUT_HASH", input_hash.to_string())
+ .env("MAKEFLAGS", jobserver.to_makeflags())
+ .exec();
+ eprintln!("{}", err);
+ process::exit(127);
+ };
+
+ let (pid, ()) = unsafe {
+ ns::spawn(
+ CloneFlags::CLONE_NEWNS
+ | CloneFlags::CLONE_NEWPID
+ | CloneFlags::CLONE_NEWIPC
+ | CloneFlags::CLONE_NEWNET
+ | CloneFlags::CLONE_NEWUTS,
+ (),
+ |()| exec_cmd().unwrap(),
+ )
+ }
+ .context("Failed to run task container")?;
+
+ let status = wait::waitpid(pid, None)?;
+
+ if let Err(err) = status.check() {
+ return Err(Error::new(format!(
+ "Task failed: {}\nOutput: {}",
+ err, log_filename
+ )));
+ }
+
+ Ok(())
+}
+
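+/// Hashes the layer produced by a task by streaming a tar archive of the
+/// layer directory straight into the hasher (nothing touches the disk).
+/// Returns `None` for an empty layer.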
+fn hash_layer(input_hash: &InputHash) -> Result<Option<LayerHash>> {
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+ (|| -> Result<Option<LayerHash>> {
+ if fs::is_dir_empty(&task_layer_dir)? {
+ return Ok(None);
+ }
+
+ let hasher = LayerHasher::new();
+ let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
+
+ tar::pack(&mut buffered_writer, &task_layer_dir)?;
+
+ let hasher = buffered_writer.into_inner()?;
+ Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
+ })()
+ .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
+}
+
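+/// Moves the task's layer directory into the layer store under its hash.
+/// EEXIST/ENOTEMPTY on rename means another task already produced an
+/// identical layer, so ours is simply discarded; an empty layer is removed
+/// as well.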
+fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
+ let task_state_dir = paths::task_state_dir(input_hash);
+ let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
+
+ if let Some(hash) = hash {
+ let layer_dir = paths::layer_dir(hash);
+
+ let err = match std::fs::rename(&task_layer_dir, &layer_dir) {
+ Ok(_) => return Ok(()),
+ Err(err) => err,
+ };
+
+ if !matches!(
+ err.raw_os_error(),
+ Some(libc::EEXIST) | Some(libc::ENOTEMPTY)
+ ) {
+ return Err(err).with_context(|| {
+ format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir)
+ });
+ }
+ }
+
+ fs::ensure_removed(&task_layer_dir)
+}
+
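+/// Runs a task and gathers its results: the output archives and the
+/// resulting layer, hashed and moved into the layer store.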
+fn run_and_hash_task(
+ input_hash: &InputHash,
+ task: &Task,
+ jobserver: Jobserver,
+) -> Result<TaskOutput> {
+ run_task(input_hash, task, jobserver)?;
+
+ let outputs = collect_outputs(input_hash, task)?;
+
+ let layer = hash_layer(input_hash)?;
+ move_layer(input_hash, &layer)?;
+
+ Ok(TaskOutput {
+ input_hash: *input_hash,
+ layer,
+ outputs,
+ })
+}
+
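+/// Tries to load a cached `TaskOutput` for the given input hash.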
+fn load_cached(input_hash: &InputHash) -> Result<TaskOutput> {
+ let filename = paths::task_cache_filename(input_hash);
+ let file = fs::open(&filename)?;
+
+ serde_json::from_reader(file)
+ .with_context(|| format!("Failed to read task cache data from {}", filename))
+}
+
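+/// Persists a task's output atomically: writes the cache data to a tmp file
+/// and renames it into place.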
+fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> {
+ fs::mkdir(&paths::task_state_dir(input_hash))?;
+
+ let tmp_filename = paths::task_cache_tmp_filename(input_hash);
+ let filename = paths::task_cache_filename(input_hash);
+
+ cjson::to_file(&tmp_filename, output)
+ .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
+
+ fs::rename(tmp_filename, filename)?;
+
+ Ok(())
+}
+
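+/// Entry point for executing a task: returns the cached output if present,
+/// otherwise runs the task in a container, cleans up, caches the result and
+/// reports the elapsed time.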
+pub fn handle(task: Task, jobserver: Jobserver) -> Result<TaskOutput> {
+ let input_hash = input_hash(&task);
+
+ if let Ok(task_output) = load_cached(&input_hash) {
+ return Ok(task_output);
+ }
+
+ let start_time = Instant::now();
+ println!("Starting task {} ({})", task.label, input_hash);
+
+ let task_ret = run_and_hash_task(&input_hash, &task, jobserver);
+ let cleanup_ret = cleanup_task(&input_hash);
+
+ let task_output = task_ret?;
+ cleanup_ret.context("Failed to clean up after task")?;
+
+ save_cached(&input_hash, &task_output)?;
+
+ let duration = start_time.elapsed();
+ println!(
+ "Finished task {} ({}) in {:.2}s",
+ task.label,
+ input_hash,
+ duration.as_secs_f32()
+ );
+
+ Ok(task_output)
+}