summaryrefslogtreecommitdiffstats
path: root/crates/runner
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2021-10-26 18:43:51 +0200
committerMatthias Schiffer <mschiffer@universe-factory.net>2021-10-26 19:03:17 +0200
commit5d93586e6def88752323d6dc91443a24a5b7e0f0 (patch)
treec2104054006a0b340f23adc3874c22b04d6c1a9e /crates/runner
parent8bebe4c76107d8b0a55fda238b0475469d374d77 (diff)
downloadrebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.tar
rebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.zip
Replace ipc-channel with UNIX sockets and manual FD passing
The new implementation is more flexible, as it allows mixed polling with other FD types, and it saves us a whole zoo of dependencies.
Diffstat (limited to 'crates/runner')
-rw-r--r--crates/runner/Cargo.toml3
-rw-r--r--crates/runner/src/lib.rs100
2 files changed, 55 insertions, 48 deletions
diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml
index 5d51739..d8fe562 100644
--- a/crates/runner/Cargo.toml
+++ b/crates/runner/Cargo.toml
@@ -13,7 +13,6 @@ common = { path = "../common", package = "rebel-common" }
blake3 = { version = "1.0.0", features = ["traits-preview"] }
capctl = "0.2.0"
digest = "0.9.0"
-ipc-channel = { git = "https://github.com/servo/ipc-channel.git" }
libc = "0.2.84"
nix = "0.23.0"
olpc-cjson = "0.1.0"
@@ -21,3 +20,5 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1.0.62"
tar = "0.4.32"
tee_readwrite = "0.1.0"
+uds = "0.2.6"
+bincode = "1.3.3"
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs
index 21289cf..bf052ac 100644
--- a/crates/runner/src/lib.rs
+++ b/crates/runner/src/lib.rs
@@ -6,16 +6,20 @@ mod tar;
mod task;
mod util;
-use std::fs::File;
+use std::{
+ fs::File,
+ net,
+ os::unix::{net::UnixStream, prelude::*},
+ slice,
+};
use capctl::prctl;
-use ipc_channel::ipc;
use nix::{
sched::CloneFlags,
sys::{signal, stat, wait},
unistd::{self, Gid, Uid},
};
-use serde::{Deserialize, Serialize};
+use uds::UnixSeqpacketConn;
use common::{error::*, types::*};
@@ -29,28 +33,27 @@ pub struct Options {
pub jobs: Option<usize>,
}
-#[derive(Debug, Deserialize, Serialize)]
-enum Message {
- Request(Task, ipc::IpcSender<Result<TaskOutput>>),
-}
-
fn handle_request(
jobserver: Jobserver,
- channel: ipc::IpcReceiver<Message>,
- task: Task,
- reply_channel: ipc::IpcSender<Result<TaskOutput>>,
-) -> (Jobserver, ipc::IpcReceiver<Message>) {
- let child = |(mut jobserver, channel): (Jobserver, ipc::IpcReceiver<Message>)| {
- drop(channel);
+ socket: UnixSeqpacketConn,
+ request_socket: UnixStream,
+) -> (Jobserver, UnixSeqpacketConn) {
+ let child = |(mut jobserver, socket): (Jobserver, UnixSeqpacketConn)| {
+ drop(socket);
unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap();
+ let task: Task =
+ bincode::deserialize_from(&request_socket).expect("Failed to decode task description");
+
prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)");
let token = jobserver.wait();
let (pid, mut jobserver) = unsafe {
clone::spawn(None, jobserver, |jobserver| {
let result = task::handle(task, jobserver);
- reply_channel.send(result).expect("IPC send() failed");
+ bincode::serialize_into(&request_socket, &result)
+ .expect("Failed to send task result");
+ drop(request_socket);
})
}
.expect("fork()");
@@ -59,35 +62,31 @@ fn handle_request(
wait_res.expect("waitpid()");
};
- unsafe { clone::spawn(None, (jobserver, channel), child) }
+ unsafe { clone::spawn(None, (jobserver, socket), child) }
.expect("fork()")
.1
}
-fn runner_loop(mut channel: ipc::IpcReceiver<Message>, options: &Options) {
+fn runner_loop(mut socket: UnixSeqpacketConn, options: &Options) {
let jobs = options
.jobs
.unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs"));
let mut jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe");
- while let Ok(msg) = channel.recv() {
- match msg {
- Message::Request(task, reply_channel) => {
- let ret = handle_request(jobserver, channel, task, reply_channel);
- jobserver = ret.0;
- channel = ret.1;
- }
- }
+ let mut fd = 0;
+
+ while let Ok((1, _, n_fd)) = socket.recv_fds(&mut [0], slice::from_mut(&mut fd)) {
+ assert!(n_fd == 1);
+
+ let request_socket = unsafe { UnixStream::from_raw_fd(fd) };
+
+ let ret = handle_request(jobserver, socket, request_socket);
+ jobserver = ret.0;
+ socket = ret.1;
}
}
-fn runner(
- uid: Uid,
- gid: Gid,
- channel: ipc::IpcReceiver<Message>,
- _lockfile: File,
- options: &Options,
-) {
+fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, options: &Options) {
ns::mount_proc();
ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid);
@@ -98,7 +97,7 @@ fn runner(
let msg_handler = unsafe {
clone::spawn(None, (), |()| {
signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap();
- runner_loop(channel, options);
+ runner_loop(socket, options);
})
}
.expect("fork()")
@@ -113,7 +112,7 @@ fn runner(
}
pub struct Runner {
- channel: ipc::IpcSender<Message>,
+ socket: UnixSeqpacketConn,
}
impl Runner {
@@ -129,29 +128,36 @@ impl Runner {
let uid = unistd::geteuid();
let gid = unistd::getegid();
- let (tx, rx) = ipc::channel().expect("IPC channel creation failed");
+ let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()");
- let (tx, _rx) = clone::spawn(
+ let (local, _remote) = clone::spawn(
Some(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID),
- (tx, rx),
- |(tx, rx)| {
- drop(tx);
- runner(uid, gid, rx, lockfile, options);
+ (local, remote),
+ |(local, remote)| {
+ drop(local);
+ runner(uid, gid, remote, lockfile, options);
},
)
.expect("clone()")
.1;
- Ok(Runner { channel: tx })
+ Ok(Runner { socket: local })
}
- pub fn spawn(&self, task: &Task) -> ipc::IpcReceiver<Result<TaskOutput>> {
- let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed");
+ pub fn spawn(&self, task: &Task) -> UnixStream {
+ let (local, remote) = UnixStream::pair().expect("socketpair()");
+
+ self.socket
+ .send_fds(&[0], &[remote.as_raw_fd()])
+ .expect("send()");
- self.channel
- .send(Message::Request(task.clone(), reply_tx))
- .expect("ContainerRunner task submission failed");
+ bincode::serialize_into(&local, task).expect("Task submission failed");
+ local.shutdown(net::Shutdown::Write).expect("shutdown()");
+
+ local
+ }
- reply_rx
+ pub fn result(socket: &UnixStream) -> Result<TaskOutput> {
+ bincode::deserialize_from(socket).expect("Failed to read task result")
}
}