summaryrefslogtreecommitdiffstats
path: root/crates/runner/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/runner/src/lib.rs')
-rw-r--r--crates/runner/src/lib.rs204
1 files changed, 0 insertions, 204 deletions
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs
deleted file mode 100644
index cfaf658..0000000
--- a/crates/runner/src/lib.rs
+++ /dev/null
@@ -1,204 +0,0 @@
-mod init;
-mod jobserver;
-mod ns;
-mod paths;
-mod tar;
-mod task;
-mod util;
-
-use std::{
- collections::HashSet,
- fs::File,
- net,
- os::unix::{net::UnixStream, prelude::*},
- process, slice,
-};
-
-use capctl::prctl;
-use nix::{
- errno::Errno,
- poll,
- sched::CloneFlags,
- sys::{
- signal,
- signalfd::{SfdFlags, SignalFd},
- stat, wait,
- },
- unistd::{self, Gid, Pid, Uid},
-};
-use uds::UnixSeqpacketConn;
-
-use common::{error::*, types::*};
-
-use jobserver::Jobserver;
-use util::{checkable::Checkable, clone, steal::Steal, unix};
-
-#[derive(Debug, Clone)]
-pub struct Options {
- pub jobs: Option<usize>,
-}
-
-#[derive(Debug)]
-struct RunnerContext {
- socket: Steal<UnixSeqpacketConn>,
- jobserver: Jobserver,
- tasks: HashSet<Pid>,
-}
-
-fn handle_sigchld(ctx: &mut RunnerContext) -> Result<()> {
- loop {
- let status = match wait::waitpid(Pid::from_raw(-1), Some(wait::WaitPidFlag::WNOHANG)) {
- Ok(wait::WaitStatus::StillAlive) | Err(Errno::ECHILD) => return Ok(()),
- res => res.expect("waitpid()"),
- };
- let pid = status.pid().unwrap();
- if ctx.tasks.remove(&pid) {
- status.check()?;
- }
- }
-}
-
-fn handle_request(ctx: &mut RunnerContext, request_socket: UnixStream) {
- let run = || {
- ctx.socket.steal();
-
- let task: Task =
- bincode::deserialize_from(&request_socket).expect("Failed to decode task description");
-
- prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)");
-
- let result = task::handle(task, &mut ctx.jobserver);
- bincode::serialize_into(&request_socket, &result).expect("Failed to send task result");
- drop(request_socket);
- };
-
- let pid = unsafe { clone::spawn(None, run) }.expect("fork()");
- assert!(ctx.tasks.insert(pid));
-}
-
-fn handle_socket(ctx: &mut RunnerContext) -> bool {
- let mut fd = 0;
-
- match ctx
- .socket
- .recv_fds(&mut [0], slice::from_mut(&mut fd))
- .expect("recv_fds()")
- {
- (1, _, n_fd) => {
- assert!(n_fd == 1);
- }
- _ => return false,
- }
-
- let request_socket = unsafe { UnixStream::from_raw_fd(fd) };
- handle_request(ctx, request_socket);
- true
-}
-
-fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, options: &Options) -> ! {
- ns::mount_proc();
- ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid);
-
- stat::umask(stat::Mode::from_bits_truncate(0o022));
-
- init::init_runner().unwrap();
-
- let jobs = options
- .jobs
- .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs"));
- let jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe");
- let mut ctx = RunnerContext {
- socket: socket.into(),
- jobserver,
- tasks: HashSet::new(),
- };
-
- let mut signals = signal::SigSet::empty();
- signals.add(signal::Signal::SIGCHLD);
- signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None)
- .expect("pthread_sigmask()");
- let mut sfd = SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC)
- .expect("Failed to create signal file descriptor");
-
- let mut pollfds = [
- poll::PollFd::new(sfd.as_raw_fd(), poll::PollFlags::POLLIN),
- poll::PollFd::new(ctx.socket.as_raw_fd(), poll::PollFlags::POLLIN),
- ];
-
- loop {
- poll::poll(&mut pollfds, -1).expect("poll()");
-
- let events = pollfds[0]
- .revents()
- .expect("Unknown events in poll() return");
- if events.contains(poll::PollFlags::POLLIN) {
- let _signal = sfd.read_signal().expect("read_signal()").unwrap();
- handle_sigchld(&mut ctx).expect("Task process exited abnormally");
- } else if events.intersects(!poll::PollFlags::POLLIN) {
- panic!("Unexpected error status for signal file descriptor");
- }
-
- let events = pollfds[1]
- .revents()
- .expect("Unknown events in poll() return");
- if events.contains(poll::PollFlags::POLLIN) {
- if !handle_socket(&mut ctx) {
- break;
- }
- } else if events.intersects(!poll::PollFlags::POLLIN) {
- panic!("Unexpected error status for socket file descriptor");
- }
- }
-
- process::exit(0);
-}
-
-pub struct Runner {
- socket: UnixSeqpacketConn,
-}
-
-impl Runner {
- /// Creates a new container runner
- ///
- /// # Safety
- ///
- /// Do not call in multithreaded processes.
- pub unsafe fn new(options: &Options) -> Result<Self> {
- let lockfile = unix::lock(paths::LOCKFILE, true, false)
- .context("Failed to get lock on build directory, is another instance running?")?;
-
- let uid = unistd::geteuid();
- let gid = unistd::getegid();
-
- let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()");
-
- match clone::clone(
- CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID,
- )
- .expect("clone()")
- {
- unistd::ForkResult::Parent { .. } => Ok(Runner { socket: local }),
- unistd::ForkResult::Child => {
- drop(local);
- runner(uid, gid, remote, lockfile, options);
- }
- }
- }
-
- pub fn spawn(&self, task: &Task) -> UnixStream {
- let (local, remote) = UnixStream::pair().expect("socketpair()");
-
- self.socket
- .send_fds(&[0], &[remote.as_raw_fd()])
- .expect("send()");
-
- bincode::serialize_into(&local, task).expect("Task submission failed");
- local.shutdown(net::Shutdown::Write).expect("shutdown()");
-
- local
- }
-
- pub fn result(socket: &UnixStream) -> Result<TaskOutput> {
- bincode::deserialize_from(socket).expect("Failed to read task result")
- }
-}