summaryrefslogtreecommitdiffstats
path: root/crates/executor/src/executor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/executor/src/executor.rs')
-rw-r--r--crates/executor/src/executor.rs347
1 files changed, 347 insertions, 0 deletions
diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs
new file mode 100644
index 0000000..1d6ee44
--- /dev/null
+++ b/crates/executor/src/executor.rs
@@ -0,0 +1,347 @@
+use std::collections::{HashMap, HashSet};
+
+use indoc::indoc;
+use ipc_channel::ipc;
+
+use common::{error::*, string_hash::*, types::*};
+use runner::{paths, Runner};
+
+use crate::{
+ context::{Context, TaskRef},
+ resolve,
+ task::*,
+ template::TemplateEngine,
+};
+
+pub struct Executor<'ctx> {
+ ctx: &'ctx Context,
+ receiver_set: ipc::IpcReceiverSet,
+ tasks_blocked: HashSet<TaskRef<'ctx>>,
+ tasks_runnable: Vec<TaskRef<'ctx>>,
+ tasks_running: HashMap<u64, TaskRef<'ctx>>,
+ tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>,
+ rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>,
+ tpl: TemplateEngine,
+}
+
+impl<'ctx> Executor<'ctx> {
+ pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> {
+ let mut exc = Executor {
+ ctx,
+ receiver_set: ipc::IpcReceiverSet::new().expect("IpcReceiverSet::new()"),
+ tasks_blocked: HashSet::new(),
+ tasks_runnable: Vec::new(),
+ tasks_running: HashMap::new(),
+ tasks_done: HashMap::new(),
+ rdeps: HashMap::new(),
+ tpl: TemplateEngine::new(),
+ };
+
+ for task in taskset {
+ let mut has_depends = false;
+ for dep in resolve::get_dependent_tasks(ctx, &task)
+ .map_err(|_| Error::new(format!("invalid dependency for {}", task)))?
+ {
+ let rdep = exc.rdeps.entry(dep.clone()).or_default();
+ rdep.push(task.clone());
+ has_depends = true;
+ }
+
+ if has_depends {
+ exc.tasks_blocked.insert(task);
+ } else {
+ exc.tasks_runnable.push(task);
+ }
+ }
+
+ Ok(exc)
+ }
+
+ // Treats both "depends" and "inherit" as dependencies
+ fn deps_satisfied(&self, task_ref: &TaskRef) -> bool {
+ resolve::get_dependent_tasks(self.ctx, task_ref)
+ .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref)))
+ .unwrap()
+ .into_iter()
+ .all(|dep| self.tasks_done.contains_key(&dep))
+ }
+
+ fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result<Vec<Dependency>> {
+ let task_def = &self.ctx[task.id];
+ task_def
+ .fetch
+ .iter()
+ .map(|Fetch { name, sha256 }| {
+ Ok(Dependency::Fetch {
+ name: self.tpl.eval_raw(name, &task.args).with_context(|| {
+ format!("Failed to evaluate fetch filename for task {}", task)
+ })?,
+ sha256: *sha256,
+ })
+ })
+ .collect()
+ }
+
+ fn task_deps(&self, task: &TaskRef<'ctx>) -> Result<HashSet<Dependency>> {
+ Ok(self
+ .fetch_deps(task)?
+ .into_iter()
+ .chain(
+ resolve::runtime_depends(
+ self.ctx,
+ self.ctx
+ .get_build_depends(task)
+ .with_context(|| format!("invalid build depends for {}", task))?,
+ )
+ .expect("invalid runtime depends of build_depends")
+ .into_iter()
+ .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output))
+ .map(|&output| Dependency::Task {
+ output,
+ path: "".to_string(),
+ }),
+ )
+ .chain(
+ resolve::runtime_depends(
+ self.ctx,
+ self.ctx
+ .get_host_depends(task)
+ .with_context(|| format!("invalid depends for {}", task))?,
+ )
+ .expect("invalid runtime depends of host_depends")
+ .into_iter()
+ .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output))
+ .map(|&output| Dependency::Task {
+ output,
+ path: paths::abs(paths::TASK_SYSROOT),
+ }),
+ )
+ .collect())
+ }
+
+ fn task_inherit_chain(&self, task_ref: &TaskRef<'ctx>) -> Vec<LayerHash> {
+ let inherit = match self
+ .ctx
+ .get_inherit_depend(task_ref)
+ .expect("invalid inherit depends")
+ {
+ Some(inherit) => inherit,
+ None => return vec![],
+ };
+
+ let mut chain = self.task_inherit_chain(&inherit);
+ if let Some(layer) = self.tasks_done[&inherit].layer {
+ chain.push(layer);
+ }
+ chain
+ }
+
+ fn task_setup(&self, task_ref: &TaskRef<'ctx>) -> Vec<&'static str> {
+ let mut ret = vec![indoc! {"
+ export SOURCE_DATE_EPOCH=1
+
+ export AR_FOR_BUILD=ar
+ export AS_FOR_BUILD=as
+ export DLLTOOL_FOR_BUILD=dlltool
+ export CC_FOR_BUILD=gcc
+ export CXX_FOR_BUILD=g++
+ export GCC_FOR_BUILD=gcc
+ export GFORTRAN_FOR_BUILD=gfortran
+ export GOC_FOR_BUILD=goc
+ export LD_FOR_BUILD=ld
+ export LIPO_FOR_BUILD=lipo
+ export NM_FOR_BUILD=nm
+ export OBJCOPY_FOR_BUILD=objcopy
+ export OBJDUMP_FOR_BUILD=objdump
+ export RANLIB_FOR_BUILD=ranlib
+ export STRIP_FOR_BUILD=strip
+ export WINDRES_FOR_BUILD=windres
+ export WINDMC_FOR_BUILD=windmc
+ "}];
+
+ if task_ref.args.contains_key("build_to_host") {
+ ret.push(indoc! {"
+ export AR={{build_to_host.cross_compile}}ar
+ export AS={{build_to_host.cross_compile}}as
+ export DLLTOOL={{build_to_host.cross_compile}}dlltool
+ export CC={{build_to_host.cross_compile}}gcc
+ export CXX={{build_to_host.cross_compile}}g++
+ export GCC={{build_to_host.cross_compile}}gcc
+ export GFORTRAN={{build_to_host.cross_compile}}gfortran
+ export GOC={{build_to_host.cross_compile}}goc
+ export LD={{build_to_host.cross_compile}}ld
+ export LIPO={{build_to_host.cross_compile}}lipo
+ export NM={{build_to_host.cross_compile}}nm
+ export OBJCOPY={{build_to_host.cross_compile}}objcopy
+ export OBJDUMP={{build_to_host.cross_compile}}objdump
+ export RANLIB={{build_to_host.cross_compile}}ranlib
+ export STRIP={{build_to_host.cross_compile}}strip
+ export WINDRES={{build_to_host.cross_compile}}windres
+ export WINDMC={{build_to_host.cross_compile}}windmc
+ "});
+ }
+
+ if task_ref.args.contains_key("build_to_target") {
+ ret.push(indoc! {"
+ export AR_FOR_TARGET={{build_to_target.cross_compile}}ar
+ export AS_FOR_TARGET={{build_to_target.cross_compile}}as
+ export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool
+ export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc
+ export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++
+ export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc
+ export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran
+ export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc
+ export LD_FOR_TARGET={{build_to_target.cross_compile}}ld
+ export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo
+ export NM_FOR_TARGET={{build_to_target.cross_compile}}nm
+ export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy
+ export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump
+ export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib
+ export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip
+ export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres
+ export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc
+ "});
+ }
+ ret
+ }
+
+ fn spawn_one(
+ &self,
+ task_ref: &TaskRef<'ctx>,
+ runner: &Runner,
+ ) -> Result<ipc::IpcReceiver<Result<TaskOutput>>> {
+ let task_def = &self.ctx[task_ref.id];
+ let task_deps = self.task_deps(task_ref)?;
+ let task_output = task_def
+ .output
+ .iter()
+ .map(|(name, Output { path, .. })| {
+ (
+ name.clone(),
+ path.as_ref().map(String::as_str).unwrap_or(".").to_string(),
+ )
+ })
+ .collect();
+
+ let inherit_chain = self.task_inherit_chain(task_ref);
+
+ let mut run = self.task_setup(task_ref);
+ run.push(&task_def.action.run);
+
+ let command = self
+ .tpl
+ .eval(&run.concat(), &task_ref.args)
+ .with_context(|| {
+ format!("Failed to evaluate command template for task {}", task_ref)
+ })?;
+
+ let task = Task {
+ label: format!("{:#}", task_ref),
+ command,
+ inherit: inherit_chain,
+ depends: task_deps,
+ outputs: task_output,
+ };
+
+ Ok(runner.spawn(&task))
+ }
+
+ fn run_tasks(&mut self, runner: &Runner) -> Result<()> {
+ while let Some(task_ref) = self.tasks_runnable.pop() {
+ let channel = self.spawn_one(&task_ref, runner)?;
+ let id = self
+ .receiver_set
+ .add(channel)
+ .expect("Failed to add channel to receiver set");
+ self.tasks_running.insert(id, task_ref);
+ }
+
+ Ok(())
+ }
+
+ fn update_runnable(&mut self, task_ref: &TaskRef) {
+ let rdeps = self.rdeps.get(task_ref);
+
+ for rdep in rdeps.unwrap_or(&Vec::new()) {
+ if !self.tasks_blocked.contains(rdep) {
+ continue;
+ }
+ if self.deps_satisfied(rdep) {
+ self.tasks_blocked.remove(rdep);
+ self.tasks_runnable.push(rdep.clone());
+ }
+ }
+ }
+
+ fn wait_for_task(&mut self) -> Result<()> {
+ let mut progress = false;
+
+ while !progress {
+ let events = self
+ .receiver_set
+ .select()
+ .expect("Failed to get messages from receiver set");
+ for event in events {
+ match event {
+ ipc::IpcSelectionResult::MessageReceived(id, msg) => {
+ let task_ref = self
+ .tasks_running
+ .remove(&id)
+ .expect("Received message for unknown task");
+ let task_output = msg
+ .to::<Result<TaskOutput>>()
+ .expect("Failed to decode message from runner")?;
+
+ self.tasks_done.insert(task_ref.clone(), task_output);
+ self.update_runnable(&task_ref);
+
+ progress = true;
+ }
+ ipc::IpcSelectionResult::ChannelClosed(id) => {
+ if let Some(task) = self.tasks_running.remove(&id) {
+ return Err(Error::new(format!(
+ "Unexpectedly got no result for task {:#}",
+ task
+ )));
+ }
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ pub fn run(&mut self, runner: &Runner) -> Result<()> {
+ while !(self.tasks_runnable.is_empty() && self.tasks_running.is_empty()) {
+ self.run_tasks(runner)?;
+ self.wait_for_task()?;
+ }
+
+ assert!(self.tasks_blocked.is_empty(), "No runnable tasks left");
+
+ println!();
+ println!("Summary:");
+
+ let mut tasks: Box<[_]> = self.tasks_done.iter().collect();
+ tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task));
+ for (task_ref, task) in tasks.iter() {
+ println!();
+ println!("{:#}", task_ref);
+ println!(" input: {}", task.input_hash);
+ if let Some(hash) = task.layer {
+ println!(" layer: {}", hash);
+ }
+ if !task.outputs.is_empty() {
+ println!(" outputs:");
+
+ let mut outputs: Box<[_]> = task.outputs.iter().collect();
+ outputs.sort_by_key(|(output, _)| *output);
+ for (output, hash) in outputs.iter() {
+ println!(" {}: {}", output, hash);
+ }
+ }
+ }
+ Ok(())
+ }
+}