diff options
Diffstat (limited to 'crates/driver/src/driver.rs')
-rw-r--r-- | crates/driver/src/driver.rs | 402 |
1 files changed, 0 insertions, 402 deletions
diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs deleted file mode 100644 index dfdd6e9..0000000 --- a/crates/driver/src/driver.rs +++ /dev/null @@ -1,402 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - os::unix::{net::UnixStream, prelude::*}, -}; - -use indoc::indoc; -use nix::poll; - -use common::{error::*, string_hash::*, types::*}; -use runner::Runner; - -use crate::{ - context::{Context, TaskRef}, - paths, resolve, - task::*, - template, -}; - -#[derive(Debug)] -pub struct CompletionState<'ctx> { - ctx: &'ctx Context, - tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>, -} - -impl<'ctx> CompletionState<'ctx> { - pub fn new(ctx: &'ctx Context) -> Self { - CompletionState { - ctx, - tasks_done: Default::default(), - } - } - - // Treats both "depends" and "inherit" as dependencies - fn deps_satisfied(&self, task_ref: &TaskRef) -> bool { - resolve::get_dependent_tasks(self.ctx, task_ref) - .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref))) - .unwrap() - .into_iter() - .all(|dep| self.tasks_done.contains_key(&dep)) - } - - fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result<Vec<Dependency>> { - let task_def = &self.ctx[task]; - task_def - .fetch - .iter() - .map(|Fetch { name, sha256 }| { - Ok(Dependency::Fetch { - name: template::ENGINE - .eval_raw(name, &task.args) - .with_context(|| { - format!("Failed to evaluate fetch filename for task {}", task) - })?, - target_dir: paths::TASK_DLDIR.to_string(), - sha256: *sha256, - }) - }) - .collect() - } - - fn task_deps(&self, task: &TaskRef<'ctx>) -> Result<HashSet<Dependency>> { - Ok(self - .fetch_deps(task)? - .into_iter() - .chain( - resolve::runtime_depends( - self.ctx, - self.ctx - .get_build_depends(task) - .with_context(|| format!("invalid build depends for {}", task))?, - ) - .expect("invalid runtime depends of build_depends") - .into_iter() - .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) - .map(|&output| Dependency::Task { - output, - path: "".to_string(), - }), - ) - .chain( - resolve::runtime_depends( - self.ctx, - self.ctx - .get_host_depends(task) - .with_context(|| format!("invalid depends for {}", task))?, - ) - .expect("invalid runtime depends of host_depends") - .into_iter() - .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) - .map(|&output| Dependency::Task { - output, - path: paths::TASK_SYSROOT.to_string(), - }), - ) - .collect()) - } - - fn task_inherit_chain(&self, task_ref: &TaskRef<'ctx>) -> Vec<LayerHash> { - let inherit = match self - .ctx - .get_inherit_depend(task_ref) - .expect("invalid inherit depends") - { - Some(inherit) => inherit, - None => return vec![], - }; - - let mut chain = self.task_inherit_chain(&inherit); - if let Some(layer) = self.tasks_done[&inherit].layer { - chain.push(layer); - } - chain - } - - fn print_summary(&self) { - println!(); - println!("Summary:"); - - let mut tasks: Box<[_]> = self.tasks_done.iter().collect(); - tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task)); - for (task_ref, task) in tasks.iter() { - println!(); - println!("{:#}", task_ref); - if let Some(hash) = task.input_hash { - println!(" input: {}", hash); - } - if let Some(hash) = task.layer { - println!(" layer: {}", hash); - } - if !task.outputs.is_empty() { - println!(" outputs:"); - - let mut outputs: Box<[_]> = task.outputs.iter().collect(); - outputs.sort_by_key(|(output, _)| *output); - for (output, hash) in outputs.iter() { - println!(" {}: {}", output, hash); - } - } - } - } -} - -enum SpawnResult { - Spawned(UnixStream), - Skipped(TaskOutput), -} - -#[derive(Debug)] -pub struct Driver<'ctx> { - rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>, - force_run: HashSet<TaskRef<'ctx>>, - tasks_blocked: HashSet<TaskRef<'ctx>>, - tasks_runnable: Vec<TaskRef<'ctx>>, - tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>, - state: CompletionState<'ctx>, -} - -impl<'ctx> Driver<'ctx> { - pub fn new( - ctx: &'ctx Context, - taskset: HashSet<TaskRef<'ctx>>, - force_run: HashSet<TaskRef<'ctx>>, - ) -> Result<Self> { - let mut driver = Driver { - rdeps: Default::default(), - force_run, - tasks_blocked: Default::default(), - tasks_runnable: Default::default(), - tasks_running: Default::default(), - state: CompletionState::new(ctx), - }; - - for task in taskset { - let mut has_depends = false; - for dep in resolve::get_dependent_tasks(ctx, &task) - .map_err(|_| Error::new(format!("invalid dependency for {}", task)))? - { - let rdep = driver.rdeps.entry(dep.clone()).or_default(); - rdep.push(task.clone()); - has_depends = true; - } - - if has_depends { - driver.tasks_blocked.insert(task); - } else { - driver.tasks_runnable.push(task); - } - } - - Ok(driver) - } - - fn task_setup(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> { - let mut ret = vec![indoc! {" - export PATH={{build.prefix}}/sbin:{{build.prefix}}/bin:$PATH - cd {{workdir}} - - export SOURCE_DATE_EPOCH=1 - - export AR_FOR_BUILD=ar - export AS_FOR_BUILD=as - export DLLTOOL_FOR_BUILD=dlltool - export CC_FOR_BUILD=gcc - export CXX_FOR_BUILD=g++ - export GCC_FOR_BUILD=gcc - export GFORTRAN_FOR_BUILD=gfortran - export GOC_FOR_BUILD=goc - export LD_FOR_BUILD=ld - export LIPO_FOR_BUILD=lipo - export NM_FOR_BUILD=nm - export OBJCOPY_FOR_BUILD=objcopy - export OBJDUMP_FOR_BUILD=objdump - export RANLIB_FOR_BUILD=ranlib - export STRIP_FOR_BUILD=strip - export WINDRES_FOR_BUILD=windres - export WINDMC_FOR_BUILD=windmc - "}]; - - if task_ref.args.contains_key("build_to_host") { - ret.push(indoc! {" - export AR={{build_to_host.cross_compile}}ar - export AS={{build_to_host.cross_compile}}as - export DLLTOOL={{build_to_host.cross_compile}}dlltool - export CC={{build_to_host.cross_compile}}gcc - export CXX={{build_to_host.cross_compile}}g++ - export GCC={{build_to_host.cross_compile}}gcc - export GFORTRAN={{build_to_host.cross_compile}}gfortran - export GOC={{build_to_host.cross_compile}}goc - export LD={{build_to_host.cross_compile}}ld - export LIPO={{build_to_host.cross_compile}}lipo - export NM={{build_to_host.cross_compile}}nm - export OBJCOPY={{build_to_host.cross_compile}}objcopy - export OBJDUMP={{build_to_host.cross_compile}}objdump - export RANLIB={{build_to_host.cross_compile}}ranlib - export STRIP={{build_to_host.cross_compile}}strip - export WINDRES={{build_to_host.cross_compile}}windres - export WINDMC={{build_to_host.cross_compile}}windmc - "}); - } - - if task_ref.args.contains_key("build_to_target") { - ret.push(indoc! {" - export AR_FOR_TARGET={{build_to_target.cross_compile}}ar - export AS_FOR_TARGET={{build_to_target.cross_compile}}as - export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool - export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc - export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++ - export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc - export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran - export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc - export LD_FOR_TARGET={{build_to_target.cross_compile}}ld - export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo - export NM_FOR_TARGET={{build_to_target.cross_compile}}nm - export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy - export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump - export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib - export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip - export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres - export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc - "}); - } - ret - } - - fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) { - let rdeps = self.rdeps.get(&task_ref); - - self.state.tasks_done.insert(task_ref, task_output); - - for rdep in rdeps.unwrap_or(&Vec::new()) { - if !self.tasks_blocked.contains(rdep) { - continue; - } - if self.state.deps_satisfied(rdep) { - self.tasks_blocked.remove(rdep); - self.tasks_runnable.push(rdep.clone()); - } - } - } - - fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<SpawnResult> { - let task_def = &self.state.ctx[task_ref]; - if task_def.action.is_empty() { - println!("Skipping empty task {:#}", task_ref); - return Ok(SpawnResult::Skipped(TaskOutput::default())); - } - - let task_deps = self.state.task_deps(task_ref)?; - let task_output = task_def - .output - .iter() - .map(|(name, Output { path, .. })| { - let output_path = if let Some(path) = path { - format!("{}/{}", paths::TASK_DESTDIR, path) - } else { - paths::TASK_DESTDIR.to_string() - }; - (name.clone(), output_path) - }) - .collect(); - - let inherit_chain = self.state.task_inherit_chain(task_ref); - - let mut run = Self::task_setup(task_ref); - run.push(&task_def.action.run); - - let command = template::ENGINE - .eval(&run.concat(), &task_ref.args) - .with_context(|| { - format!("Failed to evaluate command template for task {}", task_ref) - })?; - - let rootfs = self.state.ctx.get_rootfs(); - let task = Task { - label: format!("{:#}", task_ref), - command, - workdir: paths::TASK_WORKDIR.to_string(), - rootfs: rootfs.0, - inherit: inherit_chain, - depends: task_deps, - outputs: task_output, - pins: HashMap::from([rootfs.clone()]), - force_run: self.force_run.contains(task_ref), - }; - - Ok(SpawnResult::Spawned(runner.spawn(&task))) - } - - fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> { - match self.spawn_task(&task_ref, runner)? { - SpawnResult::Spawned(socket) => { - assert!(self - .tasks_running - .insert(socket.as_raw_fd(), (socket, task_ref)) - .is_none()); - } - SpawnResult::Skipped(result) => { - self.update_runnable(task_ref, result); - } - } - Ok(()) - } - - fn run_tasks(&mut self, runner: &Runner) -> Result<()> { - while let Some(task_ref) = self.tasks_runnable.pop() { - self.run_task(task_ref, runner)?; - } - Ok(()) - } - - fn wait_for_task(&mut self) -> Result<()> { - let mut pollfds: Box<[_]> = self - .tasks_running - .keys() - .copied() - .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) - .collect(); - - while poll::poll(&mut pollfds, -1).context("poll()")? == 0 {} - - for pollfd in &*pollfds { - let events = pollfd.revents().expect("Unknown events in poll() return"); - if !events.contains(poll::PollFlags::POLLIN) { - if events.intersects(!poll::PollFlags::POLLIN) { - return Err(Error::new( - "Unexpected error status for socket file descriptor", - )); - } - continue; - } - - let fd = pollfd.as_raw_fd(); - let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); - - let task_output = Runner::result(&socket)?; - self.update_runnable(task_ref, task_output); - } - - Ok(()) - } - - fn is_empty(&self) -> bool { - self.tasks_runnable.is_empty() && self.tasks_running.is_empty() - } - - fn is_done(&self) -> bool { - self.is_empty() && self.tasks_blocked.is_empty() - } - - pub fn run(&mut self, runner: &Runner) -> Result<()> { - while !self.is_empty() { - self.run_tasks(runner)?; - self.wait_for_task()?; - } - - assert!(self.is_done(), "No runnable tasks left"); - - self.state.print_summary(); - - Ok(()) - } -} |