summaryrefslogtreecommitdiffstats
path: root/src/executor.rs
blob: 360869636714f1a10c7cfa00e8639ea23ca39c0c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use std::collections::{HashMap, HashSet};

use crate::{runner, types::*};

/// Tracks the scheduling state of a set of tasks while they are executed in
/// dependency order.
///
/// Every task handled by the executor is in exactly one of three places:
/// `tasks_blocked`, `tasks_runnable` or `tasks_done`.
#[derive(Debug)]
pub struct Executor<'a> {
	/// Borrowed map of task definitions, looked up by task ID.
	tasks: &'a TaskMap,
	/// Tasks that have dependencies which have not all completed yet.
	tasks_blocked: HashSet<TaskID>,
	/// Tasks that are ready to run; used as a stack (push at the end, pop from the end).
	tasks_runnable: Vec<TaskID>,
	/// Output records of the tasks that have finished, keyed by task ID.
	tasks_done: HashMap<TaskID, TaskOutput>,
	/// Reverse dependencies: for each task ID, the tasks that list it in their `depends`.
	rdeps: HashMap<TaskID, Vec<TaskID>>,
}

impl<'a> Executor<'a> {
	/// Creates an executor for the tasks in `taskset`, using `tasks` to look up
	/// their definitions.
	///
	/// A task without dependencies is queued as runnable immediately. Any other
	/// task is marked as blocked and registered as a reverse dependency of every
	/// task it depends on.
	///
	/// # Panics
	///
	/// Panics if `taskset` contains an ID that has no entry in `tasks`.
	pub fn new(tasks: &'a TaskMap, taskset: HashSet<TaskID>) -> Self {
		let mut tasks_blocked = HashSet::new();
		let mut tasks_runnable = Vec::new();
		let mut rdeps: HashMap<TaskID, Vec<TaskID>> = HashMap::new();

		for id in taskset {
			let def = tasks.get(&id).expect("Invalid TaskID");

			// Nothing to wait for: the task can be scheduled right away.
			if def.depends.is_empty() {
				tasks_runnable.push(id);
				continue;
			}

			// Remember which tasks must be re-checked when `dep` completes.
			for dep in &def.depends {
				rdeps.entry(dep.clone()).or_default().push(id.clone());
			}
			tasks_blocked.insert(id);
		}

		Executor {
			tasks,
			tasks_blocked,
			tasks_runnable,
			tasks_done: HashMap::new(),
			rdeps,
		}
	}

	/// Returns `true` when every dependency of `task` has an entry in `tasks_done`.
	///
	/// # Panics
	///
	/// Panics if `task` has no definition in the task map.
	fn deps_satisfied(&self, task: &TaskID) -> bool {
		let def = self.tasks.get(task).expect("Invalid TaskID");

		for dep in def.depends.iter() {
			if !self.tasks_done.contains_key(dep) {
				return false;
			}
		}
		true
	}

	/// Maps each dependency of `task` to the output hash recorded for it in
	/// `tasks_done`.
	///
	/// # Panics
	///
	/// Panics if one of the dependencies has not completed yet.
	fn task_deps(&self, task: &TaskDef) -> HashMap<TaskID, OutputHash> {
		let mut hashes = HashMap::new();

		for dep in task.depends.iter() {
			let key = dep.clone();
			let done = self.tasks_done.get(dep).expect("Invalid dependency");
			hashes.insert(key, done.output_hash);
		}

		hashes
	}

	/// Pops one task from the runnable stack, runs it with `runner`, records its
	/// output and moves every reverse dependency that became ready from the
	/// blocked set to the runnable stack.
	///
	/// # Errors
	///
	/// Returns the error produced by `runner.run`; in that case the task is not
	/// recorded as done and is no longer in the runnable stack.
	///
	/// # Panics
	///
	/// Panics if there is no runnable task, if the popped ID has no definition,
	/// or if one of its dependencies has no recorded output.
	fn run_one(&mut self, runner: &impl runner::Runner) -> runner::Result<()> {
		let task_id = self.tasks_runnable.pop().expect("No runnable tasks left");
		let task_def = self.tasks.get(&task_id).expect("Invalid TaskID");
		let task_deps = self.task_deps(&task_def);

		let task = Task {
			id: task_id,
			def: task_def.clone(),
			depends: task_deps,
		};

		// The input hash is taken before the runner is invoked.
		let input_hash = task.input_hash();
		let output_hash = runner.run(&task)?;

		let Task { id, depends, .. } = task;
		let output = TaskOutput {
			id: id.clone(),
			depends,
			input_hash,
			output_hash,
		};

		// Look up the dependents first: `id` is moved into `tasks_done` below.
		let waiting = self.rdeps.get(&id);
		self.tasks_done.insert(id, output);

		if let Some(candidates) = waiting {
			for rdep in candidates {
				// Only tasks that are still blocked and now have all their
				// dependencies completed get promoted.
				if self.tasks_blocked.contains(rdep) && self.deps_satisfied(rdep) {
					self.tasks_blocked.remove(rdep);
					self.tasks_runnable.push(rdep.clone());
				}
			}
		}

		Ok(())
	}

	/// Runs tasks until the runnable stack is empty, then prints the collected
	/// task outputs to stdout as pretty-printed JSON.
	///
	/// # Errors
	///
	/// Returns the first error reported while running a task, or the error from
	/// serializing the outputs.
	///
	/// # Panics
	///
	/// Panics if tasks are still blocked once nothing is runnable anymore, i.e.
	/// when some task's dependencies never all completed.
	pub fn run(&mut self, runner: &impl runner::Runner) -> runner::Result<()> {
		loop {
			if self.tasks_runnable.is_empty() {
				break;
			}
			self.run_one(runner)?;
		}

		assert!(self.tasks_blocked.is_empty(), "No runnable tasks left");

		let report = serde_json::to_string_pretty(&self.tasks_done)?;
		println!("{}", report);
		Ok(())
	}
}