diff options
author | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-26 21:18:00 +0200 |
---|---|---|
committer | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-26 21:48:18 +0200 |
commit | ae79ea1276f56872da590dd611c7b12b4db67c27 (patch) | |
tree | 788c7e5c54c88e3f2a7a654f358b204079e9f6c3 /crates/runner/src/lib.rs | |
parent | e2c11da16f401b47feb627571f394a23c9889dbf (diff) | |
download | rebel-ae79ea1276f56872da590dd611c7b12b4db67c27.tar rebel-ae79ea1276f56872da590dd611c7b12b4db67c27.zip |
runner: handle reaping and requests in the poll loop
Diffstat (limited to 'crates/runner/src/lib.rs')
-rw-r--r-- | crates/runner/src/lib.rs | 86 |
1 files changed, 49 insertions, 37 deletions
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs index 51e0e17..8104daa 100644 --- a/crates/runner/src/lib.rs +++ b/crates/runner/src/lib.rs @@ -35,6 +35,16 @@ pub struct Options { pub jobs: Option<usize>, } +fn handle_sigchld() { + loop { + let status = match wait::waitpid(Pid::from_raw(-1), Some(wait::WaitPidFlag::WNOHANG)) { + Ok(wait::WaitStatus::StillAlive) | Err(Errno::ECHILD) => return, + res => res.expect("waitpid()"), + }; + let _pid = status.pid().unwrap(); + } +} + fn handle_request( jobserver: Jobserver, socket: UnixSeqpacketConn, @@ -42,7 +52,6 @@ fn handle_request( ) -> (Jobserver, UnixSeqpacketConn) { let child = |(mut jobserver, socket): (Jobserver, UnixSeqpacketConn)| { drop(socket); - unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap(); let task: Task = bincode::deserialize_from(&request_socket).expect("Failed to decode task description"); @@ -69,26 +78,27 @@ fn handle_request( .1 } -fn runner_loop(mut socket: UnixSeqpacketConn, options: &Options) { - let jobs = options - .jobs - .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs")); - let mut jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe"); - +fn handle_socket( + jobserver: Jobserver, + socket: UnixSeqpacketConn, +) -> Option<(Jobserver, UnixSeqpacketConn)> { let mut fd = 0; - while let Ok((1, _, n_fd)) = socket.recv_fds(&mut [0], slice::from_mut(&mut fd)) { - assert!(n_fd == 1); - - let request_socket = unsafe { UnixStream::from_raw_fd(fd) }; - - let ret = handle_request(jobserver, socket, request_socket); - jobserver = ret.0; - socket = ret.1; + match socket + .recv_fds(&mut [0], slice::from_mut(&mut fd)) + .expect("recv_fds()") + { + (1, _, n_fd) => { + assert!(n_fd == 1); + } + _ => return None, } + + let request_socket = unsafe { UnixStream::from_raw_fd(fd) }; + Some(handle_request(jobserver, socket, request_socket)) } -fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, options: &Options) { +fn runner(uid: Uid, gid: Gid, mut socket: UnixSeqpacketConn, _lockfile: File, options: &Options) { ns::mount_proc(); ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid); @@ -96,22 +106,21 @@ fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, option init::init_runner().unwrap(); + let jobs = options + .jobs + .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs")); + let mut jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe"); + let mut signals = signal::SigSet::empty(); signals.add(signal::Signal::SIGCHLD); signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None) .expect("pthread_sigmask()"); let mut sfd = SignalFd::new(&signals).expect("Failed to create signal file descriptor"); - let msg_handler = unsafe { - clone::spawn(None, (), |()| { - signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap(); - runner_loop(socket, options); - }) - } - .expect("fork()") - .0; - - let mut pollfds = [poll::PollFd::new(sfd.as_raw_fd(), poll::PollFlags::POLLIN)]; + let mut pollfds = [ + poll::PollFd::new(sfd.as_raw_fd(), poll::PollFlags::POLLIN), + poll::PollFd::new(socket.as_raw_fd(), poll::PollFlags::POLLIN), + ]; loop { poll::poll(&mut pollfds, -1).expect("poll()"); @@ -121,21 +130,24 @@ fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, option .expect("Unknown events in poll() return"); if events.contains(poll::PollFlags::POLLIN) { let _signal = sfd.read_signal().expect("read_signal()").unwrap(); + handle_sigchld(); + } else if events.intersects(!poll::PollFlags::POLLIN) { + panic!("Unexpected error status for signal file descriptor"); + } - loop { - let status = - match wait::waitpid(Pid::from_raw(-1), Some(wait::WaitPidFlag::WNOHANG)) { - Ok(wait::WaitStatus::StillAlive) | Err(Errno::ECHILD) => break, - res => res.expect("waitpid()"), - }; - let pid = status.pid().unwrap(); - - if pid == msg_handler { - return; + let events = pollfds[1] + .revents() + .expect("Unknown events in poll() return"); + if events.contains(poll::PollFlags::POLLIN) { + match handle_socket(jobserver, socket) { + Some((jobserver_ret, socket_ret)) => { + jobserver = jobserver_ret; + socket = socket_ret; } + None => break, } } else if events.intersects(!poll::PollFlags::POLLIN) { - panic!("Unexpected error status for signal file descriptor"); + panic!("Unexpected error status for socket file descriptor"); } } } |