From 86f4c08b81e2129b5d1012c1350e68e3c0560282 Mon Sep 17 00:00:00 2001 From: Matthias Schiffer Date: Thu, 28 Oct 2021 18:29:57 +0200 Subject: Rename executor to driver --- crates/driver/src/driver.rs | 367 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 367 insertions(+) create mode 100644 crates/driver/src/driver.rs (limited to 'crates/driver/src/driver.rs') diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs new file mode 100644 index 0000000..f43f8e8 --- /dev/null +++ b/crates/driver/src/driver.rs @@ -0,0 +1,367 @@ +use std::{ + collections::{HashMap, HashSet}, + os::unix::{net::UnixStream, prelude::*}, +}; + +use indoc::indoc; +use nix::poll; + +use common::{error::*, string_hash::*, types::*}; +use runner::{paths, Runner}; + +use crate::{ + context::{Context, TaskRef}, + resolve, + task::*, + template, +}; + +#[derive(Debug)] +pub struct CompletionState<'ctx> { + ctx: &'ctx Context, + tasks_done: HashMap, TaskOutput>, +} + +impl<'ctx> CompletionState<'ctx> { + pub fn new(ctx: &'ctx Context) -> Self { + CompletionState { + ctx, + tasks_done: Default::default(), + } + } + + // Treats both "depends" and "inherit" as dependencies + fn deps_satisfied(&self, task_ref: &TaskRef) -> bool { + resolve::get_dependent_tasks(self.ctx, task_ref) + .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref))) + .unwrap() + .into_iter() + .all(|dep| self.tasks_done.contains_key(&dep)) + } + + fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result> { + let task_def = &self.ctx[task.id]; + task_def + .fetch + .iter() + .map(|Fetch { name, sha256 }| { + Ok(Dependency::Fetch { + name: template::ENGINE + .eval_raw(name, &task.args) + .with_context(|| { + format!("Failed to evaluate fetch filename for task {}", task) + })?, + sha256: *sha256, + }) + }) + .collect() + } + + fn task_deps(&self, task: &TaskRef<'ctx>) -> Result> { + Ok(self + .fetch_deps(task)? + .into_iter() + .chain( + resolve::runtime_depends( + self.ctx, + self.ctx + .get_build_depends(task) + .with_context(|| format!("invalid build depends for {}", task))?, + ) + .expect("invalid runtime depends of build_depends") + .into_iter() + .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) + .map(|&output| Dependency::Task { + output, + path: "".to_string(), + }), + ) + .chain( + resolve::runtime_depends( + self.ctx, + self.ctx + .get_host_depends(task) + .with_context(|| format!("invalid depends for {}", task))?, + ) + .expect("invalid runtime depends of host_depends") + .into_iter() + .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) + .map(|&output| Dependency::Task { + output, + path: paths::abs(paths::TASK_SYSROOT), + }), + ) + .collect()) + } + + fn task_inherit_chain(&self, task_ref: &TaskRef<'ctx>) -> Vec { + let inherit = match self + .ctx + .get_inherit_depend(task_ref) + .expect("invalid inherit depends") + { + Some(inherit) => inherit, + None => return vec![], + }; + + let mut chain = self.task_inherit_chain(&inherit); + if let Some(layer) = self.tasks_done[&inherit].layer { + chain.push(layer); + } + chain + } + + fn print_summary(&self) { + println!(); + println!("Summary:"); + + let mut tasks: Box<[_]> = self.tasks_done.iter().collect(); + tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task)); + for (task_ref, task) in tasks.iter() { + println!(); + println!("{:#}", task_ref); + println!(" input: {}", task.input_hash); + if let Some(hash) = task.layer { + println!(" layer: {}", hash); + } + if !task.outputs.is_empty() { + println!(" outputs:"); + + let mut outputs: Box<[_]> = task.outputs.iter().collect(); + outputs.sort_by_key(|(output, _)| *output); + for (output, hash) in outputs.iter() { + println!(" {}: {}", output, hash); + } + } + } + } +} + +#[derive(Debug)] +pub struct Driver<'ctx> { + rdeps: HashMap, Vec>>, + tasks_blocked: HashSet>, + tasks_runnable: Vec>, + tasks_running: HashMap)>, + state: CompletionState<'ctx>, +} + +impl<'ctx> Driver<'ctx> { + pub fn new(ctx: &'ctx Context, taskset: HashSet>) -> Result { + let mut driver = Driver { + rdeps: Default::default(), + tasks_blocked: Default::default(), + tasks_runnable: Default::default(), + tasks_running: Default::default(), + state: CompletionState::new(ctx), + }; + + for task in taskset { + let mut has_depends = false; + for dep in resolve::get_dependent_tasks(ctx, &task) + .map_err(|_| Error::new(format!("invalid dependency for {}", task)))? + { + let rdep = driver.rdeps.entry(dep.clone()).or_default(); + rdep.push(task.clone()); + has_depends = true; + } + + if has_depends { + driver.tasks_blocked.insert(task); + } else { + driver.tasks_runnable.push(task); + } + } + + Ok(driver) + } + + fn task_setup(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> { + let mut ret = vec![indoc! {" + export SOURCE_DATE_EPOCH=1 + + export AR_FOR_BUILD=ar + export AS_FOR_BUILD=as + export DLLTOOL_FOR_BUILD=dlltool + export CC_FOR_BUILD=gcc + export CXX_FOR_BUILD=g++ + export GCC_FOR_BUILD=gcc + export GFORTRAN_FOR_BUILD=gfortran + export GOC_FOR_BUILD=goc + export LD_FOR_BUILD=ld + export LIPO_FOR_BUILD=lipo + export NM_FOR_BUILD=nm + export OBJCOPY_FOR_BUILD=objcopy + export OBJDUMP_FOR_BUILD=objdump + export RANLIB_FOR_BUILD=ranlib + export STRIP_FOR_BUILD=strip + export WINDRES_FOR_BUILD=windres + export WINDMC_FOR_BUILD=windmc + "}]; + + if task_ref.args.contains_key("build_to_host") { + ret.push(indoc! {" + export AR={{build_to_host.cross_compile}}ar + export AS={{build_to_host.cross_compile}}as + export DLLTOOL={{build_to_host.cross_compile}}dlltool + export CC={{build_to_host.cross_compile}}gcc + export CXX={{build_to_host.cross_compile}}g++ + export GCC={{build_to_host.cross_compile}}gcc + export GFORTRAN={{build_to_host.cross_compile}}gfortran + export GOC={{build_to_host.cross_compile}}goc + export LD={{build_to_host.cross_compile}}ld + export LIPO={{build_to_host.cross_compile}}lipo + export NM={{build_to_host.cross_compile}}nm + export OBJCOPY={{build_to_host.cross_compile}}objcopy + export OBJDUMP={{build_to_host.cross_compile}}objdump + export RANLIB={{build_to_host.cross_compile}}ranlib + export STRIP={{build_to_host.cross_compile}}strip + export WINDRES={{build_to_host.cross_compile}}windres + export WINDMC={{build_to_host.cross_compile}}windmc + "}); + } + + if task_ref.args.contains_key("build_to_target") { + ret.push(indoc! {" + export AR_FOR_TARGET={{build_to_target.cross_compile}}ar + export AS_FOR_TARGET={{build_to_target.cross_compile}}as + export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool + export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc + export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++ + export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc + export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran + export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc + export LD_FOR_TARGET={{build_to_target.cross_compile}}ld + export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo + export NM_FOR_TARGET={{build_to_target.cross_compile}}nm + export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy + export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump + export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib + export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip + export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres + export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc + "}); + } + ret + } + + fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) { + let rdeps = self.rdeps.get(&task_ref); + + self.state.tasks_done.insert(task_ref, task_output); + + for rdep in rdeps.unwrap_or(&Vec::new()) { + if !self.tasks_blocked.contains(rdep) { + continue; + } + if self.state.deps_satisfied(rdep) { + self.tasks_blocked.remove(rdep); + self.tasks_runnable.push(rdep.clone()); + } + } + } + + fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result { + let task_def = &self.state.ctx[task_ref.id]; + let task_deps = self.state.task_deps(task_ref)?; + let task_output = task_def + .output + .iter() + .map(|(name, Output { path, .. })| { + ( + name.clone(), + path.as_ref().map(String::as_str).unwrap_or(".").to_string(), + ) + }) + .collect(); + + let inherit_chain = self.state.task_inherit_chain(task_ref); + + let mut run = Self::task_setup(task_ref); + run.push(&task_def.action.run); + + let command = template::ENGINE + .eval(&run.concat(), &task_ref.args) + .with_context(|| { + format!("Failed to evaluate command template for task {}", task_ref) + })?; + + let task = Task { + label: format!("{:#}", task_ref), + command, + inherit: inherit_chain, + depends: task_deps, + outputs: task_output, + }; + + Ok(runner.spawn(&task)) + } + + fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> { + let socket = self.spawn_task(&task_ref, runner)?; + assert!(self + .tasks_running + .insert(socket.as_raw_fd(), (socket, task_ref)) + .is_none()); + Ok(()) + } + + fn run_tasks(&mut self, runner: &Runner) -> Result<()> { + while let Some(task_ref) = self.tasks_runnable.pop() { + self.run_task(task_ref, runner)?; + } + Ok(()) + } + + fn wait_for_task(&mut self) -> Result<()> { + let mut pollfds: Box<[_]> = self + .tasks_running + .keys() + .copied() + .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) + .collect(); + + while poll::poll(&mut pollfds, -1).context("poll()")? == 0 {} + + for pollfd in &*pollfds { + let events = pollfd.revents().expect("Unknown events in poll() return"); + if !events.contains(poll::PollFlags::POLLIN) { + if events.intersects(!poll::PollFlags::POLLIN) { + return Err(Error::new( + "Unexpected error status for socket file descriptor", + )); + } + continue; + } + + let fd = pollfd.as_raw_fd(); + let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); + + let task_output = Runner::result(&socket)?; + self.update_runnable(task_ref, task_output); + } + + Ok(()) + } + + fn is_empty(&self) -> bool { + self.tasks_runnable.is_empty() && self.tasks_running.is_empty() + } + + fn is_done(&self) -> bool { + self.is_empty() && self.tasks_blocked.is_empty() + } + + pub fn run(&mut self, runner: &Runner) -> Result<()> { + while !self.is_empty() { + self.run_tasks(runner)?; + self.wait_for_task()?; + } + + assert!(self.is_done(), "No runnable tasks left"); + + self.state.print_summary(); + + Ok(()) + } +} -- cgit v1.2.3