diff options
author | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-26 18:43:51 +0200 |
---|---|---|
committer | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-26 19:03:17 +0200 |
commit | 5d93586e6def88752323d6dc91443a24a5b7e0f0 (patch) | |
tree | c2104054006a0b340f23adc3874c22b04d6c1a9e | |
parent | 8bebe4c76107d8b0a55fda238b0475469d374d77 (diff) | |
download | rebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.tar rebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.zip |
Replace ipc-channel with UNIX sockets and manual FD passing
The new implementation is more flexible, as it allows mixed polling with
other FD types, and it saves us a whole zoo of dependencies.
-rw-r--r-- | Cargo.lock | 345 | ||||
-rw-r--r-- | crates/executor/Cargo.toml | 4 | ||||
-rw-r--r-- | crates/executor/src/executor.rs | 83 | ||||
-rw-r--r-- | crates/runner/Cargo.toml | 3 | ||||
-rw-r--r-- | crates/runner/src/lib.rs | 100 |
5 files changed, 113 insertions, 422 deletions
@@ -31,7 +31,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -64,7 +64,7 @@ dependencies = [ "arrayref", "arrayvec", "cc", - "cfg-if 1.0.0", + "cfg-if", "constant_time_eq", "crypto-mac", "digest 0.9.0", @@ -121,12 +121,6 @@ checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd" [[package]] name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" - -[[package]] -name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" @@ -169,27 +163,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" [[package]] -name = "crossbeam-channel" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" -dependencies = [ - "crossbeam-utils", - "maybe-uninit", -] - -[[package]] -name = "crossbeam-utils" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" -dependencies = [ - "autocfg", - "cfg-if 0.1.10", - "lazy_static", -] - -[[package]] name = "crypto-mac" version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -246,35 +219,13 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "975ccf83d8d9d0d84682850a38c8169027be83368805971cc4f238c2b245bc98" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "redox_syscall", - "winapi 0.3.9", + "winapi", ] [[package]] -name = "fnv" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" - -[[package]] -name = "fuchsia-zircon" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" -dependencies = [ - "bitflags", - "fuchsia-zircon-sys", -] - -[[package]] -name = "fuchsia-zircon-sys" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" - -[[package]] name = "generic-array" version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -294,28 +245,6 @@ dependencies = [ ] [[package]] -name = "getrandom" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "wasi 0.9.0+wasi-snapshot-preview1", -] - -[[package]] -name = "getrandom" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "wasi 0.10.2+wasi-snapshot-preview1", -] - -[[package]] name = "handlebars" version = "4.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -382,49 +311,12 @@ dependencies = [ ] [[package]] -name = "iovec" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" -dependencies = [ - "libc", -] - -[[package]] -name = "ipc-channel" -version = "0.15.0" -source = "git+https://github.com/servo/ipc-channel.git#483439c09c94e097577042a6ffc37931c2b15c5f" -dependencies = [ - "bincode", - "crossbeam-channel", - "fnv", - "lazy_static", - "libc", - "mio", - "rand 0.7.3", - "serde", - "tempfile", - "uuid", - "winapi 0.3.9", -] - -[[package]] name = "itoa" version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" [[package]] -name = "kernel32-sys" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" -dependencies = [ - "winapi 0.2.8", - "winapi-build", -] - -[[package]] name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -448,7 +340,7 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -458,12 +350,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" [[package]] -name = "maybe-uninit" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" - -[[package]] name = "memchr" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -479,48 +365,6 @@ dependencies = [ ] [[package]] -name = "mio" -version = "0.6.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" -dependencies = [ - "cfg-if 0.1.10", - "fuchsia-zircon", - "fuchsia-zircon-sys", - "iovec", - "kernel32-sys", - "libc", - "log", - "miow", - "net2", - "slab", - "winapi 0.2.8", -] - -[[package]] -name = "miow" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" -dependencies = [ - "kernel32-sys", - "net2", - "winapi 0.2.8", - "ws2_32-sys", -] - -[[package]] -name = "net2" -version = "0.2.37" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" -dependencies = [ - "cfg-if 0.1.10", - "libc", - "winapi 0.3.9", -] - -[[package]] name = "nix" version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -528,7 +372,7 @@ checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188" dependencies = [ "bitflags", "cc", - "cfg-if 1.0.0", + "cfg-if", "libc", "memoffset", ] @@ -603,12 +447,6 @@ dependencies = [ ] [[package]] -name = "ppv-lite86" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed0cfbc8191465bed66e1718596ee0b0b35d5ee1f41c5df2189d0fe8bde535ba" - -[[package]] name = "proc-macro-error" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -657,87 +495,6 @@ dependencies = [ ] [[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc 0.2.0", -] - -[[package]] -name = "rand" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" -dependencies = [ - "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.3", - "rand_hc 0.3.1", -] - -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core 0.5.1", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.3", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", -] - -[[package]] -name = "rand_core" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" -dependencies = [ - "getrandom 0.2.3", -] - -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core 0.5.1", -] - -[[package]] -name = "rand_hc" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" -dependencies = [ - "rand_core 0.6.3", -] - -[[package]] name = "rebel" version = "0.1.0" dependencies = [ @@ -745,8 +502,8 @@ dependencies = [ "enum-kinds", "handlebars", "indoc", - "ipc-channel", "lazy_static", + "nix", "rebel-common", "rebel-runner", "regex", @@ -768,10 +525,10 @@ dependencies = [ name = "rebel-runner" version = "0.1.0" dependencies = [ + "bincode", "blake3", "capctl", "digest 0.9.0", - "ipc-channel", "libc", "nix", "olpc-cjson", @@ -780,6 +537,7 @@ dependencies = [ "serde_json", "tar", "tee_readwrite", + "uds", ] [[package]] @@ -809,15 +567,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" [[package]] -name = "remove_dir_all" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" -dependencies = [ - "winapi 0.3.9", -] - -[[package]] name = "ryu" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -894,12 +643,6 @@ dependencies = [ ] [[package]] -name = "slab" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" - -[[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -940,20 +683,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0642ac8a287350e5bd14ac50e7a137d49432ae625a1ae83668323ef3ca466af8" [[package]] -name = "tempfile" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "rand 0.8.4", - "redox_syscall", - "remove_dir_all", - "winapi 0.3.9", -] - -[[package]] name = "termcolor" version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -999,6 +728,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" [[package]] +name = "uds" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343758ccc8a17c1663182d780f68b52021d68b9a43d4b912b0a01f48b526e4f0" +dependencies = [ + "libc", +] + +[[package]] name = "unicase" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1041,15 +779,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7" [[package]] -name = "uuid" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" -dependencies = [ - "getrandom 0.2.3", -] - -[[package]] name = "version_check" version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1062,29 +791,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" dependencies = [ "same-file", - "winapi 0.3.9", + "winapi", "winapi-util", ] [[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - -[[package]] -name = "wasi" -version = "0.10.2+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" - -[[package]] -name = "winapi" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" - -[[package]] name = "winapi" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1095,12 +806,6 @@ dependencies = [ ] [[package]] -name = "winapi-build" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" - -[[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1112,7 +817,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1122,16 +827,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] -name = "ws2_32-sys" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" -dependencies = [ - "winapi 0.2.8", - "winapi-build", -] - -[[package]] name = "xattr" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/crates/executor/Cargo.toml b/crates/executor/Cargo.toml index 821caa7..215c451 100644 --- a/crates/executor/Cargo.toml +++ b/crates/executor/Cargo.toml @@ -15,10 +15,10 @@ clap = "3.0.0-beta.2" enum-kinds = "0.5.1" handlebars = "4.1.3" indoc = "1.0.3" -ipc-channel = { git = "https://github.com/servo/ipc-channel.git" } lazy_static = "1.4.0" +nix = "0.23.0" regex = "1.5.4" scoped-tls-hkt = "0.1.2" -serde = { version = "1", features = ["derive"] } +serde = { version = "1", features = ["derive", "rc"] } serde_yaml = "0.8" walkdir = "2" diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs index 1d6ee44..ad560e9 100644 --- a/crates/executor/src/executor.rs +++ b/crates/executor/src/executor.rs @@ -1,7 +1,10 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + os::unix::{net::UnixStream, prelude::*}, +}; use indoc::indoc; -use ipc_channel::ipc; +use nix::poll; use common::{error::*, string_hash::*, types::*}; use runner::{paths, Runner}; @@ -15,10 +18,9 @@ use crate::{ pub struct Executor<'ctx> { ctx: &'ctx Context, - receiver_set: ipc::IpcReceiverSet, tasks_blocked: HashSet<TaskRef<'ctx>>, tasks_runnable: Vec<TaskRef<'ctx>>, - tasks_running: HashMap<u64, TaskRef<'ctx>>, + tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>, tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>, rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>, tpl: TemplateEngine, @@ -28,7 +30,6 @@ impl<'ctx> Executor<'ctx> { pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> { let mut exc = Executor { ctx, - receiver_set: ipc::IpcReceiverSet::new().expect("IpcReceiverSet::new()"), tasks_blocked: HashSet::new(), tasks_runnable: Vec::new(), tasks_running: HashMap::new(), @@ -205,11 +206,7 @@ impl<'ctx> Executor<'ctx> { ret } - fn spawn_one( - &self, - task_ref: &TaskRef<'ctx>, - runner: &Runner, - ) -> Result<ipc::IpcReceiver<Result<TaskOutput>>> { + fn spawn_one(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<UnixStream> { let task_def = &self.ctx[task_ref.id]; let task_deps = self.task_deps(task_ref)?; let task_output = task_def @@ -248,12 +245,11 @@ impl<'ctx> Executor<'ctx> { fn run_tasks(&mut self, runner: &Runner) -> Result<()> { while let Some(task_ref) = self.tasks_runnable.pop() { - let channel = self.spawn_one(&task_ref, runner)?; - let id = self - .receiver_set - .add(channel) - .expect("Failed to add channel to receiver set"); - self.tasks_running.insert(id, task_ref); + let socket = self.spawn_one(&task_ref, runner)?; + assert!(self + .tasks_running + .insert(socket.as_raw_fd(), (socket, task_ref)) + .is_none()); } Ok(()) @@ -274,39 +270,32 @@ impl<'ctx> Executor<'ctx> { } fn wait_for_task(&mut self) -> Result<()> { - let mut progress = false; - - while !progress { - let events = self - .receiver_set - .select() - .expect("Failed to get messages from receiver set"); - for event in events { - match event { - ipc::IpcSelectionResult::MessageReceived(id, msg) => { - let task_ref = self - .tasks_running - .remove(&id) - .expect("Received message for unknown task"); - let task_output = msg - .to::<Result<TaskOutput>>() - .expect("Failed to decode message from runner")?; - - self.tasks_done.insert(task_ref.clone(), task_output); - self.update_runnable(&task_ref); - - progress = true; - } - ipc::IpcSelectionResult::ChannelClosed(id) => { - if let Some(task) = self.tasks_running.remove(&id) { - return Err(Error::new(format!( - "Unexpectedly got no result for task {:#}", - task - ))); - } - } + let mut pollfds: Box<[_]> = self + .tasks_running + .keys() + .copied() + .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) + .collect(); + + while poll::poll(&mut pollfds, -1).context("poll()")? == 0 {} + + for pollfd in &*pollfds { + let events = pollfd.revents().expect("Unknown events in poll() return"); + if !events.contains(poll::PollFlags::POLLIN) { + if events.intersects(!poll::PollFlags::POLLIN) { + return Err(Error::new( + "Unexpected error status for socket file descriptor", + )); } + continue; } + + let fd = pollfd.as_raw_fd(); + let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); + + let task_output = Runner::result(&socket)?; + self.tasks_done.insert(task_ref.clone(), task_output); + self.update_runnable(&task_ref); } Ok(()) diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml index 5d51739..d8fe562 100644 --- a/crates/runner/Cargo.toml +++ b/crates/runner/Cargo.toml @@ -13,7 +13,6 @@ common = { path = "../common", package = "rebel-common" } blake3 = { version = "1.0.0", features = ["traits-preview"] } capctl = "0.2.0" digest = "0.9.0" -ipc-channel = { git = "https://github.com/servo/ipc-channel.git" } libc = "0.2.84" nix = "0.23.0" olpc-cjson = "0.1.0" @@ -21,3 +20,5 @@ serde = { version = "1", features = ["derive"] } serde_json = "1.0.62" tar = "0.4.32" tee_readwrite = "0.1.0" +uds = "0.2.6" +bincode = "1.3.3" diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs index 21289cf..bf052ac 100644 --- a/crates/runner/src/lib.rs +++ b/crates/runner/src/lib.rs @@ -6,16 +6,20 @@ mod tar; mod task; mod util; -use std::fs::File; +use std::{ + fs::File, + net, + os::unix::{net::UnixStream, prelude::*}, + slice, +}; use capctl::prctl; -use ipc_channel::ipc; use nix::{ sched::CloneFlags, sys::{signal, stat, wait}, unistd::{self, Gid, Uid}, }; -use serde::{Deserialize, Serialize}; +use uds::UnixSeqpacketConn; use common::{error::*, types::*}; @@ -29,28 +33,27 @@ pub struct Options { pub jobs: Option<usize>, } -#[derive(Debug, Deserialize, Serialize)] -enum Message { - Request(Task, ipc::IpcSender<Result<TaskOutput>>), -} - fn handle_request( jobserver: Jobserver, - channel: ipc::IpcReceiver<Message>, - task: Task, - reply_channel: ipc::IpcSender<Result<TaskOutput>>, -) -> (Jobserver, ipc::IpcReceiver<Message>) { - let child = |(mut jobserver, channel): (Jobserver, ipc::IpcReceiver<Message>)| { - drop(channel); + socket: UnixSeqpacketConn, + request_socket: UnixStream, +) -> (Jobserver, UnixSeqpacketConn) { + let child = |(mut jobserver, socket): (Jobserver, UnixSeqpacketConn)| { + drop(socket); unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap(); + let task: Task = + bincode::deserialize_from(&request_socket).expect("Failed to decode task description"); + prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)"); let token = jobserver.wait(); let (pid, mut jobserver) = unsafe { clone::spawn(None, jobserver, |jobserver| { let result = task::handle(task, jobserver); - reply_channel.send(result).expect("IPC send() failed"); + bincode::serialize_into(&request_socket, &result) + .expect("Failed to send task result"); + drop(request_socket); }) } .expect("fork()"); @@ -59,35 +62,31 @@ fn handle_request( wait_res.expect("waitpid()"); }; - unsafe { clone::spawn(None, (jobserver, channel), child) } + unsafe { clone::spawn(None, (jobserver, socket), child) } .expect("fork()") .1 } -fn runner_loop(mut channel: ipc::IpcReceiver<Message>, options: &Options) { +fn runner_loop(mut socket: UnixSeqpacketConn, options: &Options) { let jobs = options .jobs .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs")); let mut jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe"); - while let Ok(msg) = channel.recv() { - match msg { - Message::Request(task, reply_channel) => { - let ret = handle_request(jobserver, channel, task, reply_channel); - jobserver = ret.0; - channel = ret.1; - } - } + let mut fd = 0; + + while let Ok((1, _, n_fd)) = socket.recv_fds(&mut [0], slice::from_mut(&mut fd)) { + assert!(n_fd == 1); + + let request_socket = unsafe { UnixStream::from_raw_fd(fd) }; + + let ret = handle_request(jobserver, socket, request_socket); + jobserver = ret.0; + socket = ret.1; } } -fn runner( - uid: Uid, - gid: Gid, - channel: ipc::IpcReceiver<Message>, - _lockfile: File, - options: &Options, -) { +fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, options: &Options) { ns::mount_proc(); ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid); @@ -98,7 +97,7 @@ fn runner( let msg_handler = unsafe { clone::spawn(None, (), |()| { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap(); - runner_loop(channel, options); + runner_loop(socket, options); }) } .expect("fork()") @@ -113,7 +112,7 @@ fn runner( } pub struct Runner { - channel: ipc::IpcSender<Message>, + socket: UnixSeqpacketConn, } impl Runner { @@ -129,29 +128,36 @@ impl Runner { let uid = unistd::geteuid(); let gid = unistd::getegid(); - let (tx, rx) = ipc::channel().expect("IPC channel creation failed"); + let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()"); - let (tx, _rx) = clone::spawn( + let (local, _remote) = clone::spawn( Some(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID), - (tx, rx), - |(tx, rx)| { - drop(tx); - runner(uid, gid, rx, lockfile, options); + (local, remote), + |(local, remote)| { + drop(local); + runner(uid, gid, remote, lockfile, options); }, ) .expect("clone()") .1; - Ok(Runner { channel: tx }) + Ok(Runner { socket: local }) } - pub fn spawn(&self, task: &Task) -> ipc::IpcReceiver<Result<TaskOutput>> { - let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed"); + pub fn spawn(&self, task: &Task) -> UnixStream { + let (local, remote) = UnixStream::pair().expect("socketpair()"); + + self.socket + .send_fds(&[0], &[remote.as_raw_fd()]) + .expect("send()"); - self.channel - .send(Message::Request(task.clone(), reply_tx)) - .expect("ContainerRunner task submission failed"); + bincode::serialize_into(&local, task).expect("Task submission failed"); + local.shutdown(net::Shutdown::Write).expect("shutdown()"); + + local + } - reply_rx + pub fn result(socket: &UnixStream) -> Result<TaskOutput> { + bincode::deserialize_from(socket).expect("Failed to read task result") } } |