author     Matthias Schiffer <mschiffer@universe-factory.net>  2021-10-16 12:29:00 +0200
committer  Matthias Schiffer <mschiffer@universe-factory.net>  2021-10-18 21:05:26 +0200
commit     84c69faf6cee6a565f16fd82e0b1b6bcfc6ed1ff (patch)
tree       c411f481c70cbb3acfb70e048bcec8e2304a74f7
parent     1b5cb7b5589d34133d342ca4d9177112555e0464 (diff)
Move task cache handling from executor to runner
This way, the input hash and the state directory are a concern of the runner alone; the input hash is passed back to the executor only for the summary output.

We lose some cache metadata in the process, as the TaskMeta struct is removed completely. Timestamps can easily be added back, but to implement proper cache cleanup we will need a replacement for the args map.
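For orientation before the full diff: condensed from the changes to src/runner/container/run.rs below, the runner's entry point now derives the input hash itself and wraps the cache lookup and store around the actual run (helper functions as introduced in the diff; println! progress output omitted):

    // Condensed from the diff: cache handling now lives in the runner.
    pub fn handle_task(task: runner::Task) -> Result<runner::TaskOutput> {
        // The input hash is derived here instead of in the executor
        let input_hash = input_hash(&task.input);

        // Reuse a cached result if one exists for this input hash
        if let Ok(task_output) = load_cached(&input_hash) {
            return Ok(task_output);
        }

        let task_ret = run_and_hash_task(&input_hash, &task);
        let cleanup_ret = cleanup_task(&input_hash);

        let task_output = task_ret?;
        cleanup_ret.context("Failed to clean up after task")?;

        // Only the bare TaskOutput is cached; the old TaskMeta wrapper is gone
        save_cached(&input_hash, &task_output)?;

        Ok(task_output)
    }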
-rw-r--r--  src/executor.rs              |  93
-rw-r--r--  src/paths.rs                 |   4
-rw-r--r--  src/runner/container/run.rs  | 130
-rw-r--r--  src/runner/container/tar.rs  |   9
-rw-r--r--  src/runner/mod.rs            |   1

5 files changed, 94 insertions(+), 143 deletions(-)
diff --git a/src/executor.rs b/src/executor.rs
index 86b921e..e6fbccd 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -1,11 +1,6 @@
-use std::{
- collections::{HashMap, HashSet},
- convert::TryInto,
- time::{Instant, SystemTime, UNIX_EPOCH},
-};
+use std::collections::{HashMap, HashSet};
use indoc::indoc;
-use serde::{Deserialize, Serialize};
use crate::{
context::{Context, TaskRef},
@@ -14,58 +9,9 @@ use crate::{
task::*,
template::TemplateEngine,
types::*,
- util::{cjson, error::*, fs},
+ util::error::*,
};
-fn input_hash(task: &runner::TaskInput) -> InputHash {
- InputHash(StringHash(
- cjson::digest::<InputHasher, _>(task).unwrap().into(),
- ))
-}
-#[derive(Clone, Debug, Deserialize, Serialize)]
-struct TaskMeta {
- pub id: TaskID,
- pub args: HashMap<String, serde_json::Value>,
- pub inherit: Vec<LayerHash>,
- pub depends: HashMap<DependencyHash, Dependency>,
- pub output: TaskOutput,
- pub start_time: u64,
- pub duration: u64,
-}
-
-impl TaskMeta {
- fn load(input_hash: &InputHash) -> Result<TaskMeta> {
- let filename = paths::task_meta_filename(input_hash);
- let file = fs::open(&filename)?;
-
- serde_json::from_reader(file)
- .with_context(|| format!("Failed to read task metadata from {}", filename))
- }
-
- fn save(&self) -> Result<()> {
- fs::mkdir(&paths::task_state_dir(&self.output.input_hash))?;
-
- let tmp_filename = paths::task_meta_tmp_filename(&self.output.input_hash);
- let filename = paths::task_meta_filename(&self.output.input_hash);
-
- cjson::to_file(&tmp_filename, self)
- .with_context(|| format!("Failed to write task metadata to {}", tmp_filename))?;
-
- fs::rename(tmp_filename, filename)?;
-
- Ok(())
- }
-}
-
-fn epoch_msecs() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis()
- .try_into()
- .unwrap()
-}
-
#[derive(Debug)]
pub struct Executor<'ctx> {
ctx: &'ctx Context,
@@ -293,46 +239,13 @@ impl<'ctx> Executor<'ctx> {
depends: task_deps,
outputs: task_output,
};
- let input_hash = input_hash(&input);
-
- // Use cached result
- if let Ok(meta) = TaskMeta::load(&input_hash) {
- return Ok(meta.output);
- }
-
- // Remove metadata first to ensure task invalidation
- fs::ensure_removed(&paths::task_meta_filename(&input_hash))?;
let task = runner::Task {
label: format!("{:#}", task_ref),
input,
- input_hash,
- };
-
- let start_time = epoch_msecs();
- let start_instant = Instant::now();
-
- let output = runner.run(&task)?;
-
- let end_instant = Instant::now();
- let duration = end_instant.duration_since(start_instant);
-
- let meta = TaskMeta {
- id: task_ref.id.clone(),
- args: task_ref
- .args
- .iter()
- .map(|(k, v)| (k.clone(), serde_json::to_value(v).unwrap()))
- .collect(),
- inherit: task.input.inherit,
- depends: task.input.depends,
- output,
- start_time,
- duration: duration.as_millis().try_into().unwrap(),
};
- meta.save()?;
- Ok(meta.output)
+ runner.run(&task)
}
fn update_runnable(&mut self, task_ref: &TaskRef) {
diff --git a/src/paths.rs b/src/paths.rs
index 708413d..a5e1577 100644
--- a/src/paths.rs
+++ b/src/paths.rs
@@ -81,11 +81,11 @@ pub fn task_state_dir(hash: &InputHash) -> String {
join(&[TASK_STATE_DIR, &hash.to_string()])
}
-pub fn task_meta_tmp_filename(hash: &InputHash) -> String {
+pub fn task_cache_tmp_filename(hash: &InputHash) -> String {
join(&[TASK_STATE_DIR, &hash.to_string(), "task.json.tmp"])
}
-pub fn task_meta_filename(hash: &InputHash) -> String {
+pub fn task_cache_filename(hash: &InputHash) -> String {
join(&[TASK_STATE_DIR, &hash.to_string(), "task.json"])
}
diff --git a/src/runner/container/run.rs b/src/runner/container/run.rs
index c15e4fb..3135899 100644
--- a/src/runner/container/run.rs
+++ b/src/runner/container/run.rs
@@ -12,7 +12,7 @@ use tee_readwrite::TeeWriter;
use crate::{
paths, runner,
types::*,
- util::{error::*, fs},
+ util::{cjson, error::*, fs},
};
use super::{spec, tar};
@@ -20,8 +20,17 @@ use super::{spec, tar};
pub const BUILD_UID: u32 = 1000;
pub const BUILD_GID: u32 = 1000;
-fn init_task(task: &runner::Task) -> Result<fs::Mount> {
- let task_state_dir = paths::task_state_dir(&task.input_hash);
+fn input_hash(task: &runner::TaskInput) -> InputHash {
+ InputHash(StringHash(
+ cjson::digest::<InputHasher, _>(task).unwrap().into(),
+ ))
+}
+
+fn init_task(input_hash: &InputHash, task: &runner::Task) -> Result<fs::Mount> {
+ // Remove metadata first to ensure task invalidation
+ fs::ensure_removed(&paths::task_cache_filename(input_hash))?;
+
+ let task_state_dir = paths::task_state_dir(input_hash);
fs::ensure_removed(&task_state_dir)?;
fs::mkdir(&task_state_dir)?;
@@ -29,7 +38,7 @@ fn init_task(task: &runner::Task) -> Result<fs::Mount> {
fs::mkdir(&task_layer_dir)?;
fs::fixup_permissions(&task_layer_dir)?;
- let task_tmp_dir = paths::task_tmp_dir(&task.input_hash);
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_WORKDIR]);
let mount = if task.input.inherit.is_empty() {
@@ -73,8 +82,8 @@ fn init_task(task: &runner::Task) -> Result<fs::Mount> {
Ok(mount)
}
-fn init_task_rootfs(task: &runner::Task) -> Result<fs::Mount> {
- let task_tmp_dir = paths::task_tmp_dir(&task.input_hash);
+fn init_task_rootfs(input_hash: &InputHash) -> Result<fs::Mount> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
@@ -96,12 +105,12 @@ fn init_task_rootfs(task: &runner::Task) -> Result<fs::Mount> {
Ok(mount)
}
-fn cleanup_task(task: &runner::Task) -> Result<()> {
- let task_tmp_dir = paths::task_tmp_dir(&task.input_hash);
+fn cleanup_task(input_hash: &InputHash) -> Result<()> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
fs::ensure_removed(&task_tmp_dir).context("Failed to remove task tmp dir")?;
- let task_state_dir = paths::task_state_dir(&task.input_hash);
+ let task_state_dir = paths::task_state_dir(input_hash);
let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]);
fs::ensure_removed(&task_work_dir).context("Failed to remove overlayfs workdir")?;
@@ -119,8 +128,8 @@ fn unpack_dependency<P1: AsRef<Path>, P2: AsRef<Path>>(filename: P1, dest: P2) -
})
}
-fn unpack_dependencies(task: &runner::Task) -> Result<()> {
- let task_tmp_dir = paths::task_tmp_dir(&task.input_hash);
+fn unpack_dependencies(input_hash: &InputHash, task: &runner::Task) -> Result<()> {
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
let downloads_dir = paths::join(&[&task_tmp_dir, paths::TASK_DLDIR]);
let depends_dir = paths::join(&[&task_tmp_dir, paths::TASK_TMP_DEPENDS_SUBDIR]);
@@ -147,19 +156,15 @@ fn unpack_dependencies(task: &runner::Task) -> Result<()> {
Ok(())
}
-fn collect_output(task: &runner::Task, path: &str) -> Result<Option<ArchiveHash>> {
- let source: PathBuf = [
- &paths::task_tmp_dir(&task.input_hash),
- paths::TASK_DESTDIR,
- path,
- ]
- .iter()
- .collect();
+fn collect_output(input_hash: &InputHash, path: &str) -> Result<Option<ArchiveHash>> {
+ let source: PathBuf = [&paths::task_tmp_dir(input_hash), paths::TASK_DESTDIR, path]
+ .iter()
+ .collect();
if !source.is_dir() {
return Ok(None);
}
- let filename = paths::archive_tmp_filename(&task.input_hash);
+ let filename = paths::archive_tmp_filename(input_hash);
let hash = (|| -> Result<ArchiveHash> {
let file = fs::create(&filename)?;
@@ -167,7 +172,7 @@ fn collect_output(task: &runner::Task, path: &str) -> Result<Option<ArchiveHash>
let writer = TeeWriter::new(file, hasher);
let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer);
- super::tar::pack(task, &mut buffered_writer, &source)?;
+ super::tar::pack(input_hash, &mut buffered_writer, &source)?;
let writer = buffered_writer.into_inner()?;
let (file, hasher) = writer.into_inner();
@@ -183,11 +188,14 @@ fn collect_output(task: &runner::Task, path: &str) -> Result<Option<ArchiveHash>
Ok(Some(hash))
}
-fn collect_outputs(task: &runner::Task) -> Result<HashMap<String, ArchiveHash>> {
+fn collect_outputs(
+ input_hash: &InputHash,
+ task: &runner::Task,
+) -> Result<HashMap<String, ArchiveHash>> {
let mut ret = HashMap::new();
for (name, path) in &task.input.outputs {
- if let Some(hash) = collect_output(task, path)? {
+ if let Some(hash) = collect_output(input_hash, path)? {
ret.insert(name.clone(), hash);
}
}
@@ -195,18 +203,18 @@ fn collect_outputs(task: &runner::Task) -> Result<HashMap<String, ArchiveHash>>
Ok(ret)
}
-fn run_task(task: &runner::Task) -> Result<()> {
- let _workdir_mount = init_task(task).context("Failed to initialize task")?;
- unpack_dependencies(task).context("Failed to unpack dependencies")?;
- let _rootfs_mount = init_task_rootfs(task).context("Failed to initialize task rootfs")?;
+fn run_task(input_hash: &InputHash, task: &runner::Task) -> Result<()> {
+ let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?;
+ unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?;
+ let _rootfs_mount = init_task_rootfs(input_hash).context("Failed to initialize task rootfs")?;
- let task_tmp_dir = paths::task_tmp_dir(&task.input_hash);
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
let command = formatdoc! {"
INPUT_HASH={input_hash}
{command}",
- input_hash = task.input_hash,
+ input_hash = input_hash,
command = task.input.command,
};
@@ -223,7 +231,7 @@ fn run_task(task: &runner::Task) -> Result<()> {
.map_err(Error::new)
.context("Failed to save runtime config")?;
- let log_filename = paths::task_log_filename(&task.input_hash);
+ let log_filename = paths::task_log_filename(input_hash);
let log = fs::create(&log_filename)?;
let log_stdout = log
.try_clone()
@@ -236,7 +244,7 @@ fn run_task(task: &runner::Task) -> Result<()> {
.arg("--root")
.arg(paths::TASK_TMP_CONTAINERS_ROOT_SUBDIR)
.arg("run")
- .arg(task.input_hash.to_string())
+ .arg(input_hash.to_string())
.current_dir(task_tmp_dir)
.stdin(process::Stdio::null())
.stdout(log_stdout)
@@ -256,8 +264,8 @@ fn run_task(task: &runner::Task) -> Result<()> {
Ok(())
}
-fn hash_layer(task: &runner::Task) -> Result<Option<LayerHash>> {
- let task_state_dir = paths::task_state_dir(&task.input_hash);
+fn hash_layer(input_hash: &InputHash) -> Result<Option<LayerHash>> {
+ let task_state_dir = paths::task_state_dir(input_hash);
let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
(|| -> Result<Option<LayerHash>> {
@@ -268,7 +276,7 @@ fn hash_layer(task: &runner::Task) -> Result<Option<LayerHash>> {
let hasher = LayerHasher::new();
let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher);
- tar::pack(task, &mut buffered_writer, &task_layer_dir)?;
+ tar::pack(input_hash, &mut buffered_writer, &task_layer_dir)?;
let hasher = buffered_writer.into_inner()?;
Ok(Some(LayerHash(StringHash(hasher.finalize().into()))))
@@ -276,8 +284,8 @@ fn hash_layer(task: &runner::Task) -> Result<Option<LayerHash>> {
.with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir))
}
-fn move_layer(task: &runner::Task, hash: &Option<LayerHash>) -> Result<()> {
- let task_state_dir = paths::task_state_dir(&task.input_hash);
+fn move_layer(input_hash: &InputHash, hash: &Option<LayerHash>) -> Result<()> {
+ let task_state_dir = paths::task_state_dir(input_hash);
let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]);
if let Some(hash) = hash {
@@ -301,31 +309,61 @@ fn move_layer(task: &runner::Task, hash: &Option<LayerHash>) -> Result<()> {
fs::ensure_removed(&task_layer_dir)
}
-fn run_and_hash_task(task: &runner::Task) -> Result<runner::TaskOutput> {
- run_task(task)?;
+fn run_and_hash_task(input_hash: &InputHash, task: &runner::Task) -> Result<runner::TaskOutput> {
+ run_task(input_hash, task)?;
- let outputs = collect_outputs(task)?;
+ let outputs = collect_outputs(input_hash, task)?;
- let layer = hash_layer(task)?;
- move_layer(task, &layer)?;
+ let layer = hash_layer(input_hash)?;
+ move_layer(input_hash, &layer)?;
Ok(runner::TaskOutput {
- input_hash: task.input_hash,
+ input_hash: *input_hash,
layer,
outputs,
})
}
+fn load_cached(input_hash: &InputHash) -> Result<runner::TaskOutput> {
+ let filename = paths::task_cache_filename(input_hash);
+ let file = fs::open(&filename)?;
+
+ serde_json::from_reader(file)
+ .with_context(|| format!("Failed to read task cache data from {}", filename))
+}
+
+fn save_cached(input_hash: &InputHash, output: &runner::TaskOutput) -> Result<()> {
+ fs::mkdir(&paths::task_state_dir(input_hash))?;
+
+ let tmp_filename = paths::task_cache_tmp_filename(input_hash);
+ let filename = paths::task_cache_filename(input_hash);
+
+ cjson::to_file(&tmp_filename, output)
+ .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?;
+
+ fs::rename(tmp_filename, filename)?;
+
+ Ok(())
+}
+
pub fn handle_task(task: runner::Task) -> Result<runner::TaskOutput> {
- println!("Starting task {} ({})", task.label, task.input_hash);
+ let input_hash = input_hash(&task.input);
- let task_ret = run_and_hash_task(&task);
- let cleanup_ret = cleanup_task(&task);
+ if let Ok(task_output) = load_cached(&input_hash) {
+ return Ok(task_output);
+ }
+
+ println!("Starting task {} ({})", task.label, input_hash);
+
+ let task_ret = run_and_hash_task(&input_hash, &task);
+ let cleanup_ret = cleanup_task(&input_hash);
let task_output = task_ret?;
cleanup_ret.context("Failed to clean up after task")?;
- println!("Finished task {} ({})", task.label, task.input_hash);
+ save_cached(&input_hash, &task_output)?;
+
+ println!("Finished task {} ({})", task.label, input_hash);
Ok(task_output)
}
diff --git a/src/runner/container/tar.rs b/src/runner/container/tar.rs
index 74bcd16..c04f236 100644
--- a/src/runner/container/tar.rs
+++ b/src/runner/container/tar.rs
@@ -7,18 +7,19 @@ use std::{
use nix::mount::MsFlags;
use crate::{
- paths, runner,
+ paths,
+ types::InputHash,
util::{error::*, fs, Checkable},
};
use super::spec;
pub fn pack<W: Write, P: AsRef<Path>>(
- task: &runner::Task,
+ input_hash: &InputHash,
archive: &mut W,
source: P,
) -> Result<()> {
- let task_tmp_dir = paths::task_tmp_dir(&task.input_hash);
+ let task_tmp_dir = paths::task_tmp_dir(input_hash);
let rootfs_mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]);
let _rootfs_mount = fs::mount(
@@ -50,7 +51,7 @@ pub fn pack<W: Write, P: AsRef<Path>>(
.arg("--root")
.arg(paths::TASK_TMP_CONTAINERS_ROOT_SUBDIR)
.arg("run")
- .arg(task.input_hash.to_string())
+ .arg(input_hash.to_string())
.current_dir(task_tmp_dir)
.stdin(process::Stdio::null())
.stdout(process::Stdio::piped())
diff --git a/src/runner/mod.rs b/src/runner/mod.rs
index ac9a749..a0b88ef 100644
--- a/src/runner/mod.rs
+++ b/src/runner/mod.rs
@@ -24,7 +24,6 @@ pub struct TaskOutput {
pub struct Task {
pub label: String,
pub input: TaskInput,
- pub input_hash: InputHash,
}
pub trait Runner {
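
As the commit message notes, proper cache cleanup will eventually need a replacement for the removed args map. One possible shape for such a record (a purely hypothetical sketch, not part of this commit, assuming the crate's existing TaskID and runner::TaskOutput types) would keep enough identifying data next to the cached output for a cleanup pass to decide which entries still belong to defined tasks:

    use std::collections::HashMap;

    use serde::{Deserialize, Serialize};

    // Hypothetical sketch only, not part of this commit: a future cache
    // record pairing the cached output with the identifying data that
    // TaskMeta used to carry, so stale entries can be evicted.
    #[derive(Clone, Debug, Deserialize, Serialize)]
    struct CacheRecord {
        id: TaskID,
        args: HashMap<String, serde_json::Value>,
        output: runner::TaskOutput,
        // Timestamps can be added back easily, as the message notes
        start_time: u64,
        duration: u64,
    }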