summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock36
-rw-r--r--Cargo.toml1
-rw-r--r--src/executor.rs50
-rw-r--r--src/resolve.rs12
-rw-r--r--src/runner.rs2
-rw-r--r--src/runner/runc.rs6
-rw-r--r--src/runner/runc/run.rs6
-rw-r--r--src/types.rs21
-rw-r--r--src/util.rs1
-rw-r--r--src/util/cjson.rs21
10 files changed, 117 insertions, 39 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9c353d9..7867c03 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -322,6 +322,17 @@ dependencies = [
]
[[package]]
+name = "olpc-cjson"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9409e2493366c8f19387c98c5189ab9c937541b5bf48f11390d038a59fdfd9c1"
+dependencies = [
+ "serde",
+ "serde_json",
+ "unicode-normalization",
+]
+
+[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -441,6 +452,7 @@ dependencies = [
"libc",
"nix",
"oci-spec",
+ "olpc-cjson",
"serde",
"serde_json",
"serde_yaml",
@@ -589,12 +601,36 @@ dependencies = [
]
[[package]]
+name = "tinyvec"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b5220f05bb7de7f3f53c7c065e1199b3172696fe2db9f9c4d8ad9b4ee74c342"
+dependencies = [
+ "tinyvec_macros",
+]
+
+[[package]]
+name = "tinyvec_macros"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
+
+[[package]]
name = "typenum"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06"
[[package]]
+name = "unicode-normalization"
+version = "0.1.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9"
+dependencies = [
+ "tinyvec",
+]
+
+[[package]]
name = "unicode-xid"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 074267d..0bf8d69 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,7 @@ ipc-channel = { git = "https://github.com/servo/ipc-channel.git" }
libc = "0.2.84"
nix = "0.21.0"
oci-spec = "0.2.8"
+olpc-cjson = "0.1.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.62"
serde_yaml = "0.8"
diff --git a/src/executor.rs b/src/executor.rs
index 350acc3..639cd62 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -1,18 +1,18 @@
use std::collections::{HashMap, HashSet};
-use crate::{runner, types::*};
+use crate::{runner, types::*, util::cjson};
#[derive(Debug)]
pub struct Executor<'a> {
tasks: &'a TaskMap,
- tasks_blocked: HashSet<TaskRef>,
- tasks_runnable: Vec<TaskRef>,
- tasks_done: HashMap<TaskRef, TaskOutput>,
- rdeps: HashMap<TaskRef, Vec<TaskRef>>,
+ tasks_blocked: HashSet<TaskID>,
+ tasks_runnable: Vec<TaskID>,
+ tasks_done: HashMap<TaskID, TaskOutput>,
+ rdeps: HashMap<TaskID, Vec<TaskID>>,
}
impl<'a> Executor<'a> {
- pub fn new(tasks: &'a TaskMap, taskset: HashSet<TaskRef>) -> Self {
+ pub fn new(tasks: &'a TaskMap, taskset: HashSet<TaskID>) -> Self {
let mut exc = Executor {
tasks,
tasks_blocked: HashSet::new(),
@@ -22,7 +22,7 @@ impl<'a> Executor<'a> {
};
for task in taskset {
- let task_def = tasks.get(&task).expect("Invalid TaskRef");
+ let task_def = tasks.get(&task).expect("Invalid TaskID");
if task_def.depends.is_empty() {
exc.tasks_runnable.push(task);
} else {
@@ -38,8 +38,8 @@ impl<'a> Executor<'a> {
exc
}
- fn deps_satisfied(&self, task: &TaskRef) -> bool {
- let task_def = self.tasks.get(task).expect("Invalid TaskRef");
+ fn deps_satisfied(&self, task: &TaskID) -> bool {
+ let task_def = self.tasks.get(task).expect("Invalid TaskID");
task_def
.depends
@@ -47,10 +47,8 @@ impl<'a> Executor<'a> {
.all(|dep| self.tasks_done.contains_key(dep))
}
- fn task_deps(&self, task: &TaskRef) -> HashMap<TaskRef, OutputHash> {
- let task_def = self.tasks.get(&task).expect("Invalid TaskRef");
- task_def
- .depends
+ fn task_deps(&self, task: &TaskDef) -> HashMap<TaskID, OutputHash> {
+ task.depends
.iter()
.map(|dep| {
(
@@ -65,18 +63,28 @@ impl<'a> Executor<'a> {
}
fn run_one(&mut self, runner: &impl runner::Runner) -> runner::Result<()> {
- let task = self.tasks_runnable.pop().expect("No runnable tasks left");
- let task_deps = self.task_deps(&task);
+ let task_id = self.tasks_runnable.pop().expect("No runnable tasks left");
+ let task_def = self.tasks.get(&task_id).expect("Invalid TaskID");
+ let task_deps = self.task_deps(&task_def);
- let hash = runner.run(self.tasks, &task)?;
- let output = TaskOutput {
- task_ref: task.clone(),
+ let task = Task {
+ id: task_id,
+ def: task_def.clone(),
depends: task_deps,
- output_hash: hash,
};
- let rdeps = self.rdeps.get(&task);
- self.tasks_done.insert(task, output);
+ let input_hash = StringHash(cjson::digest::<InputHasher, _>(&task)?.into());
+
+ let output_hash = runner.run(self.tasks, &task.id)?;
+ let output = TaskOutput {
+ id: task.id.clone(),
+ depends: task.depends,
+ input_hash,
+ output_hash,
+ };
+
+ let rdeps = self.rdeps.get(&task.id);
+ self.tasks_done.insert(task.id, output);
for rdep in rdeps.unwrap_or(&Vec::new()) {
if !self.tasks_blocked.contains(rdep) {
diff --git a/src/resolve.rs b/src/resolve.rs
index f3ad284..b2e350d 100644
--- a/src/resolve.rs
+++ b/src/resolve.rs
@@ -4,7 +4,7 @@ use std::fmt;
use crate::types::*;
#[derive(Debug)]
-pub struct DepChain(pub Vec<TaskRef>);
+pub struct DepChain(pub Vec<TaskID>);
impl fmt::Display for DepChain {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -29,7 +29,7 @@ pub enum Error {
}
impl Error {
- fn extend(&mut self, task: TaskRef) {
+ fn extend(&mut self, task: TaskID) {
let tasks = match self {
Error::TaskNotFound(ref mut tasks) => tasks,
Error::DependencyCycle(ref mut tasks) => tasks,
@@ -62,7 +62,7 @@ enum ResolveState {
#[derive(Debug)]
pub struct Resolver<'a> {
tasks: &'a TaskMap,
- resolve_state: HashMap<TaskRef, ResolveState>,
+ resolve_state: HashMap<TaskID, ResolveState>,
}
impl<'a> Resolver<'a> {
@@ -79,7 +79,7 @@ impl<'a> Resolver<'a> {
.all(|resolved| *resolved == ResolveState::Resolved)
}
- pub fn add_task(&mut self, task: &TaskRef) -> Vec<Error> {
+ pub fn add_task(&mut self, task: &TaskID) -> Vec<Error> {
match self.resolve_state.get(task) {
Some(ResolveState::Resolving) => {
return vec![Error::DependencyCycle(DepChain(vec![task.clone()]))]
@@ -118,13 +118,13 @@ impl<'a> Resolver<'a> {
ret
}
- pub fn add_goal(&mut self, task: &TaskRef) -> Vec<Error> {
+ pub fn add_goal(&mut self, task: &TaskID) -> Vec<Error> {
let ret = self.add_task(task);
debug_assert!(self.tasks_resolved());
ret
}
- pub fn to_taskset(self) -> HashSet<TaskRef> {
+ pub fn to_taskset(self) -> HashSet<TaskID> {
debug_assert!(self.tasks_resolved());
self.resolve_state
diff --git a/src/runner.rs b/src/runner.rs
index ebad475..8bb39eb 100644
--- a/src/runner.rs
+++ b/src/runner.rs
@@ -7,5 +7,5 @@ use crate::types::*;
pub type Result<T> = io::Result<T>;
pub trait Runner {
- fn run(&self, tasks: &TaskMap, task: &TaskRef) -> Result<OutputHash>;
+ fn run(&self, tasks: &TaskMap, task: &TaskID) -> Result<OutputHash>;
}
diff --git a/src/runner/runc.rs b/src/runner/runc.rs
index 9725d92..f4016e3 100644
--- a/src/runner/runc.rs
+++ b/src/runner/runc.rs
@@ -14,7 +14,7 @@ use crate::util::ipc::CheckDisconnect;
#[derive(Debug, Deserialize, Serialize)]
struct Request(
- TaskRef,
+ TaskID,
TaskDef,
ipc::IpcSender<Result<OutputHash, run::Error>>,
);
@@ -106,8 +106,8 @@ impl RuncRunner {
}
impl super::Runner for RuncRunner {
- fn run(&self, tasks: &TaskMap, task: &TaskRef) -> super::Result<OutputHash> {
- let task_def = tasks.get(task).expect("Invalid TaskRef");
+ fn run(&self, tasks: &TaskMap, task: &TaskID) -> super::Result<OutputHash> {
+ let task_def = tasks.get(task).expect("Invalid TaskID");
let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed");
self.channel
diff --git a/src/runner/runc/run.rs b/src/runner/runc/run.rs
index 99396a0..1091b1b 100644
--- a/src/runner/runc/run.rs
+++ b/src/runner/runc/run.rs
@@ -65,11 +65,11 @@ fn init_task() -> Result<(), Error> {
Ok(())
}
-fn output_filename(task: TaskRef) -> PathBuf {
+fn output_filename(task: TaskID) -> PathBuf {
Path::new("build/state").join(format!("{}.tar", task))
}
-fn collect_output(task: TaskRef, task_def: TaskDef) -> Result<OutputHash, io::Error> {
+fn collect_output(task: TaskID, task_def: TaskDef) -> Result<OutputHash, io::Error> {
let file = util::unix::create_as(
output_filename(task),
Some(unshare::BUILD_UID),
@@ -86,7 +86,7 @@ fn collect_output(task: TaskRef, task_def: TaskDef) -> Result<OutputHash, io::Er
Ok(StringHash(hasher.finalize().into()))
}
-pub fn handle_task(task: TaskRef, task_def: TaskDef) -> Result<OutputHash, Error> {
+pub fn handle_task(task: TaskID, task_def: TaskDef) -> Result<OutputHash, Error> {
init_task()?;
spec::generate_spec(task_def.run.as_str())
diff --git a/src/types.rs b/src/types.rs
index 3cf4826..4da58d8 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -2,22 +2,29 @@ use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::collections::HashMap;
-pub type TaskRef = String;
+pub type TaskID = String;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TaskDef {
#[serde(default)]
- pub depends: Vec<TaskRef>,
+ pub depends: Vec<TaskID>,
#[serde(default)]
pub output: Vec<String>,
pub run: String,
}
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct Task {
+ pub id: TaskID,
+ pub def: TaskDef,
+ pub depends: HashMap<TaskID, OutputHash>,
+}
+
#[derive(Debug, Default)]
pub struct TaskMap(pub HashMap<String, TaskDef>);
impl TaskMap {
- pub fn get(&self, task: &TaskRef) -> Option<&TaskDef> {
+ pub fn get(&self, task: &TaskID) -> Option<&TaskDef> {
self.0.get(task)
}
}
@@ -42,12 +49,16 @@ impl<'de> Deserialize<'de> for StringHash {
}
}
+pub type InputHasher = Sha256;
+pub type InputHash = StringHash;
+
pub type OutputHasher = Sha256;
pub type OutputHash = StringHash;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TaskOutput {
- pub task_ref: TaskRef,
- pub depends: HashMap<TaskRef, OutputHash>,
+ pub id: TaskID,
+ pub depends: HashMap<TaskID, OutputHash>,
+ pub input_hash: InputHash,
pub output_hash: OutputHash,
}
diff --git a/src/util.rs b/src/util.rs
index 0dbee5b..5791564 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -1,3 +1,4 @@
+pub mod cjson;
pub mod ipc;
pub mod tar;
pub mod unix;
diff --git a/src/util/cjson.rs b/src/util/cjson.rs
new file mode 100644
index 0000000..6627f42
--- /dev/null
+++ b/src/util/cjson.rs
@@ -0,0 +1,21 @@
+use std::io::Write;
+
+use olpc_cjson::CanonicalFormatter;
+use serde::Serialize;
+use serde_json::error::Result;
+use sha2::{digest, Digest};
+
+pub fn new_serializer<W: Write>(writer: W) -> serde_json::Serializer<W, CanonicalFormatter> {
+ serde_json::Serializer::with_formatter(writer, CanonicalFormatter::new())
+}
+
+pub fn to_writer<W: Write, T: ?Sized + Serialize>(writer: W, value: &T) -> Result<()> {
+ let mut ser = new_serializer(writer);
+ value.serialize(&mut ser)
+}
+
+pub fn digest<D: Digest + Write, T: ?Sized + Serialize>(value: &T) -> Result<digest::Output<D>> {
+ let mut digest = <D as Digest>::new();
+ to_writer(&mut digest, value)?;
+ Ok(digest.finalize())
+}