summaryrefslogtreecommitdiffstats
path: root/crates/driver/src/driver.rs
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2021-10-28 18:29:57 +0200
committerMatthias Schiffer <mschiffer@universe-factory.net>2021-10-28 18:31:02 +0200
commit86f4c08b81e2129b5d1012c1350e68e3c0560282 (patch)
tree750c520fcf6518d96a51d84dd279e437203518d1 /crates/driver/src/driver.rs
parentb35a75f4cb5417bb464639079d266fd708549b32 (diff)
downloadrebel-86f4c08b81e2129b5d1012c1350e68e3c0560282.tar
rebel-86f4c08b81e2129b5d1012c1350e68e3c0560282.zip
Rename executor to driver
Diffstat (limited to 'crates/driver/src/driver.rs')
-rw-r--r--crates/driver/src/driver.rs367
1 files changed, 367 insertions, 0 deletions
diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs
new file mode 100644
index 0000000..f43f8e8
--- /dev/null
+++ b/crates/driver/src/driver.rs
@@ -0,0 +1,367 @@
+use std::{
+ collections::{HashMap, HashSet},
+ os::unix::{net::UnixStream, prelude::*},
+};
+
+use indoc::indoc;
+use nix::poll;
+
+use common::{error::*, string_hash::*, types::*};
+use runner::{paths, Runner};
+
+use crate::{
+ context::{Context, TaskRef},
+ resolve,
+ task::*,
+ template,
+};
+
+#[derive(Debug)]
+pub struct CompletionState<'ctx> {
+ ctx: &'ctx Context,
+ tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>,
+}
+
+impl<'ctx> CompletionState<'ctx> {
+ pub fn new(ctx: &'ctx Context) -> Self {
+ CompletionState {
+ ctx,
+ tasks_done: Default::default(),
+ }
+ }
+
+ // Treats both "depends" and "inherit" as dependencies
+ fn deps_satisfied(&self, task_ref: &TaskRef) -> bool {
+ resolve::get_dependent_tasks(self.ctx, task_ref)
+ .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref)))
+ .unwrap()
+ .into_iter()
+ .all(|dep| self.tasks_done.contains_key(&dep))
+ }
+
+ fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result<Vec<Dependency>> {
+ let task_def = &self.ctx[task.id];
+ task_def
+ .fetch
+ .iter()
+ .map(|Fetch { name, sha256 }| {
+ Ok(Dependency::Fetch {
+ name: template::ENGINE
+ .eval_raw(name, &task.args)
+ .with_context(|| {
+ format!("Failed to evaluate fetch filename for task {}", task)
+ })?,
+ sha256: *sha256,
+ })
+ })
+ .collect()
+ }
+
+ fn task_deps(&self, task: &TaskRef<'ctx>) -> Result<HashSet<Dependency>> {
+ Ok(self
+ .fetch_deps(task)?
+ .into_iter()
+ .chain(
+ resolve::runtime_depends(
+ self.ctx,
+ self.ctx
+ .get_build_depends(task)
+ .with_context(|| format!("invalid build depends for {}", task))?,
+ )
+ .expect("invalid runtime depends of build_depends")
+ .into_iter()
+ .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output))
+ .map(|&output| Dependency::Task {
+ output,
+ path: "".to_string(),
+ }),
+ )
+ .chain(
+ resolve::runtime_depends(
+ self.ctx,
+ self.ctx
+ .get_host_depends(task)
+ .with_context(|| format!("invalid depends for {}", task))?,
+ )
+ .expect("invalid runtime depends of host_depends")
+ .into_iter()
+ .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output))
+ .map(|&output| Dependency::Task {
+ output,
+ path: paths::abs(paths::TASK_SYSROOT),
+ }),
+ )
+ .collect())
+ }
+
+ fn task_inherit_chain(&self, task_ref: &TaskRef<'ctx>) -> Vec<LayerHash> {
+ let inherit = match self
+ .ctx
+ .get_inherit_depend(task_ref)
+ .expect("invalid inherit depends")
+ {
+ Some(inherit) => inherit,
+ None => return vec![],
+ };
+
+ let mut chain = self.task_inherit_chain(&inherit);
+ if let Some(layer) = self.tasks_done[&inherit].layer {
+ chain.push(layer);
+ }
+ chain
+ }
+
+ fn print_summary(&self) {
+ println!();
+ println!("Summary:");
+
+ let mut tasks: Box<[_]> = self.tasks_done.iter().collect();
+ tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task));
+ for (task_ref, task) in tasks.iter() {
+ println!();
+ println!("{:#}", task_ref);
+ println!(" input: {}", task.input_hash);
+ if let Some(hash) = task.layer {
+ println!(" layer: {}", hash);
+ }
+ if !task.outputs.is_empty() {
+ println!(" outputs:");
+
+ let mut outputs: Box<[_]> = task.outputs.iter().collect();
+ outputs.sort_by_key(|(output, _)| *output);
+ for (output, hash) in outputs.iter() {
+ println!(" {}: {}", output, hash);
+ }
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct Driver<'ctx> {
+ rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>,
+ tasks_blocked: HashSet<TaskRef<'ctx>>,
+ tasks_runnable: Vec<TaskRef<'ctx>>,
+ tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>,
+ state: CompletionState<'ctx>,
+}
+
+impl<'ctx> Driver<'ctx> {
+ pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> {
+ let mut driver = Driver {
+ rdeps: Default::default(),
+ tasks_blocked: Default::default(),
+ tasks_runnable: Default::default(),
+ tasks_running: Default::default(),
+ state: CompletionState::new(ctx),
+ };
+
+ for task in taskset {
+ let mut has_depends = false;
+ for dep in resolve::get_dependent_tasks(ctx, &task)
+ .map_err(|_| Error::new(format!("invalid dependency for {}", task)))?
+ {
+ let rdep = driver.rdeps.entry(dep.clone()).or_default();
+ rdep.push(task.clone());
+ has_depends = true;
+ }
+
+ if has_depends {
+ driver.tasks_blocked.insert(task);
+ } else {
+ driver.tasks_runnable.push(task);
+ }
+ }
+
+ Ok(driver)
+ }
+
+ fn task_setup(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> {
+ let mut ret = vec![indoc! {"
+ export SOURCE_DATE_EPOCH=1
+
+ export AR_FOR_BUILD=ar
+ export AS_FOR_BUILD=as
+ export DLLTOOL_FOR_BUILD=dlltool
+ export CC_FOR_BUILD=gcc
+ export CXX_FOR_BUILD=g++
+ export GCC_FOR_BUILD=gcc
+ export GFORTRAN_FOR_BUILD=gfortran
+ export GOC_FOR_BUILD=goc
+ export LD_FOR_BUILD=ld
+ export LIPO_FOR_BUILD=lipo
+ export NM_FOR_BUILD=nm
+ export OBJCOPY_FOR_BUILD=objcopy
+ export OBJDUMP_FOR_BUILD=objdump
+ export RANLIB_FOR_BUILD=ranlib
+ export STRIP_FOR_BUILD=strip
+ export WINDRES_FOR_BUILD=windres
+ export WINDMC_FOR_BUILD=windmc
+ "}];
+
+ if task_ref.args.contains_key("build_to_host") {
+ ret.push(indoc! {"
+ export AR={{build_to_host.cross_compile}}ar
+ export AS={{build_to_host.cross_compile}}as
+ export DLLTOOL={{build_to_host.cross_compile}}dlltool
+ export CC={{build_to_host.cross_compile}}gcc
+ export CXX={{build_to_host.cross_compile}}g++
+ export GCC={{build_to_host.cross_compile}}gcc
+ export GFORTRAN={{build_to_host.cross_compile}}gfortran
+ export GOC={{build_to_host.cross_compile}}goc
+ export LD={{build_to_host.cross_compile}}ld
+ export LIPO={{build_to_host.cross_compile}}lipo
+ export NM={{build_to_host.cross_compile}}nm
+ export OBJCOPY={{build_to_host.cross_compile}}objcopy
+ export OBJDUMP={{build_to_host.cross_compile}}objdump
+ export RANLIB={{build_to_host.cross_compile}}ranlib
+ export STRIP={{build_to_host.cross_compile}}strip
+ export WINDRES={{build_to_host.cross_compile}}windres
+ export WINDMC={{build_to_host.cross_compile}}windmc
+ "});
+ }
+
+ if task_ref.args.contains_key("build_to_target") {
+ ret.push(indoc! {"
+ export AR_FOR_TARGET={{build_to_target.cross_compile}}ar
+ export AS_FOR_TARGET={{build_to_target.cross_compile}}as
+ export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool
+ export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc
+ export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++
+ export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc
+ export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran
+ export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc
+ export LD_FOR_TARGET={{build_to_target.cross_compile}}ld
+ export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo
+ export NM_FOR_TARGET={{build_to_target.cross_compile}}nm
+ export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy
+ export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump
+ export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib
+ export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip
+ export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres
+ export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc
+ "});
+ }
+ ret
+ }
+
+ fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) {
+ let rdeps = self.rdeps.get(&task_ref);
+
+ self.state.tasks_done.insert(task_ref, task_output);
+
+ for rdep in rdeps.unwrap_or(&Vec::new()) {
+ if !self.tasks_blocked.contains(rdep) {
+ continue;
+ }
+ if self.state.deps_satisfied(rdep) {
+ self.tasks_blocked.remove(rdep);
+ self.tasks_runnable.push(rdep.clone());
+ }
+ }
+ }
+
+ fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<UnixStream> {
+ let task_def = &self.state.ctx[task_ref.id];
+ let task_deps = self.state.task_deps(task_ref)?;
+ let task_output = task_def
+ .output
+ .iter()
+ .map(|(name, Output { path, .. })| {
+ (
+ name.clone(),
+ path.as_ref().map(String::as_str).unwrap_or(".").to_string(),
+ )
+ })
+ .collect();
+
+ let inherit_chain = self.state.task_inherit_chain(task_ref);
+
+ let mut run = Self::task_setup(task_ref);
+ run.push(&task_def.action.run);
+
+ let command = template::ENGINE
+ .eval(&run.concat(), &task_ref.args)
+ .with_context(|| {
+ format!("Failed to evaluate command template for task {}", task_ref)
+ })?;
+
+ let task = Task {
+ label: format!("{:#}", task_ref),
+ command,
+ inherit: inherit_chain,
+ depends: task_deps,
+ outputs: task_output,
+ };
+
+ Ok(runner.spawn(&task))
+ }
+
+ fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> {
+ let socket = self.spawn_task(&task_ref, runner)?;
+ assert!(self
+ .tasks_running
+ .insert(socket.as_raw_fd(), (socket, task_ref))
+ .is_none());
+ Ok(())
+ }
+
+ fn run_tasks(&mut self, runner: &Runner) -> Result<()> {
+ while let Some(task_ref) = self.tasks_runnable.pop() {
+ self.run_task(task_ref, runner)?;
+ }
+ Ok(())
+ }
+
+ fn wait_for_task(&mut self) -> Result<()> {
+ let mut pollfds: Box<[_]> = self
+ .tasks_running
+ .keys()
+ .copied()
+ .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN))
+ .collect();
+
+ while poll::poll(&mut pollfds, -1).context("poll()")? == 0 {}
+
+ for pollfd in &*pollfds {
+ let events = pollfd.revents().expect("Unknown events in poll() return");
+ if !events.contains(poll::PollFlags::POLLIN) {
+ if events.intersects(!poll::PollFlags::POLLIN) {
+ return Err(Error::new(
+ "Unexpected error status for socket file descriptor",
+ ));
+ }
+ continue;
+ }
+
+ let fd = pollfd.as_raw_fd();
+ let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap();
+
+ let task_output = Runner::result(&socket)?;
+ self.update_runnable(task_ref, task_output);
+ }
+
+ Ok(())
+ }
+
+ fn is_empty(&self) -> bool {
+ self.tasks_runnable.is_empty() && self.tasks_running.is_empty()
+ }
+
+ fn is_done(&self) -> bool {
+ self.is_empty() && self.tasks_blocked.is_empty()
+ }
+
+ pub fn run(&mut self, runner: &Runner) -> Result<()> {
+ while !self.is_empty() {
+ self.run_tasks(runner)?;
+ self.wait_for_task()?;
+ }
+
+ assert!(self.is_done(), "No runnable tasks left");
+
+ self.state.print_summary();
+
+ Ok(())
+ }
+}