diff options
Diffstat (limited to 'crates/executor/src/executor.rs')
-rw-r--r-- | crates/executor/src/executor.rs | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs new file mode 100644 index 0000000..1d6ee44 --- /dev/null +++ b/crates/executor/src/executor.rs @@ -0,0 +1,347 @@ +use std::collections::{HashMap, HashSet}; + +use indoc::indoc; +use ipc_channel::ipc; + +use common::{error::*, string_hash::*, types::*}; +use runner::{paths, Runner}; + +use crate::{ + context::{Context, TaskRef}, + resolve, + task::*, + template::TemplateEngine, +}; + +pub struct Executor<'ctx> { + ctx: &'ctx Context, + receiver_set: ipc::IpcReceiverSet, + tasks_blocked: HashSet<TaskRef<'ctx>>, + tasks_runnable: Vec<TaskRef<'ctx>>, + tasks_running: HashMap<u64, TaskRef<'ctx>>, + tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>, + rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>, + tpl: TemplateEngine, +} + +impl<'ctx> Executor<'ctx> { + pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> { + let mut exc = Executor { + ctx, + receiver_set: ipc::IpcReceiverSet::new().expect("IpcReceiverSet::new()"), + tasks_blocked: HashSet::new(), + tasks_runnable: Vec::new(), + tasks_running: HashMap::new(), + tasks_done: HashMap::new(), + rdeps: HashMap::new(), + tpl: TemplateEngine::new(), + }; + + for task in taskset { + let mut has_depends = false; + for dep in resolve::get_dependent_tasks(ctx, &task) + .map_err(|_| Error::new(format!("invalid dependency for {}", task)))? + { + let rdep = exc.rdeps.entry(dep.clone()).or_default(); + rdep.push(task.clone()); + has_depends = true; + } + + if has_depends { + exc.tasks_blocked.insert(task); + } else { + exc.tasks_runnable.push(task); + } + } + + Ok(exc) + } + + // Treats both "depends" and "inherit" as dependencies + fn deps_satisfied(&self, task_ref: &TaskRef) -> bool { + resolve::get_dependent_tasks(self.ctx, task_ref) + .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref))) + .unwrap() + .into_iter() + .all(|dep| self.tasks_done.contains_key(&dep)) + } + + fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result<Vec<Dependency>> { + let task_def = &self.ctx[task.id]; + task_def + .fetch + .iter() + .map(|Fetch { name, sha256 }| { + Ok(Dependency::Fetch { + name: self.tpl.eval_raw(name, &task.args).with_context(|| { + format!("Failed to evaluate fetch filename for task {}", task) + })?, + sha256: *sha256, + }) + }) + .collect() + } + + fn task_deps(&self, task: &TaskRef<'ctx>) -> Result<HashSet<Dependency>> { + Ok(self + .fetch_deps(task)? + .into_iter() + .chain( + resolve::runtime_depends( + self.ctx, + self.ctx + .get_build_depends(task) + .with_context(|| format!("invalid build depends for {}", task))?, + ) + .expect("invalid runtime depends of build_depends") + .into_iter() + .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) + .map(|&output| Dependency::Task { + output, + path: "".to_string(), + }), + ) + .chain( + resolve::runtime_depends( + self.ctx, + self.ctx + .get_host_depends(task) + .with_context(|| format!("invalid depends for {}", task))?, + ) + .expect("invalid runtime depends of host_depends") + .into_iter() + .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) + .map(|&output| Dependency::Task { + output, + path: paths::abs(paths::TASK_SYSROOT), + }), + ) + .collect()) + } + + fn task_inherit_chain(&self, task_ref: &TaskRef<'ctx>) -> Vec<LayerHash> { + let inherit = match self + .ctx + .get_inherit_depend(task_ref) + .expect("invalid inherit depends") + { + Some(inherit) => inherit, + None => return vec![], + }; + + let mut chain = self.task_inherit_chain(&inherit); + if let Some(layer) = self.tasks_done[&inherit].layer { + chain.push(layer); + } + chain + } + + fn task_setup(&self, task_ref: &TaskRef<'ctx>) -> Vec<&'static str> { + let mut ret = vec![indoc! {" + export SOURCE_DATE_EPOCH=1 + + export AR_FOR_BUILD=ar + export AS_FOR_BUILD=as + export DLLTOOL_FOR_BUILD=dlltool + export CC_FOR_BUILD=gcc + export CXX_FOR_BUILD=g++ + export GCC_FOR_BUILD=gcc + export GFORTRAN_FOR_BUILD=gfortran + export GOC_FOR_BUILD=goc + export LD_FOR_BUILD=ld + export LIPO_FOR_BUILD=lipo + export NM_FOR_BUILD=nm + export OBJCOPY_FOR_BUILD=objcopy + export OBJDUMP_FOR_BUILD=objdump + export RANLIB_FOR_BUILD=ranlib + export STRIP_FOR_BUILD=strip + export WINDRES_FOR_BUILD=windres + export WINDMC_FOR_BUILD=windmc + "}]; + + if task_ref.args.contains_key("build_to_host") { + ret.push(indoc! {" + export AR={{build_to_host.cross_compile}}ar + export AS={{build_to_host.cross_compile}}as + export DLLTOOL={{build_to_host.cross_compile}}dlltool + export CC={{build_to_host.cross_compile}}gcc + export CXX={{build_to_host.cross_compile}}g++ + export GCC={{build_to_host.cross_compile}}gcc + export GFORTRAN={{build_to_host.cross_compile}}gfortran + export GOC={{build_to_host.cross_compile}}goc + export LD={{build_to_host.cross_compile}}ld + export LIPO={{build_to_host.cross_compile}}lipo + export NM={{build_to_host.cross_compile}}nm + export OBJCOPY={{build_to_host.cross_compile}}objcopy + export OBJDUMP={{build_to_host.cross_compile}}objdump + export RANLIB={{build_to_host.cross_compile}}ranlib + export STRIP={{build_to_host.cross_compile}}strip + export WINDRES={{build_to_host.cross_compile}}windres + export WINDMC={{build_to_host.cross_compile}}windmc + "}); + } + + if task_ref.args.contains_key("build_to_target") { + ret.push(indoc! {" + export AR_FOR_TARGET={{build_to_target.cross_compile}}ar + export AS_FOR_TARGET={{build_to_target.cross_compile}}as + export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool + export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc + export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++ + export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc + export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran + export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc + export LD_FOR_TARGET={{build_to_target.cross_compile}}ld + export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo + export NM_FOR_TARGET={{build_to_target.cross_compile}}nm + export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy + export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump + export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib + export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip + export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres + export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc + "}); + } + ret + } + + fn spawn_one( + &self, + task_ref: &TaskRef<'ctx>, + runner: &Runner, + ) -> Result<ipc::IpcReceiver<Result<TaskOutput>>> { + let task_def = &self.ctx[task_ref.id]; + let task_deps = self.task_deps(task_ref)?; + let task_output = task_def + .output + .iter() + .map(|(name, Output { path, .. })| { + ( + name.clone(), + path.as_ref().map(String::as_str).unwrap_or(".").to_string(), + ) + }) + .collect(); + + let inherit_chain = self.task_inherit_chain(task_ref); + + let mut run = self.task_setup(task_ref); + run.push(&task_def.action.run); + + let command = self + .tpl + .eval(&run.concat(), &task_ref.args) + .with_context(|| { + format!("Failed to evaluate command template for task {}", task_ref) + })?; + + let task = Task { + label: format!("{:#}", task_ref), + command, + inherit: inherit_chain, + depends: task_deps, + outputs: task_output, + }; + + Ok(runner.spawn(&task)) + } + + fn run_tasks(&mut self, runner: &Runner) -> Result<()> { + while let Some(task_ref) = self.tasks_runnable.pop() { + let channel = self.spawn_one(&task_ref, runner)?; + let id = self + .receiver_set + .add(channel) + .expect("Failed to add channel to receiver set"); + self.tasks_running.insert(id, task_ref); + } + + Ok(()) + } + + fn update_runnable(&mut self, task_ref: &TaskRef) { + let rdeps = self.rdeps.get(task_ref); + + for rdep in rdeps.unwrap_or(&Vec::new()) { + if !self.tasks_blocked.contains(rdep) { + continue; + } + if self.deps_satisfied(rdep) { + self.tasks_blocked.remove(rdep); + self.tasks_runnable.push(rdep.clone()); + } + } + } + + fn wait_for_task(&mut self) -> Result<()> { + let mut progress = false; + + while !progress { + let events = self + .receiver_set + .select() + .expect("Failed to get messages from receiver set"); + for event in events { + match event { + ipc::IpcSelectionResult::MessageReceived(id, msg) => { + let task_ref = self + .tasks_running + .remove(&id) + .expect("Received message for unknown task"); + let task_output = msg + .to::<Result<TaskOutput>>() + .expect("Failed to decode message from runner")?; + + self.tasks_done.insert(task_ref.clone(), task_output); + self.update_runnable(&task_ref); + + progress = true; + } + ipc::IpcSelectionResult::ChannelClosed(id) => { + if let Some(task) = self.tasks_running.remove(&id) { + return Err(Error::new(format!( + "Unexpectedly got no result for task {:#}", + task + ))); + } + } + } + } + } + + Ok(()) + } + + pub fn run(&mut self, runner: &Runner) -> Result<()> { + while !(self.tasks_runnable.is_empty() && self.tasks_running.is_empty()) { + self.run_tasks(runner)?; + self.wait_for_task()?; + } + + assert!(self.tasks_blocked.is_empty(), "No runnable tasks left"); + + println!(); + println!("Summary:"); + + let mut tasks: Box<[_]> = self.tasks_done.iter().collect(); + tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task)); + for (task_ref, task) in tasks.iter() { + println!(); + println!("{:#}", task_ref); + println!(" input: {}", task.input_hash); + if let Some(hash) = task.layer { + println!(" layer: {}", hash); + } + if !task.outputs.is_empty() { + println!(" outputs:"); + + let mut outputs: Box<[_]> = task.outputs.iter().collect(); + outputs.sort_by_key(|(output, _)| *output); + for (output, hash) in outputs.iter() { + println!(" {}: {}", output, hash); + } + } + } + Ok(()) + } +} |