summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2021-10-28 00:58:48 +0200
committerMatthias Schiffer <mschiffer@universe-factory.net>2021-10-28 18:26:04 +0200
commitb35a75f4cb5417bb464639079d266fd708549b32 (patch)
tree90faccdf6b0cc0dd241ae6871dd225f6490410ab
parent683f0a65fd351bc761e9114bd4bddf11ff9094ec (diff)
downloadrebel-b35a75f4cb5417bb464639079d266fd708549b32.tar
rebel-b35a75f4cb5417bb464639079d266fd708549b32.zip
executor: move some state into separate struct
-rw-r--r--crates/executor/src/executor.rs198
1 files changed, 115 insertions, 83 deletions
diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs
index e29b5b3..748ff89 100644
--- a/crates/executor/src/executor.rs
+++ b/crates/executor/src/executor.rs
@@ -16,44 +16,18 @@ use crate::{
template,
};
-pub struct Executor<'ctx> {
+#[derive(Debug)]
+pub struct CompletionState<'ctx> {
ctx: &'ctx Context,
- tasks_blocked: HashSet<TaskRef<'ctx>>,
- tasks_runnable: Vec<TaskRef<'ctx>>,
- tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>,
tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>,
- rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>,
}
-impl<'ctx> Executor<'ctx> {
- pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> {
- let mut exc = Executor {
+impl<'ctx> CompletionState<'ctx> {
+ pub fn new(ctx: &'ctx Context) -> Self {
+ CompletionState {
ctx,
- tasks_blocked: HashSet::new(),
- tasks_runnable: Vec::new(),
- tasks_running: HashMap::new(),
- tasks_done: HashMap::new(),
- rdeps: HashMap::new(),
- };
-
- for task in taskset {
- let mut has_depends = false;
- for dep in resolve::get_dependent_tasks(ctx, &task)
- .map_err(|_| Error::new(format!("invalid dependency for {}", task)))?
- {
- let rdep = exc.rdeps.entry(dep.clone()).or_default();
- rdep.push(task.clone());
- has_depends = true;
- }
-
- if has_depends {
- exc.tasks_blocked.insert(task);
- } else {
- exc.tasks_runnable.push(task);
- }
+ tasks_done: Default::default(),
}
-
- Ok(exc)
}
// Treats both "depends" and "inherit" as dependencies
@@ -137,7 +111,72 @@ impl<'ctx> Executor<'ctx> {
chain
}
- fn task_setup(&self, task_ref: &TaskRef<'ctx>) -> Vec<&'static str> {
+ fn print_summary(&self) {
+ println!();
+ println!("Summary:");
+
+ let mut tasks: Box<[_]> = self.tasks_done.iter().collect();
+ tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task));
+ for (task_ref, task) in tasks.iter() {
+ println!();
+ println!("{:#}", task_ref);
+ println!(" input: {}", task.input_hash);
+ if let Some(hash) = task.layer {
+ println!(" layer: {}", hash);
+ }
+ if !task.outputs.is_empty() {
+ println!(" outputs:");
+
+ let mut outputs: Box<[_]> = task.outputs.iter().collect();
+ outputs.sort_by_key(|(output, _)| *output);
+ for (output, hash) in outputs.iter() {
+ println!(" {}: {}", output, hash);
+ }
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct Executor<'ctx> {
+ rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>,
+ tasks_blocked: HashSet<TaskRef<'ctx>>,
+ tasks_runnable: Vec<TaskRef<'ctx>>,
+ tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>,
+ state: CompletionState<'ctx>,
+}
+
+impl<'ctx> Executor<'ctx> {
+ pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> {
+ let mut exc = Executor {
+ rdeps: Default::default(),
+ tasks_blocked: Default::default(),
+ tasks_runnable: Default::default(),
+ tasks_running: Default::default(),
+ state: CompletionState::new(ctx),
+ };
+
+ for task in taskset {
+ let mut has_depends = false;
+ for dep in resolve::get_dependent_tasks(ctx, &task)
+ .map_err(|_| Error::new(format!("invalid dependency for {}", task)))?
+ {
+ let rdep = exc.rdeps.entry(dep.clone()).or_default();
+ rdep.push(task.clone());
+ has_depends = true;
+ }
+
+ if has_depends {
+ exc.tasks_blocked.insert(task);
+ } else {
+ exc.tasks_runnable.push(task);
+ }
+ }
+
+ Ok(exc)
+ }
+
+ fn task_setup(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> {
let mut ret = vec![indoc! {"
export SOURCE_DATE_EPOCH=1
@@ -206,9 +245,25 @@ impl<'ctx> Executor<'ctx> {
ret
}
- fn spawn_one(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<UnixStream> {
- let task_def = &self.ctx[task_ref.id];
- let task_deps = self.task_deps(task_ref)?;
+ fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) {
+ let rdeps = self.rdeps.get(&task_ref);
+
+ self.state.tasks_done.insert(task_ref, task_output);
+
+ for rdep in rdeps.unwrap_or(&Vec::new()) {
+ if !self.tasks_blocked.contains(rdep) {
+ continue;
+ }
+ if self.state.deps_satisfied(rdep) {
+ self.tasks_blocked.remove(rdep);
+ self.tasks_runnable.push(rdep.clone());
+ }
+ }
+ }
+
+ fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<UnixStream> {
+ let task_def = &self.state.ctx[task_ref.id];
+ let task_deps = self.state.task_deps(task_ref)?;
let task_output = task_def
.output
.iter()
@@ -220,9 +275,9 @@ impl<'ctx> Executor<'ctx> {
})
.collect();
- let inherit_chain = self.task_inherit_chain(task_ref);
+ let inherit_chain = self.state.task_inherit_chain(task_ref);
- let mut run = self.task_setup(task_ref);
+ let mut run = Self::task_setup(task_ref);
run.push(&task_def.action.run);
let command = template::ENGINE
@@ -242,30 +297,20 @@ impl<'ctx> Executor<'ctx> {
Ok(runner.spawn(&task))
}
- fn run_tasks(&mut self, runner: &Runner) -> Result<()> {
- while let Some(task_ref) = self.tasks_runnable.pop() {
- let socket = self.spawn_one(&task_ref, runner)?;
- assert!(self
- .tasks_running
- .insert(socket.as_raw_fd(), (socket, task_ref))
- .is_none());
- }
-
+ fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> {
+ let socket = self.spawn_task(&task_ref, runner)?;
+ assert!(self
+ .tasks_running
+ .insert(socket.as_raw_fd(), (socket, task_ref))
+ .is_none());
Ok(())
}
- fn update_runnable(&mut self, task_ref: &TaskRef) {
- let rdeps = self.rdeps.get(task_ref);
-
- for rdep in rdeps.unwrap_or(&Vec::new()) {
- if !self.tasks_blocked.contains(rdep) {
- continue;
- }
- if self.deps_satisfied(rdep) {
- self.tasks_blocked.remove(rdep);
- self.tasks_runnable.push(rdep.clone());
- }
+ fn run_tasks(&mut self, runner: &Runner) -> Result<()> {
+ while let Some(task_ref) = self.tasks_runnable.pop() {
+ self.run_task(task_ref, runner)?;
}
+ Ok(())
}
fn wait_for_task(&mut self) -> Result<()> {
@@ -293,43 +338,30 @@ impl<'ctx> Executor<'ctx> {
let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap();
let task_output = Runner::result(&socket)?;
- self.tasks_done.insert(task_ref.clone(), task_output);
- self.update_runnable(&task_ref);
+ self.update_runnable(task_ref, task_output);
}
Ok(())
}
+ fn is_empty(&self) -> bool {
+ self.tasks_runnable.is_empty() && self.tasks_running.is_empty()
+ }
+
+ fn is_done(&self) -> bool {
+ self.is_empty() && self.tasks_blocked.is_empty()
+ }
+
pub fn run(&mut self, runner: &Runner) -> Result<()> {
- while !(self.tasks_runnable.is_empty() && self.tasks_running.is_empty()) {
+ while !self.is_empty() {
self.run_tasks(runner)?;
self.wait_for_task()?;
}
- assert!(self.tasks_blocked.is_empty(), "No runnable tasks left");
+ assert!(self.is_done(), "No runnable tasks left");
- println!();
- println!("Summary:");
-
- let mut tasks: Box<[_]> = self.tasks_done.iter().collect();
- tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task));
- for (task_ref, task) in tasks.iter() {
- println!();
- println!("{:#}", task_ref);
- println!(" input: {}", task.input_hash);
- if let Some(hash) = task.layer {
- println!(" layer: {}", hash);
- }
- if !task.outputs.is_empty() {
- println!(" outputs:");
+ self.state.print_summary();
- let mut outputs: Box<[_]> = task.outputs.iter().collect();
- outputs.sort_by_key(|(output, _)| *output);
- for (output, hash) in outputs.iter() {
- println!(" {}: {}", output, hash);
- }
- }
- }
Ok(())
}
}