diff options
Diffstat (limited to 'crates/rebel/src/driver.rs')
-rw-r--r-- | crates/rebel/src/driver.rs | 481 |
1 files changed, 481 insertions, 0 deletions
diff --git a/crates/rebel/src/driver.rs b/crates/rebel/src/driver.rs new file mode 100644 index 0000000..e4de2a7 --- /dev/null +++ b/crates/rebel/src/driver.rs @@ -0,0 +1,481 @@ +use std::{ + collections::{HashMap, HashSet}, + iter, + os::unix::{net::UnixStream, prelude::*}, +}; + +use indoc::indoc; +use nix::{ + poll, + sys::{ + signal, + signalfd::{SfdFlags, SignalFd}, + }, +}; + +use rebel_common::{error::*, string_hash::*, types::*}; +use rebel_resolve::{ + self as resolve, + context::{Context, OutputRef, TaskRef}, + paths, + task::*, +}; +use rebel_runner::Runner; + +use crate::template; + +#[derive(Debug)] +pub struct CompletionState<'ctx> { + ctx: &'ctx Context, + tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>, +} + +impl<'ctx> CompletionState<'ctx> { + pub fn new(ctx: &'ctx Context) -> Self { + CompletionState { + ctx, + tasks_done: Default::default(), + } + } + + // Treats both "depends" and "parent" as dependencies + fn deps_satisfied(&self, task_ref: &TaskRef) -> bool { + resolve::get_dependent_tasks(self.ctx, task_ref) + .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref))) + .unwrap() + .into_iter() + .all(|dep| self.tasks_done.contains_key(&dep)) + } + + fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result<Vec<Dependency>> { + let task_def = &self.ctx[task]; + task_def + .fetch + .iter() + .map(|Fetch { name, sha256 }| { + Ok(Dependency::Fetch { + name: template::ENGINE.eval(name, &task.args).with_context(|| { + format!("Failed to evaluate fetch filename for task {}", task) + })?, + target_dir: paths::TASK_DLDIR.to_string(), + sha256: *sha256, + }) + }) + .collect() + } + + fn dep_closure<I>(&self, deps: I, path: &'ctx str) -> impl Iterator<Item = Dependency> + '_ + where + I: IntoIterator<Item = OutputRef<'ctx>>, + { + resolve::runtime_depends(self.ctx, deps) + .expect("invalid runtime depends") + .into_iter() + .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) + .map(|&output| Dependency::Task { + output, + path: path.to_string(), + }) + } + + fn build_deps(&self, task: &TaskRef<'ctx>) -> Result<impl Iterator<Item = Dependency> + '_> { + Ok(self.dep_closure( + self.ctx + .get_build_depends(task) + .with_context(|| format!("invalid build depends for {}", task))?, + "", + )) + } + + fn host_deps(&self, task: &TaskRef<'ctx>) -> Result<impl Iterator<Item = Dependency> + '_> { + Ok(self.dep_closure( + self.ctx + .get_host_depends(task) + .with_context(|| format!("invalid depends for {}", task))?, + paths::TASK_SYSROOT, + )) + } + + fn task_deps(&self, task: &TaskRef<'ctx>) -> Result<HashSet<Dependency>> { + let fetch_deps = self.fetch_deps(task)?.into_iter(); + let build_deps = self.build_deps(task)?; + let host_deps = self.host_deps(task)?; + + Ok(fetch_deps.chain(build_deps).chain(host_deps).collect()) + } + + fn task_ancestors(&self, task_ref: &TaskRef<'ctx>) -> Vec<LayerHash> { + let Some(parent) = self + .ctx + .get_parent_depend(task_ref) + .expect("invalid parent depends") + else { + return vec![]; + }; + + let mut chain = self.task_ancestors(&parent); + if let Some(layer) = self.tasks_done[&parent].layer { + chain.push(layer); + } + chain + } + + fn print_summary(&self) { + println!(); + println!("Summary:"); + + let mut tasks: Box<[_]> = self.tasks_done.iter().collect(); + tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task)); + for (task_ref, task) in tasks.iter() { + println!(); + println!("{:#}", task_ref); + if let Some(hash) = task.input_hash { + println!(" input: {}", hash); + } + if let Some(hash) = task.layer { + println!(" layer: {}", hash); + } + if !task.outputs.is_empty() { + println!(" outputs:"); + + let mut outputs: Box<[_]> = task.outputs.iter().collect(); + outputs.sort_by_key(|(output, _)| *output); + for (output, hash) in outputs.iter() { + println!(" {}: {}", output, hash); + } + } + } + } +} + +#[derive(Debug)] +enum SpawnResult { + Spawned(UnixStream), + Skipped(TaskOutput), +} + +#[derive(Debug, PartialEq, Eq, Hash)] +enum TaskWaitResult { + Failed, + Interrupted, +} + +#[derive(Debug)] +pub struct Driver<'ctx> { + rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>, + force_run: HashSet<TaskRef<'ctx>>, + tasks_blocked: HashSet<TaskRef<'ctx>>, + tasks_runnable: Vec<TaskRef<'ctx>>, + tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>, + state: CompletionState<'ctx>, +} + +impl<'ctx> Driver<'ctx> { + pub fn new( + ctx: &'ctx Context, + taskset: HashSet<TaskRef<'ctx>>, + force_run: HashSet<TaskRef<'ctx>>, + ) -> Result<Self> { + let mut driver = Driver { + rdeps: Default::default(), + force_run, + tasks_blocked: Default::default(), + tasks_runnable: Default::default(), + tasks_running: Default::default(), + state: CompletionState::new(ctx), + }; + + for task in taskset { + let mut has_depends = false; + for dep in resolve::get_dependent_tasks(ctx, &task) + .map_err(|_| Error::new(format!("invalid dependency for {}", task)))? + { + let rdep = driver.rdeps.entry(dep.clone()).or_default(); + rdep.push(task.clone()); + has_depends = true; + } + + if has_depends { + driver.tasks_blocked.insert(task); + } else { + driver.tasks_runnable.push(task); + } + } + + Ok(driver) + } + + const PREAMBLE: &'static str = indoc! {" + export PATH={{build.prefix}}/sbin:{{build.prefix}}/bin:$PATH + cd {{workdir}} + + export SOURCE_DATE_EPOCH=1 + + export AR_FOR_BUILD=ar + export AS_FOR_BUILD=as + export DLLTOOL_FOR_BUILD=dlltool + export CC_FOR_BUILD=gcc + export CXX_FOR_BUILD=g++ + export GCC_FOR_BUILD=gcc + export GFORTRAN_FOR_BUILD=gfortran + export GOC_FOR_BUILD=goc + export LD_FOR_BUILD=ld + export LIPO_FOR_BUILD=lipo + export NM_FOR_BUILD=nm + export OBJCOPY_FOR_BUILD=objcopy + export OBJDUMP_FOR_BUILD=objdump + export RANLIB_FOR_BUILD=ranlib + export STRIP_FOR_BUILD=strip + export WINDRES_FOR_BUILD=windres + export WINDMC_FOR_BUILD=windmc + "}; + const PREAMBLE_HOST: &'static str = indoc! {" + export AR={{build_to_host.cross_compile}}ar + export AS={{build_to_host.cross_compile}}as + export DLLTOOL={{build_to_host.cross_compile}}dlltool + export CC={{build_to_host.cross_compile}}gcc + export CXX={{build_to_host.cross_compile}}g++ + export GCC={{build_to_host.cross_compile}}gcc + export GFORTRAN={{build_to_host.cross_compile}}gfortran + export GOC={{build_to_host.cross_compile}}goc + export LD={{build_to_host.cross_compile}}ld + export LIPO={{build_to_host.cross_compile}}lipo + export NM={{build_to_host.cross_compile}}nm + export OBJCOPY={{build_to_host.cross_compile}}objcopy + export OBJDUMP={{build_to_host.cross_compile}}objdump + export RANLIB={{build_to_host.cross_compile}}ranlib + export STRIP={{build_to_host.cross_compile}}strip + export WINDRES={{build_to_host.cross_compile}}windres + export WINDMC={{build_to_host.cross_compile}}windmc + "}; + const PREAMBLE_TARGET: &'static str = indoc! {" + export AR_FOR_TARGET={{build_to_target.cross_compile}}ar + export AS_FOR_TARGET={{build_to_target.cross_compile}}as + export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool + export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc + export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++ + export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc + export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran + export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc + export LD_FOR_TARGET={{build_to_target.cross_compile}}ld + export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo + export NM_FOR_TARGET={{build_to_target.cross_compile}}nm + export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy + export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump + export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib + export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip + export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres + export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc + "}; + + fn task_preamble(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> { + let mut ret = vec![Self::PREAMBLE]; + + if task_ref.args.contains_key("build_to_host") { + ret.push(Self::PREAMBLE_HOST); + } + if task_ref.args.contains_key("build_to_target") { + ret.push(Self::PREAMBLE_TARGET); + } + ret + } + + fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) { + let rdeps = self.rdeps.get(&task_ref); + + self.state.tasks_done.insert(task_ref, task_output); + + for rdep in rdeps.unwrap_or(&Vec::new()) { + if !self.tasks_blocked.contains(rdep) { + continue; + } + if self.state.deps_satisfied(rdep) { + self.tasks_blocked.remove(rdep); + self.tasks_runnable.push(rdep.clone()); + } + } + } + + fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<SpawnResult> { + let task_def = &self.state.ctx[task_ref]; + if task_def.action.is_empty() { + println!("Skipping empty task {:#}", task_ref); + return Ok(SpawnResult::Skipped(TaskOutput::default())); + } + + let task_deps = self.state.task_deps(task_ref)?; + let task_output = task_def + .output + .iter() + .map(|(name, Output { path, .. })| { + let output_path = if let Some(path) = path { + format!("{}/{}", paths::TASK_DESTDIR, path) + } else { + paths::TASK_DESTDIR.to_string() + }; + (name.clone(), output_path) + }) + .collect(); + + let ancestors = self.state.task_ancestors(task_ref); + + let mut run = Self::task_preamble(task_ref); + run.push(&task_def.action.run); + + let command = template::ENGINE + .eval_sh(&run.concat(), &task_ref.args) + .with_context(|| { + format!("Failed to evaluate command template for task {}", task_ref) + })?; + + let rootfs = self.state.ctx.get_rootfs(); + let task = Task { + label: format!("{:#}", task_ref), + command, + workdir: paths::TASK_WORKDIR.to_string(), + rootfs: rootfs.0, + ancestors, + depends: task_deps, + outputs: task_output, + pins: HashMap::from([rootfs.clone()]), + force_run: self.force_run.contains(task_ref), + }; + + Ok(SpawnResult::Spawned(runner.spawn(&task))) + } + + fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> { + match self.spawn_task(&task_ref, runner)? { + SpawnResult::Spawned(socket) => { + assert!(self + .tasks_running + .insert(socket.as_raw_fd(), (socket, task_ref)) + .is_none()); + } + SpawnResult::Skipped(result) => { + self.update_runnable(task_ref, result); + } + } + Ok(()) + } + + fn run_tasks(&mut self, runner: &Runner) -> Result<()> { + while let Some(task_ref) = self.tasks_runnable.pop() { + self.run_task(task_ref, runner)?; + } + Ok(()) + } + + fn wait_for_task(&mut self, signal_fd: &mut SignalFd) -> Result<Option<TaskWaitResult>> { + let mut pollfds: Vec<_> = self + .tasks_running + .values() + .map(|(socket, _)| socket.as_fd()) + .chain(iter::once(signal_fd.as_fd())) + .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) + .collect(); + + while poll::poll(&mut pollfds, poll::PollTimeout::NONE).context("poll()")? == 0 {} + + let pollevents: Vec<_> = pollfds + .into_iter() + .map(|pollfd| { + ( + pollfd.as_fd().as_raw_fd(), + pollfd.revents().expect("Unknown events in poll() return"), + ) + }) + .collect(); + + for (fd, events) in pollevents { + if !events.contains(poll::PollFlags::POLLIN) { + if events.intersects(!poll::PollFlags::POLLIN) { + return Err(Error::new( + "Unexpected error status for socket file descriptor", + )); + } + continue; + } + + if fd == signal_fd.as_raw_fd() { + let _signal = signal_fd.read_signal().expect("read_signal()").unwrap(); + return Ok(Some(TaskWaitResult::Interrupted)); + } + + let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); + + match Runner::result(&socket) { + Ok(task_output) => { + self.update_runnable(task_ref, task_output); + } + Err(error) => { + eprintln!("{}", error); + return Ok(Some(TaskWaitResult::Failed)); + } + } + } + + Ok(None) + } + + fn is_done(&self) -> bool { + self.tasks_blocked.is_empty() + && self.tasks_runnable.is_empty() + && self.tasks_running.is_empty() + } + + fn setup_signalfd() -> Result<SignalFd> { + let mut signals = signal::SigSet::empty(); + signals.add(signal::Signal::SIGINT); + signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None) + .expect("pthread_sigmask()"); + SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC) + .context("Failed to create signal file descriptor") + } + + fn raise_sigint() { + let mut signals = signal::SigSet::empty(); + signals.add(signal::Signal::SIGINT); + signal::pthread_sigmask(signal::SigmaskHow::SIG_UNBLOCK, Some(&signals), None) + .expect("pthread_sigmask()"); + signal::raise(signal::Signal::SIGINT).expect("raise()"); + unreachable!(); + } + + pub fn run(&mut self, runner: &Runner, keep_going: bool) -> Result<bool> { + let mut success = true; + let mut interrupted = false; + + let mut signal_fd = Self::setup_signalfd()?; + + self.run_tasks(runner)?; + + while !self.tasks_running.is_empty() { + match self.wait_for_task(&mut signal_fd)? { + Some(TaskWaitResult::Failed) => { + success = false; + } + Some(TaskWaitResult::Interrupted) => { + if interrupted { + Self::raise_sigint(); + } + eprintln!("Interrupt received, not spawning new tasks. Interrupt again to stop immediately."); + interrupted = true; + } + None => {} + } + if !interrupted && (success || keep_going) { + self.run_tasks(runner)?; + } + } + + if interrupted || !success { + return Ok(false); + } + + assert!(self.is_done(), "No runnable tasks left"); + self.state.print_summary(); + + Ok(true) + } +} |