summaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2021-10-26 18:43:51 +0200
committerMatthias Schiffer <mschiffer@universe-factory.net>2021-10-26 19:03:17 +0200
commit5d93586e6def88752323d6dc91443a24a5b7e0f0 (patch)
treec2104054006a0b340f23adc3874c22b04d6c1a9e /crates
parent8bebe4c76107d8b0a55fda238b0475469d374d77 (diff)
downloadrebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.tar
rebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.zip
Replace ipc-channel with UNIX sockets and manual FD passing
The new implementation is more flexible, as it allows mixed polling with other FD types, and it saves us a whole zoo of dependencies.
Diffstat (limited to 'crates')
-rw-r--r--crates/executor/Cargo.toml4
-rw-r--r--crates/executor/src/executor.rs83
-rw-r--r--crates/runner/Cargo.toml3
-rw-r--r--crates/runner/src/lib.rs100
4 files changed, 93 insertions, 97 deletions
diff --git a/crates/executor/Cargo.toml b/crates/executor/Cargo.toml
index 821caa7..215c451 100644
--- a/crates/executor/Cargo.toml
+++ b/crates/executor/Cargo.toml
@@ -15,10 +15,10 @@ clap = "3.0.0-beta.2"
enum-kinds = "0.5.1"
handlebars = "4.1.3"
indoc = "1.0.3"
-ipc-channel = { git = "https://github.com/servo/ipc-channel.git" }
lazy_static = "1.4.0"
+nix = "0.23.0"
regex = "1.5.4"
scoped-tls-hkt = "0.1.2"
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1", features = ["derive", "rc"] }
serde_yaml = "0.8"
walkdir = "2"
diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs
index 1d6ee44..ad560e9 100644
--- a/crates/executor/src/executor.rs
+++ b/crates/executor/src/executor.rs
@@ -1,7 +1,10 @@
-use std::collections::{HashMap, HashSet};
+use std::{
+ collections::{HashMap, HashSet},
+ os::unix::{net::UnixStream, prelude::*},
+};
use indoc::indoc;
-use ipc_channel::ipc;
+use nix::poll;
use common::{error::*, string_hash::*, types::*};
use runner::{paths, Runner};
@@ -15,10 +18,9 @@ use crate::{
pub struct Executor<'ctx> {
ctx: &'ctx Context,
- receiver_set: ipc::IpcReceiverSet,
tasks_blocked: HashSet<TaskRef<'ctx>>,
tasks_runnable: Vec<TaskRef<'ctx>>,
- tasks_running: HashMap<u64, TaskRef<'ctx>>,
+ tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>,
tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>,
rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>,
tpl: TemplateEngine,
@@ -28,7 +30,6 @@ impl<'ctx> Executor<'ctx> {
pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> {
let mut exc = Executor {
ctx,
- receiver_set: ipc::IpcReceiverSet::new().expect("IpcReceiverSet::new()"),
tasks_blocked: HashSet::new(),
tasks_runnable: Vec::new(),
tasks_running: HashMap::new(),
@@ -205,11 +206,7 @@ impl<'ctx> Executor<'ctx> {
ret
}
- fn spawn_one(
- &self,
- task_ref: &TaskRef<'ctx>,
- runner: &Runner,
- ) -> Result<ipc::IpcReceiver<Result<TaskOutput>>> {
+ fn spawn_one(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<UnixStream> {
let task_def = &self.ctx[task_ref.id];
let task_deps = self.task_deps(task_ref)?;
let task_output = task_def
@@ -248,12 +245,11 @@ impl<'ctx> Executor<'ctx> {
fn run_tasks(&mut self, runner: &Runner) -> Result<()> {
while let Some(task_ref) = self.tasks_runnable.pop() {
- let channel = self.spawn_one(&task_ref, runner)?;
- let id = self
- .receiver_set
- .add(channel)
- .expect("Failed to add channel to receiver set");
- self.tasks_running.insert(id, task_ref);
+ let socket = self.spawn_one(&task_ref, runner)?;
+ assert!(self
+ .tasks_running
+ .insert(socket.as_raw_fd(), (socket, task_ref))
+ .is_none());
}
Ok(())
@@ -274,39 +270,32 @@ impl<'ctx> Executor<'ctx> {
}
fn wait_for_task(&mut self) -> Result<()> {
- let mut progress = false;
-
- while !progress {
- let events = self
- .receiver_set
- .select()
- .expect("Failed to get messages from receiver set");
- for event in events {
- match event {
- ipc::IpcSelectionResult::MessageReceived(id, msg) => {
- let task_ref = self
- .tasks_running
- .remove(&id)
- .expect("Received message for unknown task");
- let task_output = msg
- .to::<Result<TaskOutput>>()
- .expect("Failed to decode message from runner")?;
-
- self.tasks_done.insert(task_ref.clone(), task_output);
- self.update_runnable(&task_ref);
-
- progress = true;
- }
- ipc::IpcSelectionResult::ChannelClosed(id) => {
- if let Some(task) = self.tasks_running.remove(&id) {
- return Err(Error::new(format!(
- "Unexpectedly got no result for task {:#}",
- task
- )));
- }
- }
+ let mut pollfds: Box<[_]> = self
+ .tasks_running
+ .keys()
+ .copied()
+ .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN))
+ .collect();
+
+ while poll::poll(&mut pollfds, -1).context("poll()")? == 0 {}
+
+ for pollfd in &*pollfds {
+ let events = pollfd.revents().expect("Unknown events in poll() return");
+ if !events.contains(poll::PollFlags::POLLIN) {
+ if events.intersects(!poll::PollFlags::POLLIN) {
+ return Err(Error::new(
+ "Unexpected error status for socket file descriptor",
+ ));
}
+ continue;
}
+
+ let fd = pollfd.as_raw_fd();
+ let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap();
+
+ let task_output = Runner::result(&socket)?;
+ self.tasks_done.insert(task_ref.clone(), task_output);
+ self.update_runnable(&task_ref);
}
Ok(())
diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml
index 5d51739..d8fe562 100644
--- a/crates/runner/Cargo.toml
+++ b/crates/runner/Cargo.toml
@@ -13,7 +13,6 @@ common = { path = "../common", package = "rebel-common" }
blake3 = { version = "1.0.0", features = ["traits-preview"] }
capctl = "0.2.0"
digest = "0.9.0"
-ipc-channel = { git = "https://github.com/servo/ipc-channel.git" }
libc = "0.2.84"
nix = "0.23.0"
olpc-cjson = "0.1.0"
@@ -21,3 +20,5 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1.0.62"
tar = "0.4.32"
tee_readwrite = "0.1.0"
+uds = "0.2.6"
+bincode = "1.3.3"
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs
index 21289cf..bf052ac 100644
--- a/crates/runner/src/lib.rs
+++ b/crates/runner/src/lib.rs
@@ -6,16 +6,20 @@ mod tar;
mod task;
mod util;
-use std::fs::File;
+use std::{
+ fs::File,
+ net,
+ os::unix::{net::UnixStream, prelude::*},
+ slice,
+};
use capctl::prctl;
-use ipc_channel::ipc;
use nix::{
sched::CloneFlags,
sys::{signal, stat, wait},
unistd::{self, Gid, Uid},
};
-use serde::{Deserialize, Serialize};
+use uds::UnixSeqpacketConn;
use common::{error::*, types::*};
@@ -29,28 +33,27 @@ pub struct Options {
pub jobs: Option<usize>,
}
-#[derive(Debug, Deserialize, Serialize)]
-enum Message {
- Request(Task, ipc::IpcSender<Result<TaskOutput>>),
-}
-
fn handle_request(
jobserver: Jobserver,
- channel: ipc::IpcReceiver<Message>,
- task: Task,
- reply_channel: ipc::IpcSender<Result<TaskOutput>>,
-) -> (Jobserver, ipc::IpcReceiver<Message>) {
- let child = |(mut jobserver, channel): (Jobserver, ipc::IpcReceiver<Message>)| {
- drop(channel);
+ socket: UnixSeqpacketConn,
+ request_socket: UnixStream,
+) -> (Jobserver, UnixSeqpacketConn) {
+ let child = |(mut jobserver, socket): (Jobserver, UnixSeqpacketConn)| {
+ drop(socket);
unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap();
+ let task: Task =
+ bincode::deserialize_from(&request_socket).expect("Failed to decode task description");
+
prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)");
let token = jobserver.wait();
let (pid, mut jobserver) = unsafe {
clone::spawn(None, jobserver, |jobserver| {
let result = task::handle(task, jobserver);
- reply_channel.send(result).expect("IPC send() failed");
+ bincode::serialize_into(&request_socket, &result)
+ .expect("Failed to send task result");
+ drop(request_socket);
})
}
.expect("fork()");
@@ -59,35 +62,31 @@ fn handle_request(
wait_res.expect("waitpid()");
};
- unsafe { clone::spawn(None, (jobserver, channel), child) }
+ unsafe { clone::spawn(None, (jobserver, socket), child) }
.expect("fork()")
.1
}
-fn runner_loop(mut channel: ipc::IpcReceiver<Message>, options: &Options) {
+fn runner_loop(mut socket: UnixSeqpacketConn, options: &Options) {
let jobs = options
.jobs
.unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs"));
let mut jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe");
- while let Ok(msg) = channel.recv() {
- match msg {
- Message::Request(task, reply_channel) => {
- let ret = handle_request(jobserver, channel, task, reply_channel);
- jobserver = ret.0;
- channel = ret.1;
- }
- }
+ let mut fd = 0;
+
+ while let Ok((1, _, n_fd)) = socket.recv_fds(&mut [0], slice::from_mut(&mut fd)) {
+ assert!(n_fd == 1);
+
+ let request_socket = unsafe { UnixStream::from_raw_fd(fd) };
+
+ let ret = handle_request(jobserver, socket, request_socket);
+ jobserver = ret.0;
+ socket = ret.1;
}
}
-fn runner(
- uid: Uid,
- gid: Gid,
- channel: ipc::IpcReceiver<Message>,
- _lockfile: File,
- options: &Options,
-) {
+fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, options: &Options) {
ns::mount_proc();
ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid);
@@ -98,7 +97,7 @@ fn runner(
let msg_handler = unsafe {
clone::spawn(None, (), |()| {
signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap();
- runner_loop(channel, options);
+ runner_loop(socket, options);
})
}
.expect("fork()")
@@ -113,7 +112,7 @@ fn runner(
}
pub struct Runner {
- channel: ipc::IpcSender<Message>,
+ socket: UnixSeqpacketConn,
}
impl Runner {
@@ -129,29 +128,36 @@ impl Runner {
let uid = unistd::geteuid();
let gid = unistd::getegid();
- let (tx, rx) = ipc::channel().expect("IPC channel creation failed");
+ let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()");
- let (tx, _rx) = clone::spawn(
+ let (local, _remote) = clone::spawn(
Some(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID),
- (tx, rx),
- |(tx, rx)| {
- drop(tx);
- runner(uid, gid, rx, lockfile, options);
+ (local, remote),
+ |(local, remote)| {
+ drop(local);
+ runner(uid, gid, remote, lockfile, options);
},
)
.expect("clone()")
.1;
- Ok(Runner { channel: tx })
+ Ok(Runner { socket: local })
}
- pub fn spawn(&self, task: &Task) -> ipc::IpcReceiver<Result<TaskOutput>> {
- let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed");
+ pub fn spawn(&self, task: &Task) -> UnixStream {
+ let (local, remote) = UnixStream::pair().expect("socketpair()");
+
+ self.socket
+ .send_fds(&[0], &[remote.as_raw_fd()])
+ .expect("send()");
- self.channel
- .send(Message::Request(task.clone(), reply_tx))
- .expect("ContainerRunner task submission failed");
+ bincode::serialize_into(&local, task).expect("Task submission failed");
+ local.shutdown(net::Shutdown::Write).expect("shutdown()");
+
+ local
+ }
- reply_rx
+ pub fn result(socket: &UnixStream) -> Result<TaskOutput> {
+ bincode::deserialize_from(socket).expect("Failed to read task result")
}
}