diff options
author | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-26 18:43:51 +0200 |
---|---|---|
committer | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-26 19:03:17 +0200 |
commit | 5d93586e6def88752323d6dc91443a24a5b7e0f0 (patch) | |
tree | c2104054006a0b340f23adc3874c22b04d6c1a9e /crates | |
parent | 8bebe4c76107d8b0a55fda238b0475469d374d77 (diff) | |
download | rebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.tar rebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.zip |
Replace ipc-channel with UNIX sockets and manual FD passing
The new implementation is more flexible, as it allows mixed polling with
other FD types, and it saves us a whole zoo of dependencies.
Diffstat (limited to 'crates')
-rw-r--r-- | crates/executor/Cargo.toml | 4 | ||||
-rw-r--r-- | crates/executor/src/executor.rs | 83 | ||||
-rw-r--r-- | crates/runner/Cargo.toml | 3 | ||||
-rw-r--r-- | crates/runner/src/lib.rs | 100 |
4 files changed, 93 insertions, 97 deletions
diff --git a/crates/executor/Cargo.toml b/crates/executor/Cargo.toml index 821caa7..215c451 100644 --- a/crates/executor/Cargo.toml +++ b/crates/executor/Cargo.toml @@ -15,10 +15,10 @@ clap = "3.0.0-beta.2" enum-kinds = "0.5.1" handlebars = "4.1.3" indoc = "1.0.3" -ipc-channel = { git = "https://github.com/servo/ipc-channel.git" } lazy_static = "1.4.0" +nix = "0.23.0" regex = "1.5.4" scoped-tls-hkt = "0.1.2" -serde = { version = "1", features = ["derive"] } +serde = { version = "1", features = ["derive", "rc"] } serde_yaml = "0.8" walkdir = "2" diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs index 1d6ee44..ad560e9 100644 --- a/crates/executor/src/executor.rs +++ b/crates/executor/src/executor.rs @@ -1,7 +1,10 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + os::unix::{net::UnixStream, prelude::*}, +}; use indoc::indoc; -use ipc_channel::ipc; +use nix::poll; use common::{error::*, string_hash::*, types::*}; use runner::{paths, Runner}; @@ -15,10 +18,9 @@ use crate::{ pub struct Executor<'ctx> { ctx: &'ctx Context, - receiver_set: ipc::IpcReceiverSet, tasks_blocked: HashSet<TaskRef<'ctx>>, tasks_runnable: Vec<TaskRef<'ctx>>, - tasks_running: HashMap<u64, TaskRef<'ctx>>, + tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>, tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>, rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>, tpl: TemplateEngine, @@ -28,7 +30,6 @@ impl<'ctx> Executor<'ctx> { pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> { let mut exc = Executor { ctx, - receiver_set: ipc::IpcReceiverSet::new().expect("IpcReceiverSet::new()"), tasks_blocked: HashSet::new(), tasks_runnable: Vec::new(), tasks_running: HashMap::new(), @@ -205,11 +206,7 @@ impl<'ctx> Executor<'ctx> { ret } - fn spawn_one( - &self, - task_ref: &TaskRef<'ctx>, - runner: &Runner, - ) -> Result<ipc::IpcReceiver<Result<TaskOutput>>> { + fn spawn_one(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<UnixStream> { let task_def = &self.ctx[task_ref.id]; let task_deps = self.task_deps(task_ref)?; let task_output = task_def @@ -248,12 +245,11 @@ impl<'ctx> Executor<'ctx> { fn run_tasks(&mut self, runner: &Runner) -> Result<()> { while let Some(task_ref) = self.tasks_runnable.pop() { - let channel = self.spawn_one(&task_ref, runner)?; - let id = self - .receiver_set - .add(channel) - .expect("Failed to add channel to receiver set"); - self.tasks_running.insert(id, task_ref); + let socket = self.spawn_one(&task_ref, runner)?; + assert!(self + .tasks_running + .insert(socket.as_raw_fd(), (socket, task_ref)) + .is_none()); } Ok(()) @@ -274,39 +270,32 @@ impl<'ctx> Executor<'ctx> { } fn wait_for_task(&mut self) -> Result<()> { - let mut progress = false; - - while !progress { - let events = self - .receiver_set - .select() - .expect("Failed to get messages from receiver set"); - for event in events { - match event { - ipc::IpcSelectionResult::MessageReceived(id, msg) => { - let task_ref = self - .tasks_running - .remove(&id) - .expect("Received message for unknown task"); - let task_output = msg - .to::<Result<TaskOutput>>() - .expect("Failed to decode message from runner")?; - - self.tasks_done.insert(task_ref.clone(), task_output); - self.update_runnable(&task_ref); - - progress = true; - } - ipc::IpcSelectionResult::ChannelClosed(id) => { - if let Some(task) = self.tasks_running.remove(&id) { - return Err(Error::new(format!( - "Unexpectedly got no result for task {:#}", - task - ))); - } - } + let mut pollfds: Box<[_]> = self + .tasks_running + .keys() + .copied() + .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) + .collect(); + + while poll::poll(&mut pollfds, -1).context("poll()")? == 0 {} + + for pollfd in &*pollfds { + let events = pollfd.revents().expect("Unknown events in poll() return"); + if !events.contains(poll::PollFlags::POLLIN) { + if events.intersects(!poll::PollFlags::POLLIN) { + return Err(Error::new( + "Unexpected error status for socket file descriptor", + )); } + continue; } + + let fd = pollfd.as_raw_fd(); + let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); + + let task_output = Runner::result(&socket)?; + self.tasks_done.insert(task_ref.clone(), task_output); + self.update_runnable(&task_ref); } Ok(()) diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml index 5d51739..d8fe562 100644 --- a/crates/runner/Cargo.toml +++ b/crates/runner/Cargo.toml @@ -13,7 +13,6 @@ common = { path = "../common", package = "rebel-common" } blake3 = { version = "1.0.0", features = ["traits-preview"] } capctl = "0.2.0" digest = "0.9.0" -ipc-channel = { git = "https://github.com/servo/ipc-channel.git" } libc = "0.2.84" nix = "0.23.0" olpc-cjson = "0.1.0" @@ -21,3 +20,5 @@ serde = { version = "1", features = ["derive"] } serde_json = "1.0.62" tar = "0.4.32" tee_readwrite = "0.1.0" +uds = "0.2.6" +bincode = "1.3.3" diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs index 21289cf..bf052ac 100644 --- a/crates/runner/src/lib.rs +++ b/crates/runner/src/lib.rs @@ -6,16 +6,20 @@ mod tar; mod task; mod util; -use std::fs::File; +use std::{ + fs::File, + net, + os::unix::{net::UnixStream, prelude::*}, + slice, +}; use capctl::prctl; -use ipc_channel::ipc; use nix::{ sched::CloneFlags, sys::{signal, stat, wait}, unistd::{self, Gid, Uid}, }; -use serde::{Deserialize, Serialize}; +use uds::UnixSeqpacketConn; use common::{error::*, types::*}; @@ -29,28 +33,27 @@ pub struct Options { pub jobs: Option<usize>, } -#[derive(Debug, Deserialize, Serialize)] -enum Message { - Request(Task, ipc::IpcSender<Result<TaskOutput>>), -} - fn handle_request( jobserver: Jobserver, - channel: ipc::IpcReceiver<Message>, - task: Task, - reply_channel: ipc::IpcSender<Result<TaskOutput>>, -) -> (Jobserver, ipc::IpcReceiver<Message>) { - let child = |(mut jobserver, channel): (Jobserver, ipc::IpcReceiver<Message>)| { - drop(channel); + socket: UnixSeqpacketConn, + request_socket: UnixStream, +) -> (Jobserver, UnixSeqpacketConn) { + let child = |(mut jobserver, socket): (Jobserver, UnixSeqpacketConn)| { + drop(socket); unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap(); + let task: Task = + bincode::deserialize_from(&request_socket).expect("Failed to decode task description"); + prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)"); let token = jobserver.wait(); let (pid, mut jobserver) = unsafe { clone::spawn(None, jobserver, |jobserver| { let result = task::handle(task, jobserver); - reply_channel.send(result).expect("IPC send() failed"); + bincode::serialize_into(&request_socket, &result) + .expect("Failed to send task result"); + drop(request_socket); }) } .expect("fork()"); @@ -59,35 +62,31 @@ fn handle_request( wait_res.expect("waitpid()"); }; - unsafe { clone::spawn(None, (jobserver, channel), child) } + unsafe { clone::spawn(None, (jobserver, socket), child) } .expect("fork()") .1 } -fn runner_loop(mut channel: ipc::IpcReceiver<Message>, options: &Options) { +fn runner_loop(mut socket: UnixSeqpacketConn, options: &Options) { let jobs = options .jobs .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs")); let mut jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe"); - while let Ok(msg) = channel.recv() { - match msg { - Message::Request(task, reply_channel) => { - let ret = handle_request(jobserver, channel, task, reply_channel); - jobserver = ret.0; - channel = ret.1; - } - } + let mut fd = 0; + + while let Ok((1, _, n_fd)) = socket.recv_fds(&mut [0], slice::from_mut(&mut fd)) { + assert!(n_fd == 1); + + let request_socket = unsafe { UnixStream::from_raw_fd(fd) }; + + let ret = handle_request(jobserver, socket, request_socket); + jobserver = ret.0; + socket = ret.1; } } -fn runner( - uid: Uid, - gid: Gid, - channel: ipc::IpcReceiver<Message>, - _lockfile: File, - options: &Options, -) { +fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, options: &Options) { ns::mount_proc(); ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid); @@ -98,7 +97,7 @@ fn runner( let msg_handler = unsafe { clone::spawn(None, (), |()| { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap(); - runner_loop(channel, options); + runner_loop(socket, options); }) } .expect("fork()") @@ -113,7 +112,7 @@ fn runner( } pub struct Runner { - channel: ipc::IpcSender<Message>, + socket: UnixSeqpacketConn, } impl Runner { @@ -129,29 +128,36 @@ impl Runner { let uid = unistd::geteuid(); let gid = unistd::getegid(); - let (tx, rx) = ipc::channel().expect("IPC channel creation failed"); + let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()"); - let (tx, _rx) = clone::spawn( + let (local, _remote) = clone::spawn( Some(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID), - (tx, rx), - |(tx, rx)| { - drop(tx); - runner(uid, gid, rx, lockfile, options); + (local, remote), + |(local, remote)| { + drop(local); + runner(uid, gid, remote, lockfile, options); }, ) .expect("clone()") .1; - Ok(Runner { channel: tx }) + Ok(Runner { socket: local }) } - pub fn spawn(&self, task: &Task) -> ipc::IpcReceiver<Result<TaskOutput>> { - let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed"); + pub fn spawn(&self, task: &Task) -> UnixStream { + let (local, remote) = UnixStream::pair().expect("socketpair()"); + + self.socket + .send_fds(&[0], &[remote.as_raw_fd()]) + .expect("send()"); - self.channel - .send(Message::Request(task.clone(), reply_tx)) - .expect("ContainerRunner task submission failed"); + bincode::serialize_into(&local, task).expect("Task submission failed"); + local.shutdown(net::Shutdown::Write).expect("shutdown()"); + + local + } - reply_rx + pub fn result(socket: &UnixStream) -> Result<TaskOutput> { + bincode::deserialize_from(socket).expect("Failed to read task result") } } |