diff options
author | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-26 18:43:51 +0200 |
---|---|---|
committer | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-26 19:03:17 +0200 |
commit | 5d93586e6def88752323d6dc91443a24a5b7e0f0 (patch) | |
tree | c2104054006a0b340f23adc3874c22b04d6c1a9e /crates/runner/src | |
parent | 8bebe4c76107d8b0a55fda238b0475469d374d77 (diff) | |
download | rebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.tar rebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.zip |
Replace ipc-channel with UNIX sockets and manual FD passing
The new implementation is more flexible, as it allows mixed polling with
other FD types, and it saves us a whole zoo of dependencies.
Diffstat (limited to 'crates/runner/src')
-rw-r--r-- | crates/runner/src/lib.rs | 100 |
1 files changed, 53 insertions, 47 deletions
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs index 21289cf..bf052ac 100644 --- a/crates/runner/src/lib.rs +++ b/crates/runner/src/lib.rs @@ -6,16 +6,20 @@ mod tar; mod task; mod util; -use std::fs::File; +use std::{ + fs::File, + net, + os::unix::{net::UnixStream, prelude::*}, + slice, +}; use capctl::prctl; -use ipc_channel::ipc; use nix::{ sched::CloneFlags, sys::{signal, stat, wait}, unistd::{self, Gid, Uid}, }; -use serde::{Deserialize, Serialize}; +use uds::UnixSeqpacketConn; use common::{error::*, types::*}; @@ -29,28 +33,27 @@ pub struct Options { pub jobs: Option<usize>, } -#[derive(Debug, Deserialize, Serialize)] -enum Message { - Request(Task, ipc::IpcSender<Result<TaskOutput>>), -} - fn handle_request( jobserver: Jobserver, - channel: ipc::IpcReceiver<Message>, - task: Task, - reply_channel: ipc::IpcSender<Result<TaskOutput>>, -) -> (Jobserver, ipc::IpcReceiver<Message>) { - let child = |(mut jobserver, channel): (Jobserver, ipc::IpcReceiver<Message>)| { - drop(channel); + socket: UnixSeqpacketConn, + request_socket: UnixStream, +) -> (Jobserver, UnixSeqpacketConn) { + let child = |(mut jobserver, socket): (Jobserver, UnixSeqpacketConn)| { + drop(socket); unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap(); + let task: Task = + bincode::deserialize_from(&request_socket).expect("Failed to decode task description"); + prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)"); let token = jobserver.wait(); let (pid, mut jobserver) = unsafe { clone::spawn(None, jobserver, |jobserver| { let result = task::handle(task, jobserver); - reply_channel.send(result).expect("IPC send() failed"); + bincode::serialize_into(&request_socket, &result) + .expect("Failed to send task result"); + drop(request_socket); }) } .expect("fork()"); @@ -59,35 +62,31 @@ fn handle_request( wait_res.expect("waitpid()"); }; - unsafe { clone::spawn(None, (jobserver, channel), child) } + unsafe { clone::spawn(None, (jobserver, socket), child) } .expect("fork()") .1 } -fn runner_loop(mut channel: ipc::IpcReceiver<Message>, options: &Options) { +fn runner_loop(mut socket: UnixSeqpacketConn, options: &Options) { let jobs = options .jobs .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs")); let mut jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe"); - while let Ok(msg) = channel.recv() { - match msg { - Message::Request(task, reply_channel) => { - let ret = handle_request(jobserver, channel, task, reply_channel); - jobserver = ret.0; - channel = ret.1; - } - } + let mut fd = 0; + + while let Ok((1, _, n_fd)) = socket.recv_fds(&mut [0], slice::from_mut(&mut fd)) { + assert!(n_fd == 1); + + let request_socket = unsafe { UnixStream::from_raw_fd(fd) }; + + let ret = handle_request(jobserver, socket, request_socket); + jobserver = ret.0; + socket = ret.1; } } -fn runner( - uid: Uid, - gid: Gid, - channel: ipc::IpcReceiver<Message>, - _lockfile: File, - options: &Options, -) { +fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, options: &Options) { ns::mount_proc(); ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid); @@ -98,7 +97,7 @@ fn runner( let msg_handler = unsafe { clone::spawn(None, (), |()| { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap(); - runner_loop(channel, options); + runner_loop(socket, options); }) } .expect("fork()") @@ -113,7 +112,7 @@ fn runner( } pub struct Runner { - channel: ipc::IpcSender<Message>, + socket: UnixSeqpacketConn, } impl Runner { @@ -129,29 +128,36 @@ impl Runner { let uid = unistd::geteuid(); let gid = unistd::getegid(); - let (tx, rx) = ipc::channel().expect("IPC channel creation failed"); + let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()"); - let (tx, _rx) = clone::spawn( + let (local, _remote) = clone::spawn( Some(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID), - (tx, rx), - |(tx, rx)| { - drop(tx); - runner(uid, gid, rx, lockfile, options); + (local, remote), + |(local, remote)| { + drop(local); + runner(uid, gid, remote, lockfile, options); }, ) .expect("clone()") .1; - Ok(Runner { channel: tx }) + Ok(Runner { socket: local }) } - pub fn spawn(&self, task: &Task) -> ipc::IpcReceiver<Result<TaskOutput>> { - let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed"); + pub fn spawn(&self, task: &Task) -> UnixStream { + let (local, remote) = UnixStream::pair().expect("socketpair()"); + + self.socket + .send_fds(&[0], &[remote.as_raw_fd()]) + .expect("send()"); - self.channel - .send(Message::Request(task.clone(), reply_tx)) - .expect("ContainerRunner task submission failed"); + bincode::serialize_into(&local, task).expect("Task submission failed"); + local.shutdown(net::Shutdown::Write).expect("shutdown()"); + + local + } - reply_rx + pub fn result(socket: &UnixStream) -> Result<TaskOutput> { + bincode::deserialize_from(socket).expect("Failed to read task result") } } |