use std::{ collections::{HashMap, HashSet}, iter, os::unix::{net::UnixStream, prelude::*}, }; use indoc::indoc; use nix::{ poll, sys::{ signal, signalfd::{SfdFlags, SignalFd}, }, }; use common::{error::*, string_hash::*, types::*}; use runner::Runner; use crate::{ context::{Context, OutputRef, TaskRef}, paths, resolve, task::*, template, }; #[derive(Debug)] pub struct CompletionState<'ctx> { ctx: &'ctx Context, tasks_done: HashMap, TaskOutput>, } impl<'ctx> CompletionState<'ctx> { pub fn new(ctx: &'ctx Context) -> Self { CompletionState { ctx, tasks_done: Default::default(), } } // Treats both "depends" and "parent" as dependencies fn deps_satisfied(&self, task_ref: &TaskRef) -> bool { resolve::get_dependent_tasks(self.ctx, task_ref) .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref))) .unwrap() .into_iter() .all(|dep| self.tasks_done.contains_key(&dep)) } fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result> { let task_def = &self.ctx[task]; task_def .fetch .iter() .map(|Fetch { name, sha256 }| { Ok(Dependency::Fetch { name: template::ENGINE.eval(name, &task.args).with_context(|| { format!("Failed to evaluate fetch filename for task {}", task) })?, target_dir: paths::TASK_DLDIR.to_string(), sha256: *sha256, }) }) .collect() } fn dep_closure(&self, deps: I, path: &'ctx str) -> impl Iterator + '_ where I: IntoIterator>, { resolve::runtime_depends(self.ctx, deps) .expect("invalid runtime depends") .into_iter() .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) .map(|&output| Dependency::Task { output, path: path.to_string(), }) } fn build_deps(&self, task: &TaskRef<'ctx>) -> Result + '_> { Ok(self.dep_closure( self.ctx .get_build_depends(task) .with_context(|| format!("invalid build depends for {}", task))?, "", )) } fn host_deps(&self, task: &TaskRef<'ctx>) -> Result + '_> { Ok(self.dep_closure( self.ctx .get_host_depends(task) .with_context(|| format!("invalid depends for {}", task))?, paths::TASK_SYSROOT, )) } fn task_deps(&self, task: &TaskRef<'ctx>) -> Result> { let fetch_deps = self.fetch_deps(task)?.into_iter(); let build_deps = self.build_deps(task)?; let host_deps = self.host_deps(task)?; Ok(fetch_deps.chain(build_deps).chain(host_deps).collect()) } fn task_ancestors(&self, task_ref: &TaskRef<'ctx>) -> Vec { let Some(parent) = self .ctx .get_parent_depend(task_ref) .expect("invalid parent depends") else { return vec![]; }; let mut chain = self.task_ancestors(&parent); if let Some(layer) = self.tasks_done[&parent].layer { chain.push(layer); } chain } fn print_summary(&self) { println!(); println!("Summary:"); let mut tasks: Box<[_]> = self.tasks_done.iter().collect(); tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task)); for (task_ref, task) in tasks.iter() { println!(); println!("{:#}", task_ref); if let Some(hash) = task.input_hash { println!(" input: {}", hash); } if let Some(hash) = task.layer { println!(" layer: {}", hash); } if !task.outputs.is_empty() { println!(" outputs:"); let mut outputs: Box<[_]> = task.outputs.iter().collect(); outputs.sort_by_key(|(output, _)| *output); for (output, hash) in outputs.iter() { println!(" {}: {}", output, hash); } } } } } #[derive(Debug)] enum SpawnResult { Spawned(UnixStream), Skipped(TaskOutput), } #[derive(Debug, PartialEq, Eq, Hash)] enum TaskWaitResult { Failed, Interrupted, } #[derive(Debug)] pub struct Driver<'ctx> { rdeps: HashMap, Vec>>, force_run: HashSet>, tasks_blocked: HashSet>, tasks_runnable: Vec>, tasks_running: HashMap)>, state: CompletionState<'ctx>, } impl<'ctx> Driver<'ctx> { pub fn new( ctx: &'ctx Context, taskset: HashSet>, force_run: HashSet>, ) -> Result { let mut driver = Driver { rdeps: Default::default(), force_run, tasks_blocked: Default::default(), tasks_runnable: Default::default(), tasks_running: Default::default(), state: CompletionState::new(ctx), }; for task in taskset { let mut has_depends = false; for dep in resolve::get_dependent_tasks(ctx, &task) .map_err(|_| Error::new(format!("invalid dependency for {}", task)))? { let rdep = driver.rdeps.entry(dep.clone()).or_default(); rdep.push(task.clone()); has_depends = true; } if has_depends { driver.tasks_blocked.insert(task); } else { driver.tasks_runnable.push(task); } } Ok(driver) } fn task_setup(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> { let mut ret = vec![indoc! {" export PATH={{build.prefix}}/sbin:{{build.prefix}}/bin:$PATH cd {{workdir}} export SOURCE_DATE_EPOCH=1 export AR_FOR_BUILD=ar export AS_FOR_BUILD=as export DLLTOOL_FOR_BUILD=dlltool export CC_FOR_BUILD=gcc export CXX_FOR_BUILD=g++ export GCC_FOR_BUILD=gcc export GFORTRAN_FOR_BUILD=gfortran export GOC_FOR_BUILD=goc export LD_FOR_BUILD=ld export LIPO_FOR_BUILD=lipo export NM_FOR_BUILD=nm export OBJCOPY_FOR_BUILD=objcopy export OBJDUMP_FOR_BUILD=objdump export RANLIB_FOR_BUILD=ranlib export STRIP_FOR_BUILD=strip export WINDRES_FOR_BUILD=windres export WINDMC_FOR_BUILD=windmc "}]; if task_ref.args.contains_key("build_to_host") { ret.push(indoc! {" export AR={{build_to_host.cross_compile}}ar export AS={{build_to_host.cross_compile}}as export DLLTOOL={{build_to_host.cross_compile}}dlltool export CC={{build_to_host.cross_compile}}gcc export CXX={{build_to_host.cross_compile}}g++ export GCC={{build_to_host.cross_compile}}gcc export GFORTRAN={{build_to_host.cross_compile}}gfortran export GOC={{build_to_host.cross_compile}}goc export LD={{build_to_host.cross_compile}}ld export LIPO={{build_to_host.cross_compile}}lipo export NM={{build_to_host.cross_compile}}nm export OBJCOPY={{build_to_host.cross_compile}}objcopy export OBJDUMP={{build_to_host.cross_compile}}objdump export RANLIB={{build_to_host.cross_compile}}ranlib export STRIP={{build_to_host.cross_compile}}strip export WINDRES={{build_to_host.cross_compile}}windres export WINDMC={{build_to_host.cross_compile}}windmc "}); } if task_ref.args.contains_key("build_to_target") { ret.push(indoc! {" export AR_FOR_TARGET={{build_to_target.cross_compile}}ar export AS_FOR_TARGET={{build_to_target.cross_compile}}as export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++ export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc export LD_FOR_TARGET={{build_to_target.cross_compile}}ld export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo export NM_FOR_TARGET={{build_to_target.cross_compile}}nm export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc "}); } ret } fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) { let rdeps = self.rdeps.get(&task_ref); self.state.tasks_done.insert(task_ref, task_output); for rdep in rdeps.unwrap_or(&Vec::new()) { if !self.tasks_blocked.contains(rdep) { continue; } if self.state.deps_satisfied(rdep) { self.tasks_blocked.remove(rdep); self.tasks_runnable.push(rdep.clone()); } } } fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result { let task_def = &self.state.ctx[task_ref]; if task_def.action.is_empty() { println!("Skipping empty task {:#}", task_ref); return Ok(SpawnResult::Skipped(TaskOutput::default())); } let task_deps = self.state.task_deps(task_ref)?; let task_output = task_def .output .iter() .map(|(name, Output { path, .. })| { let output_path = if let Some(path) = path { format!("{}/{}", paths::TASK_DESTDIR, path) } else { paths::TASK_DESTDIR.to_string() }; (name.clone(), output_path) }) .collect(); let ancestors = self.state.task_ancestors(task_ref); let mut run = Self::task_setup(task_ref); run.push(&task_def.action.run); let command = template::ENGINE .eval_sh(&run.concat(), &task_ref.args) .with_context(|| { format!("Failed to evaluate command template for task {}", task_ref) })?; let rootfs = self.state.ctx.get_rootfs(); let task = Task { label: format!("{:#}", task_ref), command, workdir: paths::TASK_WORKDIR.to_string(), rootfs: rootfs.0, ancestors, depends: task_deps, outputs: task_output, pins: HashMap::from([rootfs.clone()]), force_run: self.force_run.contains(task_ref), }; Ok(SpawnResult::Spawned(runner.spawn(&task))) } fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> { match self.spawn_task(&task_ref, runner)? { SpawnResult::Spawned(socket) => { assert!(self .tasks_running .insert(socket.as_raw_fd(), (socket, task_ref)) .is_none()); } SpawnResult::Skipped(result) => { self.update_runnable(task_ref, result); } } Ok(()) } fn run_tasks(&mut self, runner: &Runner) -> Result<()> { while let Some(task_ref) = self.tasks_runnable.pop() { self.run_task(task_ref, runner)?; } Ok(()) } fn wait_for_task(&mut self, signal_fd: &mut SignalFd) -> Result> { let mut pollfds: Vec<_> = self .tasks_running .values() .map(|(socket, _)| socket.as_fd()) .chain(iter::once(signal_fd.as_fd())) .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) .collect(); while poll::poll(&mut pollfds, poll::PollTimeout::NONE).context("poll()")? == 0 {} let pollevents: Vec<_> = pollfds .into_iter() .map(|pollfd| { ( pollfd.as_fd().as_raw_fd(), pollfd.revents().expect("Unknown events in poll() return"), ) }) .collect(); for (fd, events) in pollevents { if !events.contains(poll::PollFlags::POLLIN) { if events.intersects(!poll::PollFlags::POLLIN) { return Err(Error::new( "Unexpected error status for socket file descriptor", )); } continue; } if fd == signal_fd.as_raw_fd() { let _signal = signal_fd.read_signal().expect("read_signal()").unwrap(); return Ok(Some(TaskWaitResult::Interrupted)); } let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); match Runner::result(&socket) { Ok(task_output) => { self.update_runnable(task_ref, task_output); } Err(error) => { eprintln!("{}", error); return Ok(Some(TaskWaitResult::Failed)); } } } Ok(None) } fn is_done(&self) -> bool { self.tasks_blocked.is_empty() && self.tasks_runnable.is_empty() && self.tasks_running.is_empty() } fn setup_signalfd() -> Result { let mut signals = signal::SigSet::empty(); signals.add(signal::Signal::SIGINT); signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None) .expect("pthread_sigmask()"); SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC) .context("Failed to create signal file descriptor") } fn raise_sigint() { let mut signals = signal::SigSet::empty(); signals.add(signal::Signal::SIGINT); signal::pthread_sigmask(signal::SigmaskHow::SIG_UNBLOCK, Some(&signals), None) .expect("pthread_sigmask()"); signal::raise(signal::Signal::SIGINT).expect("raise()"); unreachable!(); } pub fn run(&mut self, runner: &Runner, keep_going: bool) -> Result { let mut success = true; let mut interrupted = false; let mut signal_fd = Self::setup_signalfd()?; self.run_tasks(runner)?; while !self.tasks_running.is_empty() { match self.wait_for_task(&mut signal_fd)? { Some(TaskWaitResult::Failed) => { success = false; } Some(TaskWaitResult::Interrupted) => { if interrupted { Self::raise_sigint(); } eprintln!("Interrupt received, not spawning new tasks. Interrupt again to stop immediately."); interrupted = true; } None => {} } if !interrupted && (success || keep_going) { self.run_tasks(runner)?; } } if interrupted || !success { return Ok(false); } assert!(self.is_done(), "No runnable tasks left"); self.state.print_summary(); Ok(true) } }