1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
use std::collections::{HashMap, HashSet};
use crate::{runner, types::*};
#[derive(Debug)]
pub struct Executor<'a> {
tasks: &'a TaskMap,
tasks_blocked: HashSet<TaskRef>,
tasks_runnable: Vec<TaskRef>,
tasks_done: HashMap<TaskRef, TaskOutput>,
rdeps: HashMap<TaskRef, Vec<TaskRef>>,
}
impl<'a> Executor<'a> {
pub fn new(tasks: &'a TaskMap, taskset: HashSet<TaskRef>) -> Self {
let mut exc = Executor {
tasks,
tasks_blocked: HashSet::new(),
tasks_runnable: Vec::new(),
tasks_done: HashMap::new(),
rdeps: HashMap::new(),
};
for task in taskset {
let task_def = tasks.get(&task).expect("Invalid TaskRef");
if task_def.depends.is_empty() {
exc.tasks_runnable.push(task);
} else {
for dep in &task_def.depends {
let rdep = exc.rdeps.entry(dep.clone()).or_default();
rdep.push(task.clone());
}
exc.tasks_blocked.insert(task);
}
}
exc
}
fn deps_satisfied(&self, task: &TaskRef) -> bool {
let task_def = self.tasks.get(task).expect("Invalid TaskRef");
task_def
.depends
.iter()
.all(|dep| self.tasks_done.contains_key(dep))
}
fn task_deps(&self, task: &TaskRef) -> HashMap<TaskRef, OutputHash> {
let task_def = self.tasks.get(&task).expect("Invalid TaskRef");
task_def
.depends
.iter()
.map(|dep| {
(
dep.clone(),
self.tasks_done
.get(dep)
.expect("Invalid dependency")
.output_hash,
)
})
.collect()
}
fn run_one(&mut self, runner: &impl runner::Runner) -> runner::Result<()> {
let task = self.tasks_runnable.pop().expect("No runnable tasks left");
let task_deps = self.task_deps(&task);
let hash = runner.run(self.tasks, &task)?;
let output = TaskOutput {
task_ref: task.clone(),
depends: task_deps,
output_hash: hash,
};
let rdeps = self.rdeps.get(&task);
self.tasks_done.insert(task, output);
for rdep in rdeps.unwrap_or(&Vec::new()) {
if !self.tasks_blocked.contains(rdep) {
continue;
}
if self.deps_satisfied(rdep) {
self.tasks_blocked.remove(rdep);
self.tasks_runnable.push(rdep.clone());
}
}
Ok(())
}
pub fn run(&mut self, runner: &impl runner::Runner) -> runner::Result<()> {
while !self.tasks_runnable.is_empty() {
self.run_one(runner)?;
}
assert!(self.tasks_blocked.is_empty(), "No runnable tasks left");
println!("{}", serde_json::to_string_pretty(&self.tasks_done)?);
Ok(())
}
}
|