From b35a75f4cb5417bb464639079d266fd708549b32 Mon Sep 17 00:00:00 2001 From: Matthias Schiffer Date: Thu, 28 Oct 2021 00:58:48 +0200 Subject: executor: move some state into separate struct --- crates/executor/src/executor.rs | 198 +++++++++++++++++++++++----------------- 1 file changed, 115 insertions(+), 83 deletions(-) diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs index e29b5b3..748ff89 100644 --- a/crates/executor/src/executor.rs +++ b/crates/executor/src/executor.rs @@ -16,44 +16,18 @@ use crate::{ template, }; -pub struct Executor<'ctx> { +#[derive(Debug)] +pub struct CompletionState<'ctx> { ctx: &'ctx Context, - tasks_blocked: HashSet>, - tasks_runnable: Vec>, - tasks_running: HashMap)>, tasks_done: HashMap, TaskOutput>, - rdeps: HashMap, Vec>>, } -impl<'ctx> Executor<'ctx> { - pub fn new(ctx: &'ctx Context, taskset: HashSet>) -> Result { - let mut exc = Executor { +impl<'ctx> CompletionState<'ctx> { + pub fn new(ctx: &'ctx Context) -> Self { + CompletionState { ctx, - tasks_blocked: HashSet::new(), - tasks_runnable: Vec::new(), - tasks_running: HashMap::new(), - tasks_done: HashMap::new(), - rdeps: HashMap::new(), - }; - - for task in taskset { - let mut has_depends = false; - for dep in resolve::get_dependent_tasks(ctx, &task) - .map_err(|_| Error::new(format!("invalid dependency for {}", task)))? - { - let rdep = exc.rdeps.entry(dep.clone()).or_default(); - rdep.push(task.clone()); - has_depends = true; - } - - if has_depends { - exc.tasks_blocked.insert(task); - } else { - exc.tasks_runnable.push(task); - } + tasks_done: Default::default(), } - - Ok(exc) } // Treats both "depends" and "inherit" as dependencies @@ -137,7 +111,72 @@ impl<'ctx> Executor<'ctx> { chain } - fn task_setup(&self, task_ref: &TaskRef<'ctx>) -> Vec<&'static str> { + fn print_summary(&self) { + println!(); + println!("Summary:"); + + let mut tasks: Box<[_]> = self.tasks_done.iter().collect(); + tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task)); + for (task_ref, task) in tasks.iter() { + println!(); + println!("{:#}", task_ref); + println!(" input: {}", task.input_hash); + if let Some(hash) = task.layer { + println!(" layer: {}", hash); + } + if !task.outputs.is_empty() { + println!(" outputs:"); + + let mut outputs: Box<[_]> = task.outputs.iter().collect(); + outputs.sort_by_key(|(output, _)| *output); + for (output, hash) in outputs.iter() { + println!(" {}: {}", output, hash); + } + } + } + } +} + +#[derive(Debug)] +pub struct Executor<'ctx> { + rdeps: HashMap, Vec>>, + tasks_blocked: HashSet>, + tasks_runnable: Vec>, + tasks_running: HashMap)>, + state: CompletionState<'ctx>, +} + +impl<'ctx> Executor<'ctx> { + pub fn new(ctx: &'ctx Context, taskset: HashSet>) -> Result { + let mut exc = Executor { + rdeps: Default::default(), + tasks_blocked: Default::default(), + tasks_runnable: Default::default(), + tasks_running: Default::default(), + state: CompletionState::new(ctx), + }; + + for task in taskset { + let mut has_depends = false; + for dep in resolve::get_dependent_tasks(ctx, &task) + .map_err(|_| Error::new(format!("invalid dependency for {}", task)))? + { + let rdep = exc.rdeps.entry(dep.clone()).or_default(); + rdep.push(task.clone()); + has_depends = true; + } + + if has_depends { + exc.tasks_blocked.insert(task); + } else { + exc.tasks_runnable.push(task); + } + } + + Ok(exc) + } + + fn task_setup(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> { let mut ret = vec![indoc! {" export SOURCE_DATE_EPOCH=1 @@ -206,9 +245,25 @@ impl<'ctx> Executor<'ctx> { ret } - fn spawn_one(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result { - let task_def = &self.ctx[task_ref.id]; - let task_deps = self.task_deps(task_ref)?; + fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) { + let rdeps = self.rdeps.get(&task_ref); + + self.state.tasks_done.insert(task_ref, task_output); + + for rdep in rdeps.unwrap_or(&Vec::new()) { + if !self.tasks_blocked.contains(rdep) { + continue; + } + if self.state.deps_satisfied(rdep) { + self.tasks_blocked.remove(rdep); + self.tasks_runnable.push(rdep.clone()); + } + } + } + + fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result { + let task_def = &self.state.ctx[task_ref.id]; + let task_deps = self.state.task_deps(task_ref)?; let task_output = task_def .output .iter() @@ -220,9 +275,9 @@ impl<'ctx> Executor<'ctx> { }) .collect(); - let inherit_chain = self.task_inherit_chain(task_ref); + let inherit_chain = self.state.task_inherit_chain(task_ref); - let mut run = self.task_setup(task_ref); + let mut run = Self::task_setup(task_ref); run.push(&task_def.action.run); let command = template::ENGINE @@ -242,30 +297,20 @@ impl<'ctx> Executor<'ctx> { Ok(runner.spawn(&task)) } - fn run_tasks(&mut self, runner: &Runner) -> Result<()> { - while let Some(task_ref) = self.tasks_runnable.pop() { - let socket = self.spawn_one(&task_ref, runner)?; - assert!(self - .tasks_running - .insert(socket.as_raw_fd(), (socket, task_ref)) - .is_none()); - } - + fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> { + let socket = self.spawn_task(&task_ref, runner)?; + assert!(self + .tasks_running + .insert(socket.as_raw_fd(), (socket, task_ref)) + .is_none()); Ok(()) } - fn update_runnable(&mut self, task_ref: &TaskRef) { - let rdeps = self.rdeps.get(task_ref); - - for rdep in rdeps.unwrap_or(&Vec::new()) { - if !self.tasks_blocked.contains(rdep) { - continue; - } - if self.deps_satisfied(rdep) { - self.tasks_blocked.remove(rdep); - self.tasks_runnable.push(rdep.clone()); - } + fn run_tasks(&mut self, runner: &Runner) -> Result<()> { + while let Some(task_ref) = self.tasks_runnable.pop() { + self.run_task(task_ref, runner)?; } + Ok(()) } fn wait_for_task(&mut self) -> Result<()> { @@ -293,43 +338,30 @@ impl<'ctx> Executor<'ctx> { let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); let task_output = Runner::result(&socket)?; - self.tasks_done.insert(task_ref.clone(), task_output); - self.update_runnable(&task_ref); + self.update_runnable(task_ref, task_output); } Ok(()) } + fn is_empty(&self) -> bool { + self.tasks_runnable.is_empty() && self.tasks_running.is_empty() + } + + fn is_done(&self) -> bool { + self.is_empty() && self.tasks_blocked.is_empty() + } + pub fn run(&mut self, runner: &Runner) -> Result<()> { - while !(self.tasks_runnable.is_empty() && self.tasks_running.is_empty()) { + while !self.is_empty() { self.run_tasks(runner)?; self.wait_for_task()?; } - assert!(self.tasks_blocked.is_empty(), "No runnable tasks left"); + assert!(self.is_done(), "No runnable tasks left"); - println!(); - println!("Summary:"); - - let mut tasks: Box<[_]> = self.tasks_done.iter().collect(); - tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task)); - for (task_ref, task) in tasks.iter() { - println!(); - println!("{:#}", task_ref); - println!(" input: {}", task.input_hash); - if let Some(hash) = task.layer { - println!(" layer: {}", hash); - } - if !task.outputs.is_empty() { - println!(" outputs:"); + self.state.print_summary(); - let mut outputs: Box<[_]> = task.outputs.iter().collect(); - outputs.sort_by_key(|(output, _)| *output); - for (output, hash) in outputs.iter() { - println!(" {}: {}", output, hash); - } - } - } Ok(()) } } -- cgit v1.2.3