summaryrefslogtreecommitdiffstats
path: root/crates/rebel/src/driver.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/rebel/src/driver.rs')
-rw-r--r--crates/rebel/src/driver.rs481
1 files changed, 481 insertions, 0 deletions
diff --git a/crates/rebel/src/driver.rs b/crates/rebel/src/driver.rs
new file mode 100644
index 0000000..e4de2a7
--- /dev/null
+++ b/crates/rebel/src/driver.rs
@@ -0,0 +1,481 @@
+use std::{
+ collections::{HashMap, HashSet},
+ iter,
+ os::unix::{net::UnixStream, prelude::*},
+};
+
+use indoc::indoc;
+use nix::{
+ poll,
+ sys::{
+ signal,
+ signalfd::{SfdFlags, SignalFd},
+ },
+};
+
+use rebel_common::{error::*, string_hash::*, types::*};
+use rebel_resolve::{
+ self as resolve,
+ context::{Context, OutputRef, TaskRef},
+ paths,
+ task::*,
+};
+use rebel_runner::Runner;
+
+use crate::template;
+
+#[derive(Debug)]
+pub struct CompletionState<'ctx> {
+ ctx: &'ctx Context,
+ tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>,
+}
+
+impl<'ctx> CompletionState<'ctx> {
+ pub fn new(ctx: &'ctx Context) -> Self {
+ CompletionState {
+ ctx,
+ tasks_done: Default::default(),
+ }
+ }
+
+ // Treats both "depends" and "parent" as dependencies
+ fn deps_satisfied(&self, task_ref: &TaskRef) -> bool {
+ resolve::get_dependent_tasks(self.ctx, task_ref)
+ .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref)))
+ .unwrap()
+ .into_iter()
+ .all(|dep| self.tasks_done.contains_key(&dep))
+ }
+
+ fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result<Vec<Dependency>> {
+ let task_def = &self.ctx[task];
+ task_def
+ .fetch
+ .iter()
+ .map(|Fetch { name, sha256 }| {
+ Ok(Dependency::Fetch {
+ name: template::ENGINE.eval(name, &task.args).with_context(|| {
+ format!("Failed to evaluate fetch filename for task {}", task)
+ })?,
+ target_dir: paths::TASK_DLDIR.to_string(),
+ sha256: *sha256,
+ })
+ })
+ .collect()
+ }
+
+ fn dep_closure<I>(&self, deps: I, path: &'ctx str) -> impl Iterator<Item = Dependency> + '_
+ where
+ I: IntoIterator<Item = OutputRef<'ctx>>,
+ {
+ resolve::runtime_depends(self.ctx, deps)
+ .expect("invalid runtime depends")
+ .into_iter()
+ .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output))
+ .map(|&output| Dependency::Task {
+ output,
+ path: path.to_string(),
+ })
+ }
+
+ fn build_deps(&self, task: &TaskRef<'ctx>) -> Result<impl Iterator<Item = Dependency> + '_> {
+ Ok(self.dep_closure(
+ self.ctx
+ .get_build_depends(task)
+ .with_context(|| format!("invalid build depends for {}", task))?,
+ "",
+ ))
+ }
+
+ fn host_deps(&self, task: &TaskRef<'ctx>) -> Result<impl Iterator<Item = Dependency> + '_> {
+ Ok(self.dep_closure(
+ self.ctx
+ .get_host_depends(task)
+ .with_context(|| format!("invalid depends for {}", task))?,
+ paths::TASK_SYSROOT,
+ ))
+ }
+
+ fn task_deps(&self, task: &TaskRef<'ctx>) -> Result<HashSet<Dependency>> {
+ let fetch_deps = self.fetch_deps(task)?.into_iter();
+ let build_deps = self.build_deps(task)?;
+ let host_deps = self.host_deps(task)?;
+
+ Ok(fetch_deps.chain(build_deps).chain(host_deps).collect())
+ }
+
+ fn task_ancestors(&self, task_ref: &TaskRef<'ctx>) -> Vec<LayerHash> {
+ let Some(parent) = self
+ .ctx
+ .get_parent_depend(task_ref)
+ .expect("invalid parent depends")
+ else {
+ return vec![];
+ };
+
+ let mut chain = self.task_ancestors(&parent);
+ if let Some(layer) = self.tasks_done[&parent].layer {
+ chain.push(layer);
+ }
+ chain
+ }
+
+ fn print_summary(&self) {
+ println!();
+ println!("Summary:");
+
+ let mut tasks: Box<[_]> = self.tasks_done.iter().collect();
+ tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task));
+ for (task_ref, task) in tasks.iter() {
+ println!();
+ println!("{:#}", task_ref);
+ if let Some(hash) = task.input_hash {
+ println!(" input: {}", hash);
+ }
+ if let Some(hash) = task.layer {
+ println!(" layer: {}", hash);
+ }
+ if !task.outputs.is_empty() {
+ println!(" outputs:");
+
+ let mut outputs: Box<[_]> = task.outputs.iter().collect();
+ outputs.sort_by_key(|(output, _)| *output);
+ for (output, hash) in outputs.iter() {
+ println!(" {}: {}", output, hash);
+ }
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+enum SpawnResult {
+ Spawned(UnixStream),
+ Skipped(TaskOutput),
+}
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+enum TaskWaitResult {
+ Failed,
+ Interrupted,
+}
+
+#[derive(Debug)]
+pub struct Driver<'ctx> {
+ rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>,
+ force_run: HashSet<TaskRef<'ctx>>,
+ tasks_blocked: HashSet<TaskRef<'ctx>>,
+ tasks_runnable: Vec<TaskRef<'ctx>>,
+ tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>,
+ state: CompletionState<'ctx>,
+}
+
+impl<'ctx> Driver<'ctx> {
+ pub fn new(
+ ctx: &'ctx Context,
+ taskset: HashSet<TaskRef<'ctx>>,
+ force_run: HashSet<TaskRef<'ctx>>,
+ ) -> Result<Self> {
+ let mut driver = Driver {
+ rdeps: Default::default(),
+ force_run,
+ tasks_blocked: Default::default(),
+ tasks_runnable: Default::default(),
+ tasks_running: Default::default(),
+ state: CompletionState::new(ctx),
+ };
+
+ for task in taskset {
+ let mut has_depends = false;
+ for dep in resolve::get_dependent_tasks(ctx, &task)
+ .map_err(|_| Error::new(format!("invalid dependency for {}", task)))?
+ {
+ let rdep = driver.rdeps.entry(dep.clone()).or_default();
+ rdep.push(task.clone());
+ has_depends = true;
+ }
+
+ if has_depends {
+ driver.tasks_blocked.insert(task);
+ } else {
+ driver.tasks_runnable.push(task);
+ }
+ }
+
+ Ok(driver)
+ }
+
+ const PREAMBLE: &'static str = indoc! {"
+ export PATH={{build.prefix}}/sbin:{{build.prefix}}/bin:$PATH
+ cd {{workdir}}
+
+ export SOURCE_DATE_EPOCH=1
+
+ export AR_FOR_BUILD=ar
+ export AS_FOR_BUILD=as
+ export DLLTOOL_FOR_BUILD=dlltool
+ export CC_FOR_BUILD=gcc
+ export CXX_FOR_BUILD=g++
+ export GCC_FOR_BUILD=gcc
+ export GFORTRAN_FOR_BUILD=gfortran
+ export GOC_FOR_BUILD=goc
+ export LD_FOR_BUILD=ld
+ export LIPO_FOR_BUILD=lipo
+ export NM_FOR_BUILD=nm
+ export OBJCOPY_FOR_BUILD=objcopy
+ export OBJDUMP_FOR_BUILD=objdump
+ export RANLIB_FOR_BUILD=ranlib
+ export STRIP_FOR_BUILD=strip
+ export WINDRES_FOR_BUILD=windres
+ export WINDMC_FOR_BUILD=windmc
+ "};
+ const PREAMBLE_HOST: &'static str = indoc! {"
+ export AR={{build_to_host.cross_compile}}ar
+ export AS={{build_to_host.cross_compile}}as
+ export DLLTOOL={{build_to_host.cross_compile}}dlltool
+ export CC={{build_to_host.cross_compile}}gcc
+ export CXX={{build_to_host.cross_compile}}g++
+ export GCC={{build_to_host.cross_compile}}gcc
+ export GFORTRAN={{build_to_host.cross_compile}}gfortran
+ export GOC={{build_to_host.cross_compile}}goc
+ export LD={{build_to_host.cross_compile}}ld
+ export LIPO={{build_to_host.cross_compile}}lipo
+ export NM={{build_to_host.cross_compile}}nm
+ export OBJCOPY={{build_to_host.cross_compile}}objcopy
+ export OBJDUMP={{build_to_host.cross_compile}}objdump
+ export RANLIB={{build_to_host.cross_compile}}ranlib
+ export STRIP={{build_to_host.cross_compile}}strip
+ export WINDRES={{build_to_host.cross_compile}}windres
+ export WINDMC={{build_to_host.cross_compile}}windmc
+ "};
+ const PREAMBLE_TARGET: &'static str = indoc! {"
+ export AR_FOR_TARGET={{build_to_target.cross_compile}}ar
+ export AS_FOR_TARGET={{build_to_target.cross_compile}}as
+ export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool
+ export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc
+ export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++
+ export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc
+ export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran
+ export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc
+ export LD_FOR_TARGET={{build_to_target.cross_compile}}ld
+ export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo
+ export NM_FOR_TARGET={{build_to_target.cross_compile}}nm
+ export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy
+ export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump
+ export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib
+ export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip
+ export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres
+ export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc
+ "};
+
+ fn task_preamble(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> {
+ let mut ret = vec![Self::PREAMBLE];
+
+ if task_ref.args.contains_key("build_to_host") {
+ ret.push(Self::PREAMBLE_HOST);
+ }
+ if task_ref.args.contains_key("build_to_target") {
+ ret.push(Self::PREAMBLE_TARGET);
+ }
+ ret
+ }
+
+ fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) {
+ let rdeps = self.rdeps.get(&task_ref);
+
+ self.state.tasks_done.insert(task_ref, task_output);
+
+ for rdep in rdeps.unwrap_or(&Vec::new()) {
+ if !self.tasks_blocked.contains(rdep) {
+ continue;
+ }
+ if self.state.deps_satisfied(rdep) {
+ self.tasks_blocked.remove(rdep);
+ self.tasks_runnable.push(rdep.clone());
+ }
+ }
+ }
+
+ fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<SpawnResult> {
+ let task_def = &self.state.ctx[task_ref];
+ if task_def.action.is_empty() {
+ println!("Skipping empty task {:#}", task_ref);
+ return Ok(SpawnResult::Skipped(TaskOutput::default()));
+ }
+
+ let task_deps = self.state.task_deps(task_ref)?;
+ let task_output = task_def
+ .output
+ .iter()
+ .map(|(name, Output { path, .. })| {
+ let output_path = if let Some(path) = path {
+ format!("{}/{}", paths::TASK_DESTDIR, path)
+ } else {
+ paths::TASK_DESTDIR.to_string()
+ };
+ (name.clone(), output_path)
+ })
+ .collect();
+
+ let ancestors = self.state.task_ancestors(task_ref);
+
+ let mut run = Self::task_preamble(task_ref);
+ run.push(&task_def.action.run);
+
+ let command = template::ENGINE
+ .eval_sh(&run.concat(), &task_ref.args)
+ .with_context(|| {
+ format!("Failed to evaluate command template for task {}", task_ref)
+ })?;
+
+ let rootfs = self.state.ctx.get_rootfs();
+ let task = Task {
+ label: format!("{:#}", task_ref),
+ command,
+ workdir: paths::TASK_WORKDIR.to_string(),
+ rootfs: rootfs.0,
+ ancestors,
+ depends: task_deps,
+ outputs: task_output,
+ pins: HashMap::from([rootfs.clone()]),
+ force_run: self.force_run.contains(task_ref),
+ };
+
+ Ok(SpawnResult::Spawned(runner.spawn(&task)))
+ }
+
+ fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> {
+ match self.spawn_task(&task_ref, runner)? {
+ SpawnResult::Spawned(socket) => {
+ assert!(self
+ .tasks_running
+ .insert(socket.as_raw_fd(), (socket, task_ref))
+ .is_none());
+ }
+ SpawnResult::Skipped(result) => {
+ self.update_runnable(task_ref, result);
+ }
+ }
+ Ok(())
+ }
+
+ fn run_tasks(&mut self, runner: &Runner) -> Result<()> {
+ while let Some(task_ref) = self.tasks_runnable.pop() {
+ self.run_task(task_ref, runner)?;
+ }
+ Ok(())
+ }
+
+ fn wait_for_task(&mut self, signal_fd: &mut SignalFd) -> Result<Option<TaskWaitResult>> {
+ let mut pollfds: Vec<_> = self
+ .tasks_running
+ .values()
+ .map(|(socket, _)| socket.as_fd())
+ .chain(iter::once(signal_fd.as_fd()))
+ .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN))
+ .collect();
+
+ while poll::poll(&mut pollfds, poll::PollTimeout::NONE).context("poll()")? == 0 {}
+
+ let pollevents: Vec<_> = pollfds
+ .into_iter()
+ .map(|pollfd| {
+ (
+ pollfd.as_fd().as_raw_fd(),
+ pollfd.revents().expect("Unknown events in poll() return"),
+ )
+ })
+ .collect();
+
+ for (fd, events) in pollevents {
+ if !events.contains(poll::PollFlags::POLLIN) {
+ if events.intersects(!poll::PollFlags::POLLIN) {
+ return Err(Error::new(
+ "Unexpected error status for socket file descriptor",
+ ));
+ }
+ continue;
+ }
+
+ if fd == signal_fd.as_raw_fd() {
+ let _signal = signal_fd.read_signal().expect("read_signal()").unwrap();
+ return Ok(Some(TaskWaitResult::Interrupted));
+ }
+
+ let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap();
+
+ match Runner::result(&socket) {
+ Ok(task_output) => {
+ self.update_runnable(task_ref, task_output);
+ }
+ Err(error) => {
+ eprintln!("{}", error);
+ return Ok(Some(TaskWaitResult::Failed));
+ }
+ }
+ }
+
+ Ok(None)
+ }
+
+ fn is_done(&self) -> bool {
+ self.tasks_blocked.is_empty()
+ && self.tasks_runnable.is_empty()
+ && self.tasks_running.is_empty()
+ }
+
+ fn setup_signalfd() -> Result<SignalFd> {
+ let mut signals = signal::SigSet::empty();
+ signals.add(signal::Signal::SIGINT);
+ signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None)
+ .expect("pthread_sigmask()");
+ SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC)
+ .context("Failed to create signal file descriptor")
+ }
+
+ fn raise_sigint() {
+ let mut signals = signal::SigSet::empty();
+ signals.add(signal::Signal::SIGINT);
+ signal::pthread_sigmask(signal::SigmaskHow::SIG_UNBLOCK, Some(&signals), None)
+ .expect("pthread_sigmask()");
+ signal::raise(signal::Signal::SIGINT).expect("raise()");
+ unreachable!();
+ }
+
+ pub fn run(&mut self, runner: &Runner, keep_going: bool) -> Result<bool> {
+ let mut success = true;
+ let mut interrupted = false;
+
+ let mut signal_fd = Self::setup_signalfd()?;
+
+ self.run_tasks(runner)?;
+
+ while !self.tasks_running.is_empty() {
+ match self.wait_for_task(&mut signal_fd)? {
+ Some(TaskWaitResult::Failed) => {
+ success = false;
+ }
+ Some(TaskWaitResult::Interrupted) => {
+ if interrupted {
+ Self::raise_sigint();
+ }
+ eprintln!("Interrupt received, not spawning new tasks. Interrupt again to stop immediately.");
+ interrupted = true;
+ }
+ None => {}
+ }
+ if !interrupted && (success || keep_going) {
+ self.run_tasks(runner)?;
+ }
+ }
+
+ if interrupted || !success {
+ return Ok(false);
+ }
+
+ assert!(self.is_done(), "No runnable tasks left");
+ self.state.print_summary();
+
+ Ok(true)
+ }
+}