summaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2021-10-26 21:18:00 +0200
committerMatthias Schiffer <mschiffer@universe-factory.net>2021-10-26 21:48:18 +0200
commitae79ea1276f56872da590dd611c7b12b4db67c27 (patch)
tree788c7e5c54c88e3f2a7a654f358b204079e9f6c3 /crates
parente2c11da16f401b47feb627571f394a23c9889dbf (diff)
downloadrebel-ae79ea1276f56872da590dd611c7b12b4db67c27.tar
rebel-ae79ea1276f56872da590dd611c7b12b4db67c27.zip
runner: handle reaping and requests in the poll loop
Diffstat (limited to 'crates')
-rw-r--r--crates/runner/src/lib.rs86
1 files changed, 49 insertions, 37 deletions
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs
index 51e0e17..8104daa 100644
--- a/crates/runner/src/lib.rs
+++ b/crates/runner/src/lib.rs
@@ -35,6 +35,16 @@ pub struct Options {
pub jobs: Option<usize>,
}
+fn handle_sigchld() {
+ loop {
+ let status = match wait::waitpid(Pid::from_raw(-1), Some(wait::WaitPidFlag::WNOHANG)) {
+ Ok(wait::WaitStatus::StillAlive) | Err(Errno::ECHILD) => return,
+ res => res.expect("waitpid()"),
+ };
+ let _pid = status.pid().unwrap();
+ }
+}
+
fn handle_request(
jobserver: Jobserver,
socket: UnixSeqpacketConn,
@@ -42,7 +52,6 @@ fn handle_request(
) -> (Jobserver, UnixSeqpacketConn) {
let child = |(mut jobserver, socket): (Jobserver, UnixSeqpacketConn)| {
drop(socket);
- unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap();
let task: Task =
bincode::deserialize_from(&request_socket).expect("Failed to decode task description");
@@ -69,26 +78,27 @@ fn handle_request(
.1
}
-fn runner_loop(mut socket: UnixSeqpacketConn, options: &Options) {
- let jobs = options
- .jobs
- .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs"));
- let mut jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe");
-
+fn handle_socket(
+ jobserver: Jobserver,
+ socket: UnixSeqpacketConn,
+) -> Option<(Jobserver, UnixSeqpacketConn)> {
let mut fd = 0;
- while let Ok((1, _, n_fd)) = socket.recv_fds(&mut [0], slice::from_mut(&mut fd)) {
- assert!(n_fd == 1);
-
- let request_socket = unsafe { UnixStream::from_raw_fd(fd) };
-
- let ret = handle_request(jobserver, socket, request_socket);
- jobserver = ret.0;
- socket = ret.1;
+ match socket
+ .recv_fds(&mut [0], slice::from_mut(&mut fd))
+ .expect("recv_fds()")
+ {
+ (1, _, n_fd) => {
+ assert!(n_fd == 1);
+ }
+ _ => return None,
}
+
+ let request_socket = unsafe { UnixStream::from_raw_fd(fd) };
+ Some(handle_request(jobserver, socket, request_socket))
}
-fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, options: &Options) {
+fn runner(uid: Uid, gid: Gid, mut socket: UnixSeqpacketConn, _lockfile: File, options: &Options) {
ns::mount_proc();
ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid);
@@ -96,22 +106,21 @@ fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, option
init::init_runner().unwrap();
+ let jobs = options
+ .jobs
+ .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs"));
+ let mut jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe");
+
let mut signals = signal::SigSet::empty();
signals.add(signal::Signal::SIGCHLD);
signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None)
.expect("pthread_sigmask()");
let mut sfd = SignalFd::new(&signals).expect("Failed to create signal file descriptor");
- let msg_handler = unsafe {
- clone::spawn(None, (), |()| {
- signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap();
- runner_loop(socket, options);
- })
- }
- .expect("fork()")
- .0;
-
- let mut pollfds = [poll::PollFd::new(sfd.as_raw_fd(), poll::PollFlags::POLLIN)];
+ let mut pollfds = [
+ poll::PollFd::new(sfd.as_raw_fd(), poll::PollFlags::POLLIN),
+ poll::PollFd::new(socket.as_raw_fd(), poll::PollFlags::POLLIN),
+ ];
loop {
poll::poll(&mut pollfds, -1).expect("poll()");
@@ -121,21 +130,24 @@ fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, option
.expect("Unknown events in poll() return");
if events.contains(poll::PollFlags::POLLIN) {
let _signal = sfd.read_signal().expect("read_signal()").unwrap();
+ handle_sigchld();
+ } else if events.intersects(!poll::PollFlags::POLLIN) {
+ panic!("Unexpected error status for signal file descriptor");
+ }
- loop {
- let status =
- match wait::waitpid(Pid::from_raw(-1), Some(wait::WaitPidFlag::WNOHANG)) {
- Ok(wait::WaitStatus::StillAlive) | Err(Errno::ECHILD) => break,
- res => res.expect("waitpid()"),
- };
- let pid = status.pid().unwrap();
-
- if pid == msg_handler {
- return;
+ let events = pollfds[1]
+ .revents()
+ .expect("Unknown events in poll() return");
+ if events.contains(poll::PollFlags::POLLIN) {
+ match handle_socket(jobserver, socket) {
+ Some((jobserver_ret, socket_ret)) => {
+ jobserver = jobserver_ret;
+ socket = socket_ret;
}
+ None => break,
}
} else if events.intersects(!poll::PollFlags::POLLIN) {
- panic!("Unexpected error status for signal file descriptor");
+ panic!("Unexpected error status for socket file descriptor");
}
}
}