diff options
-rw-r--r-- | Cargo.lock | 36 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/executor.rs | 50 | ||||
-rw-r--r-- | src/resolve.rs | 12 | ||||
-rw-r--r-- | src/runner.rs | 2 | ||||
-rw-r--r-- | src/runner/runc.rs | 6 | ||||
-rw-r--r-- | src/runner/runc/run.rs | 6 | ||||
-rw-r--r-- | src/types.rs | 21 | ||||
-rw-r--r-- | src/util.rs | 1 | ||||
-rw-r--r-- | src/util/cjson.rs | 21 |
10 files changed, 117 insertions, 39 deletions
@@ -322,6 +322,17 @@ dependencies = [ ] [[package]] +name = "olpc-cjson" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9409e2493366c8f19387c98c5189ab9c937541b5bf48f11390d038a59fdfd9c1" +dependencies = [ + "serde", + "serde_json", + "unicode-normalization", +] + +[[package]] name = "opaque-debug" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -441,6 +452,7 @@ dependencies = [ "libc", "nix", "oci-spec", + "olpc-cjson", "serde", "serde_json", "serde_yaml", @@ -589,12 +601,36 @@ dependencies = [ ] [[package]] +name = "tinyvec" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b5220f05bb7de7f3f53c7c065e1199b3172696fe2db9f9c4d8ad9b4ee74c342" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" + +[[package]] name = "typenum" version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06" [[package]] +name = "unicode-normalization" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" +dependencies = [ + "tinyvec", +] + +[[package]] name = "unicode-xid" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -13,6 +13,7 @@ ipc-channel = { git = "https://github.com/servo/ipc-channel.git" } libc = "0.2.84" nix = "0.21.0" oci-spec = "0.2.8" +olpc-cjson = "0.1.0" serde = { version = "1", features = ["derive"] } serde_json = "1.0.62" serde_yaml = "0.8" diff --git a/src/executor.rs b/src/executor.rs index 350acc3..639cd62 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,18 +1,18 @@ use std::collections::{HashMap, HashSet}; -use crate::{runner, types::*}; +use crate::{runner, types::*, util::cjson}; #[derive(Debug)] pub struct Executor<'a> { tasks: &'a TaskMap, - tasks_blocked: HashSet<TaskRef>, - tasks_runnable: Vec<TaskRef>, - tasks_done: HashMap<TaskRef, TaskOutput>, - rdeps: HashMap<TaskRef, Vec<TaskRef>>, + tasks_blocked: HashSet<TaskID>, + tasks_runnable: Vec<TaskID>, + tasks_done: HashMap<TaskID, TaskOutput>, + rdeps: HashMap<TaskID, Vec<TaskID>>, } impl<'a> Executor<'a> { - pub fn new(tasks: &'a TaskMap, taskset: HashSet<TaskRef>) -> Self { + pub fn new(tasks: &'a TaskMap, taskset: HashSet<TaskID>) -> Self { let mut exc = Executor { tasks, tasks_blocked: HashSet::new(), @@ -22,7 +22,7 @@ impl<'a> Executor<'a> { }; for task in taskset { - let task_def = tasks.get(&task).expect("Invalid TaskRef"); + let task_def = tasks.get(&task).expect("Invalid TaskID"); if task_def.depends.is_empty() { exc.tasks_runnable.push(task); } else { @@ -38,8 +38,8 @@ impl<'a> Executor<'a> { exc } - fn deps_satisfied(&self, task: &TaskRef) -> bool { - let task_def = self.tasks.get(task).expect("Invalid TaskRef"); + fn deps_satisfied(&self, task: &TaskID) -> bool { + let task_def = self.tasks.get(task).expect("Invalid TaskID"); task_def .depends @@ -47,10 +47,8 @@ impl<'a> Executor<'a> { .all(|dep| self.tasks_done.contains_key(dep)) } - fn task_deps(&self, task: &TaskRef) -> HashMap<TaskRef, OutputHash> { - let task_def = self.tasks.get(&task).expect("Invalid TaskRef"); - task_def - .depends + fn task_deps(&self, task: &TaskDef) -> HashMap<TaskID, OutputHash> { + task.depends .iter() .map(|dep| { ( @@ -65,18 +63,28 @@ impl<'a> Executor<'a> { } fn run_one(&mut self, runner: &impl runner::Runner) -> runner::Result<()> { - let task = self.tasks_runnable.pop().expect("No runnable tasks left"); - let task_deps = self.task_deps(&task); + let task_id = self.tasks_runnable.pop().expect("No runnable tasks left"); + let task_def = self.tasks.get(&task_id).expect("Invalid TaskID"); + let task_deps = self.task_deps(&task_def); - let hash = runner.run(self.tasks, &task)?; - let output = TaskOutput { - task_ref: task.clone(), + let task = Task { + id: task_id, + def: task_def.clone(), depends: task_deps, - output_hash: hash, }; - let rdeps = self.rdeps.get(&task); - self.tasks_done.insert(task, output); + let input_hash = StringHash(cjson::digest::<InputHasher, _>(&task)?.into()); + + let output_hash = runner.run(self.tasks, &task.id)?; + let output = TaskOutput { + id: task.id.clone(), + depends: task.depends, + input_hash, + output_hash, + }; + + let rdeps = self.rdeps.get(&task.id); + self.tasks_done.insert(task.id, output); for rdep in rdeps.unwrap_or(&Vec::new()) { if !self.tasks_blocked.contains(rdep) { diff --git a/src/resolve.rs b/src/resolve.rs index f3ad284..b2e350d 100644 --- a/src/resolve.rs +++ b/src/resolve.rs @@ -4,7 +4,7 @@ use std::fmt; use crate::types::*; #[derive(Debug)] -pub struct DepChain(pub Vec<TaskRef>); +pub struct DepChain(pub Vec<TaskID>); impl fmt::Display for DepChain { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -29,7 +29,7 @@ pub enum Error { } impl Error { - fn extend(&mut self, task: TaskRef) { + fn extend(&mut self, task: TaskID) { let tasks = match self { Error::TaskNotFound(ref mut tasks) => tasks, Error::DependencyCycle(ref mut tasks) => tasks, @@ -62,7 +62,7 @@ enum ResolveState { #[derive(Debug)] pub struct Resolver<'a> { tasks: &'a TaskMap, - resolve_state: HashMap<TaskRef, ResolveState>, + resolve_state: HashMap<TaskID, ResolveState>, } impl<'a> Resolver<'a> { @@ -79,7 +79,7 @@ impl<'a> Resolver<'a> { .all(|resolved| *resolved == ResolveState::Resolved) } - pub fn add_task(&mut self, task: &TaskRef) -> Vec<Error> { + pub fn add_task(&mut self, task: &TaskID) -> Vec<Error> { match self.resolve_state.get(task) { Some(ResolveState::Resolving) => { return vec![Error::DependencyCycle(DepChain(vec![task.clone()]))] @@ -118,13 +118,13 @@ impl<'a> Resolver<'a> { ret } - pub fn add_goal(&mut self, task: &TaskRef) -> Vec<Error> { + pub fn add_goal(&mut self, task: &TaskID) -> Vec<Error> { let ret = self.add_task(task); debug_assert!(self.tasks_resolved()); ret } - pub fn to_taskset(self) -> HashSet<TaskRef> { + pub fn to_taskset(self) -> HashSet<TaskID> { debug_assert!(self.tasks_resolved()); self.resolve_state diff --git a/src/runner.rs b/src/runner.rs index ebad475..8bb39eb 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -7,5 +7,5 @@ use crate::types::*; pub type Result<T> = io::Result<T>; pub trait Runner { - fn run(&self, tasks: &TaskMap, task: &TaskRef) -> Result<OutputHash>; + fn run(&self, tasks: &TaskMap, task: &TaskID) -> Result<OutputHash>; } diff --git a/src/runner/runc.rs b/src/runner/runc.rs index 9725d92..f4016e3 100644 --- a/src/runner/runc.rs +++ b/src/runner/runc.rs @@ -14,7 +14,7 @@ use crate::util::ipc::CheckDisconnect; #[derive(Debug, Deserialize, Serialize)] struct Request( - TaskRef, + TaskID, TaskDef, ipc::IpcSender<Result<OutputHash, run::Error>>, ); @@ -106,8 +106,8 @@ impl RuncRunner { } impl super::Runner for RuncRunner { - fn run(&self, tasks: &TaskMap, task: &TaskRef) -> super::Result<OutputHash> { - let task_def = tasks.get(task).expect("Invalid TaskRef"); + fn run(&self, tasks: &TaskMap, task: &TaskID) -> super::Result<OutputHash> { + let task_def = tasks.get(task).expect("Invalid TaskID"); let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed"); self.channel diff --git a/src/runner/runc/run.rs b/src/runner/runc/run.rs index 99396a0..1091b1b 100644 --- a/src/runner/runc/run.rs +++ b/src/runner/runc/run.rs @@ -65,11 +65,11 @@ fn init_task() -> Result<(), Error> { Ok(()) } -fn output_filename(task: TaskRef) -> PathBuf { +fn output_filename(task: TaskID) -> PathBuf { Path::new("build/state").join(format!("{}.tar", task)) } -fn collect_output(task: TaskRef, task_def: TaskDef) -> Result<OutputHash, io::Error> { +fn collect_output(task: TaskID, task_def: TaskDef) -> Result<OutputHash, io::Error> { let file = util::unix::create_as( output_filename(task), Some(unshare::BUILD_UID), @@ -86,7 +86,7 @@ fn collect_output(task: TaskRef, task_def: TaskDef) -> Result<OutputHash, io::Er Ok(StringHash(hasher.finalize().into())) } -pub fn handle_task(task: TaskRef, task_def: TaskDef) -> Result<OutputHash, Error> { +pub fn handle_task(task: TaskID, task_def: TaskDef) -> Result<OutputHash, Error> { init_task()?; spec::generate_spec(task_def.run.as_str()) diff --git a/src/types.rs b/src/types.rs index 3cf4826..4da58d8 100644 --- a/src/types.rs +++ b/src/types.rs @@ -2,22 +2,29 @@ use serde::{Deserialize, Serialize}; use sha2::Sha256; use std::collections::HashMap; -pub type TaskRef = String; +pub type TaskID = String; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct TaskDef { #[serde(default)] - pub depends: Vec<TaskRef>, + pub depends: Vec<TaskID>, #[serde(default)] pub output: Vec<String>, pub run: String, } +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Task { + pub id: TaskID, + pub def: TaskDef, + pub depends: HashMap<TaskID, OutputHash>, +} + #[derive(Debug, Default)] pub struct TaskMap(pub HashMap<String, TaskDef>); impl TaskMap { - pub fn get(&self, task: &TaskRef) -> Option<&TaskDef> { + pub fn get(&self, task: &TaskID) -> Option<&TaskDef> { self.0.get(task) } } @@ -42,12 +49,16 @@ impl<'de> Deserialize<'de> for StringHash { } } +pub type InputHasher = Sha256; +pub type InputHash = StringHash; + pub type OutputHasher = Sha256; pub type OutputHash = StringHash; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct TaskOutput { - pub task_ref: TaskRef, - pub depends: HashMap<TaskRef, OutputHash>, + pub id: TaskID, + pub depends: HashMap<TaskID, OutputHash>, + pub input_hash: InputHash, pub output_hash: OutputHash, } diff --git a/src/util.rs b/src/util.rs index 0dbee5b..5791564 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,3 +1,4 @@ +pub mod cjson; pub mod ipc; pub mod tar; pub mod unix; diff --git a/src/util/cjson.rs b/src/util/cjson.rs new file mode 100644 index 0000000..6627f42 --- /dev/null +++ b/src/util/cjson.rs @@ -0,0 +1,21 @@ +use std::io::Write; + +use olpc_cjson::CanonicalFormatter; +use serde::Serialize; +use serde_json::error::Result; +use sha2::{digest, Digest}; + +pub fn new_serializer<W: Write>(writer: W) -> serde_json::Serializer<W, CanonicalFormatter> { + serde_json::Serializer::with_formatter(writer, CanonicalFormatter::new()) +} + +pub fn to_writer<W: Write, T: ?Sized + Serialize>(writer: W, value: &T) -> Result<()> { + let mut ser = new_serializer(writer); + value.serialize(&mut ser) +} + +pub fn digest<D: Digest + Write, T: ?Sized + Serialize>(value: &T) -> Result<digest::Output<D>> { + let mut digest = <D as Digest>::new(); + to_writer(&mut digest, value)?; + Ok(digest.finalize()) +} |