diff options
author | Matthias Schiffer <mschiffer@universe-factory.net> | 2024-04-07 01:10:43 +0200 |
---|---|---|
committer | Matthias Schiffer <mschiffer@universe-factory.net> | 2024-04-07 01:22:12 +0200 |
commit | 1d1a40b55a94c2f3a7477f066035f165e37779e4 (patch) | |
tree | 2ca3ca283f15d4dc84b789ef8519e20084ee4938 /crates | |
parent | b1f6695e5759d39b711ab599726bc1ae61fcacac (diff) | |
download | rebel-1d1a40b55a94c2f3a7477f066035f165e37779e4.tar rebel-1d1a40b55a94c2f3a7477f066035f165e37779e4.zip |
driver: implement "soft" SIGINT handling
On the first SIGINT, spawning new tasks is disabled, but running tasks
are allowed to finish. The second SIGINT will terminate the build
immediately.
Diffstat (limited to 'crates')
-rw-r--r-- | crates/driver/Cargo.toml | 2 | ||||
-rw-r--r-- | crates/driver/src/driver.rs | 78 |
2 files changed, 67 insertions, 13 deletions
diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml index 673e0ba..df1fb19 100644 --- a/crates/driver/Cargo.toml +++ b/crates/driver/Cargo.toml @@ -17,7 +17,7 @@ enum-kinds = "0.5.1" handlebars = "5.1.2" indoc = "2.0.4" lazy_static = "1.4.0" -nix = { version = "0.28.0", features = ["poll"] } +nix = { version = "0.28.0", features = ["poll", "signal"] } scoped-tls-hkt = "0.1.2" serde = { version = "1", features = ["derive", "rc"] } serde_yaml = "0.9" diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs index cc4bfbf..5a00882 100644 --- a/crates/driver/src/driver.rs +++ b/crates/driver/src/driver.rs @@ -1,10 +1,17 @@ use std::{ collections::{HashMap, HashSet}, + iter, os::unix::{net::UnixStream, prelude::*}, }; use indoc::indoc; -use nix::poll; +use nix::{ + poll, + sys::{ + signal, + signalfd::{SfdFlags, SignalFd}, + }, +}; use common::{error::*, string_hash::*, types::*}; use runner::Runner; @@ -143,11 +150,18 @@ impl<'ctx> CompletionState<'ctx> { } } +#[derive(Debug)] enum SpawnResult { Spawned(UnixStream), Skipped(TaskOutput), } +#[derive(Debug, PartialEq, Eq, Hash)] +enum TaskWaitResult { + Failed, + Interrupted, +} + #[derive(Debug)] pub struct Driver<'ctx> { rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>, @@ -351,11 +365,13 @@ impl<'ctx> Driver<'ctx> { Ok(()) } - fn wait_for_task(&mut self) -> Result<bool> { + fn wait_for_task(&mut self, signal_fd: &mut SignalFd) -> Result<Option<TaskWaitResult>> { let mut pollfds: Vec<_> = self .tasks_running .values() - .map(|(socket, _)| poll::PollFd::new(socket.as_fd(), poll::PollFlags::POLLIN)) + .map(|(socket, _)| socket.as_fd()) + .chain(iter::once(signal_fd.as_fd())) + .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) .collect(); while poll::poll(&mut pollfds, poll::PollTimeout::NONE).context("poll()")? == 0 {} @@ -380,6 +396,11 @@ impl<'ctx> Driver<'ctx> { continue; } + if fd == signal_fd.as_raw_fd() { + let _signal = signal_fd.read_signal().expect("read_signal()").unwrap(); + return Ok(Some(TaskWaitResult::Interrupted)); + } + let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); match Runner::result(&socket) { @@ -388,12 +409,12 @@ impl<'ctx> Driver<'ctx> { } Err(error) => { eprintln!("{}", error); - return Ok(false); + return Ok(Some(TaskWaitResult::Failed)); } } } - Ok(true) + Ok(None) } fn is_done(&self) -> bool { @@ -402,25 +423,58 @@ impl<'ctx> Driver<'ctx> { && self.tasks_running.is_empty() } + fn setup_signalfd() -> Result<SignalFd> { + let mut signals = signal::SigSet::empty(); + signals.add(signal::Signal::SIGINT); + signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None) + .expect("pthread_sigmask()"); + SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC) + .context("Failed to create signal file descriptor") + } + + fn raise_sigint() { + let mut signals = signal::SigSet::empty(); + signals.add(signal::Signal::SIGINT); + signal::pthread_sigmask(signal::SigmaskHow::SIG_UNBLOCK, Some(&signals), None) + .expect("pthread_sigmask()"); + signal::raise(signal::Signal::SIGINT).expect("raise()"); + unreachable!(); + } + pub fn run(&mut self, runner: &Runner, keep_going: bool) -> Result<bool> { let mut success = true; + let mut interrupted = false; + + let mut signal_fd = Self::setup_signalfd()?; self.run_tasks(runner)?; while !self.tasks_running.is_empty() { - if !self.wait_for_task()? { - success = false; + match self.wait_for_task(&mut signal_fd)? { + Some(TaskWaitResult::Failed) => { + success = false; + } + Some(TaskWaitResult::Interrupted) => { + if interrupted { + Self::raise_sigint(); + } + eprintln!("Interrupt received, not spawning new tasks. Interrupt again to stop immediately."); + interrupted = true; + } + None => {} } - if success || keep_going { + if !interrupted && (success || keep_going) { self.run_tasks(runner)?; } } - if success { - assert!(self.is_done(), "No runnable tasks left"); - self.state.print_summary(); + if interrupted || !success { + return Ok(false); } - Ok(success) + assert!(self.is_done(), "No runnable tasks left"); + self.state.print_summary(); + + Ok(true) } } |