use std::{ collections::{HashMap, HashSet}, os::unix::{net::UnixStream, prelude::*}, }; use indoc::indoc; use nix::poll; use common::{error::*, string_hash::*, types::*}; use runner::Runner; use crate::{ context::{Context, TaskRef}, paths, resolve, task::*, template, }; #[derive(Debug)] pub struct CompletionState<'ctx> { ctx: &'ctx Context, tasks_done: HashMap, TaskOutput>, } impl<'ctx> CompletionState<'ctx> { pub fn new(ctx: &'ctx Context) -> Self { CompletionState { ctx, tasks_done: Default::default(), } } // Treats both "depends" and "inherit" as dependencies fn deps_satisfied(&self, task_ref: &TaskRef) -> bool { resolve::get_dependent_tasks(self.ctx, task_ref) .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref))) .unwrap() .into_iter() .all(|dep| self.tasks_done.contains_key(&dep)) } fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result> { let task_def = &self.ctx[task]; task_def .fetch .iter() .map(|Fetch { name, sha256 }| { Ok(Dependency::Fetch { name: template::ENGINE .eval_raw(name, &task.args) .with_context(|| { format!("Failed to evaluate fetch filename for task {}", task) })?, target_dir: paths::TASK_DLDIR.to_string(), sha256: *sha256, }) }) .collect() } fn task_deps(&self, task: &TaskRef<'ctx>) -> Result> { Ok(self .fetch_deps(task)? .into_iter() .chain( resolve::runtime_depends( self.ctx, self.ctx .get_build_depends(task) .with_context(|| format!("invalid build depends for {}", task))?, ) .expect("invalid runtime depends of build_depends") .into_iter() .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) .map(|&output| Dependency::Task { output, path: "".to_string(), }), ) .chain( resolve::runtime_depends( self.ctx, self.ctx .get_host_depends(task) .with_context(|| format!("invalid depends for {}", task))?, ) .expect("invalid runtime depends of host_depends") .into_iter() .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) .map(|&output| Dependency::Task { output, path: paths::TASK_SYSROOT.to_string(), }), ) .collect()) } fn task_inherit_chain(&self, task_ref: &TaskRef<'ctx>) -> Vec { let inherit = match self .ctx .get_inherit_depend(task_ref) .expect("invalid inherit depends") { Some(inherit) => inherit, None => return vec![], }; let mut chain = self.task_inherit_chain(&inherit); if let Some(layer) = self.tasks_done[&inherit].layer { chain.push(layer); } chain } fn print_summary(&self) { println!(); println!("Summary:"); let mut tasks: Box<[_]> = self.tasks_done.iter().collect(); tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task)); for (task_ref, task) in tasks.iter() { println!(); println!("{:#}", task_ref); if let Some(hash) = task.input_hash { println!(" input: {}", hash); } if let Some(hash) = task.layer { println!(" layer: {}", hash); } if !task.outputs.is_empty() { println!(" outputs:"); let mut outputs: Box<[_]> = task.outputs.iter().collect(); outputs.sort_by_key(|(output, _)| *output); for (output, hash) in outputs.iter() { println!(" {}: {}", output, hash); } } } } } #[derive(Debug)] pub struct Driver<'ctx> { rdeps: HashMap, Vec>>, tasks_blocked: HashSet>, tasks_runnable: Vec>, tasks_running: HashMap)>, state: CompletionState<'ctx>, } impl<'ctx> Driver<'ctx> { pub fn new(ctx: &'ctx Context, taskset: HashSet>) -> Result { let mut driver = Driver { rdeps: Default::default(), tasks_blocked: Default::default(), tasks_runnable: Default::default(), tasks_running: Default::default(), state: CompletionState::new(ctx), }; for task in taskset { let mut has_depends = false; for dep in resolve::get_dependent_tasks(ctx, &task) .map_err(|_| Error::new(format!("invalid dependency for {}", task)))? { let rdep = driver.rdeps.entry(dep.clone()).or_default(); rdep.push(task.clone()); has_depends = true; } if has_depends { driver.tasks_blocked.insert(task); } else { driver.tasks_runnable.push(task); } } Ok(driver) } fn task_setup(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> { let mut ret = vec![indoc! {" export PATH={{build.prefix}}/sbin:{{build.prefix}}/bin:$PATH cd {{workdir}} export SOURCE_DATE_EPOCH=1 export AR_FOR_BUILD=ar export AS_FOR_BUILD=as export DLLTOOL_FOR_BUILD=dlltool export CC_FOR_BUILD=gcc export CXX_FOR_BUILD=g++ export GCC_FOR_BUILD=gcc export GFORTRAN_FOR_BUILD=gfortran export GOC_FOR_BUILD=goc export LD_FOR_BUILD=ld export LIPO_FOR_BUILD=lipo export NM_FOR_BUILD=nm export OBJCOPY_FOR_BUILD=objcopy export OBJDUMP_FOR_BUILD=objdump export RANLIB_FOR_BUILD=ranlib export STRIP_FOR_BUILD=strip export WINDRES_FOR_BUILD=windres export WINDMC_FOR_BUILD=windmc "}]; if task_ref.args.contains_key("build_to_host") { ret.push(indoc! {" export AR={{build_to_host.cross_compile}}ar export AS={{build_to_host.cross_compile}}as export DLLTOOL={{build_to_host.cross_compile}}dlltool export CC={{build_to_host.cross_compile}}gcc export CXX={{build_to_host.cross_compile}}g++ export GCC={{build_to_host.cross_compile}}gcc export GFORTRAN={{build_to_host.cross_compile}}gfortran export GOC={{build_to_host.cross_compile}}goc export LD={{build_to_host.cross_compile}}ld export LIPO={{build_to_host.cross_compile}}lipo export NM={{build_to_host.cross_compile}}nm export OBJCOPY={{build_to_host.cross_compile}}objcopy export OBJDUMP={{build_to_host.cross_compile}}objdump export RANLIB={{build_to_host.cross_compile}}ranlib export STRIP={{build_to_host.cross_compile}}strip export WINDRES={{build_to_host.cross_compile}}windres export WINDMC={{build_to_host.cross_compile}}windmc "}); } if task_ref.args.contains_key("build_to_target") { ret.push(indoc! {" export AR_FOR_TARGET={{build_to_target.cross_compile}}ar export AS_FOR_TARGET={{build_to_target.cross_compile}}as export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++ export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc export LD_FOR_TARGET={{build_to_target.cross_compile}}ld export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo export NM_FOR_TARGET={{build_to_target.cross_compile}}nm export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc "}); } ret } fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) { let rdeps = self.rdeps.get(&task_ref); self.state.tasks_done.insert(task_ref, task_output); for rdep in rdeps.unwrap_or(&Vec::new()) { if !self.tasks_blocked.contains(rdep) { continue; } if self.state.deps_satisfied(rdep) { self.tasks_blocked.remove(rdep); self.tasks_runnable.push(rdep.clone()); } } } fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result> { let task_def = &self.state.ctx[task_ref]; if task_def.action.is_empty() { return Ok(None); } let task_deps = self.state.task_deps(task_ref)?; let task_output = task_def .output .iter() .map(|(name, Output { path, .. })| { let output_path = if let Some(path) = path { format!("{}/{}", paths::TASK_DESTDIR, path) } else { paths::TASK_DESTDIR.to_string() }; (name.clone(), output_path) }) .collect(); let inherit_chain = self.state.task_inherit_chain(task_ref); let mut run = Self::task_setup(task_ref); run.push(&task_def.action.run); let command = template::ENGINE .eval(&run.concat(), &task_ref.args) .with_context(|| { format!("Failed to evaluate command template for task {}", task_ref) })?; let rootfs = self.state.ctx.get_rootfs(); let task = Task { label: format!("{:#}", task_ref), command, workdir: paths::TASK_WORKDIR.to_string(), rootfs: rootfs.0, inherit: inherit_chain, depends: task_deps, outputs: task_output, pins: HashMap::from([rootfs.clone()]), }; Ok(Some(runner.spawn(&task))) } fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> { if let Some(socket) = self.spawn_task(&task_ref, runner)? { assert!(self .tasks_running .insert(socket.as_raw_fd(), (socket, task_ref)) .is_none()); } else { println!("Skipping empty task {:#}", task_ref); self.update_runnable(task_ref, TaskOutput::default()); } Ok(()) } fn run_tasks(&mut self, runner: &Runner) -> Result<()> { while let Some(task_ref) = self.tasks_runnable.pop() { self.run_task(task_ref, runner)?; } Ok(()) } fn wait_for_task(&mut self) -> Result<()> { let mut pollfds: Box<[_]> = self .tasks_running .keys() .copied() .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) .collect(); while poll::poll(&mut pollfds, -1).context("poll()")? == 0 {} for pollfd in &*pollfds { let events = pollfd.revents().expect("Unknown events in poll() return"); if !events.contains(poll::PollFlags::POLLIN) { if events.intersects(!poll::PollFlags::POLLIN) { return Err(Error::new( "Unexpected error status for socket file descriptor", )); } continue; } let fd = pollfd.as_raw_fd(); let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); let task_output = Runner::result(&socket)?; self.update_runnable(task_ref, task_output); } Ok(()) } fn is_empty(&self) -> bool { self.tasks_runnable.is_empty() && self.tasks_running.is_empty() } fn is_done(&self) -> bool { self.is_empty() && self.tasks_blocked.is_empty() } pub fn run(&mut self, runner: &Runner) -> Result<()> { while !self.is_empty() { self.run_tasks(runner)?; self.wait_for_task()?; } assert!(self.is_done(), "No runnable tasks left"); self.state.print_summary(); Ok(()) } }