summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2021-10-26 18:43:51 +0200
committerMatthias Schiffer <mschiffer@universe-factory.net>2021-10-26 19:03:17 +0200
commit5d93586e6def88752323d6dc91443a24a5b7e0f0 (patch)
treec2104054006a0b340f23adc3874c22b04d6c1a9e
parent8bebe4c76107d8b0a55fda238b0475469d374d77 (diff)
downloadrebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.tar
rebel-5d93586e6def88752323d6dc91443a24a5b7e0f0.zip
Replace ipc-channel with UNIX sockets and manual FD passing
The new implementation is more flexible, as it allows mixed polling with other FD types, and it saves us a whole zoo of dependencies.
-rw-r--r--Cargo.lock345
-rw-r--r--crates/executor/Cargo.toml4
-rw-r--r--crates/executor/src/executor.rs83
-rw-r--r--crates/runner/Cargo.toml3
-rw-r--r--crates/runner/src/lib.rs100
5 files changed, 113 insertions, 422 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ad5a21c..356c0dd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -31,7 +31,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
- "winapi 0.3.9",
+ "winapi",
]
[[package]]
@@ -64,7 +64,7 @@ dependencies = [
"arrayref",
"arrayvec",
"cc",
- "cfg-if 1.0.0",
+ "cfg-if",
"constant_time_eq",
"crypto-mac",
"digest 0.9.0",
@@ -121,12 +121,6 @@ checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"
[[package]]
name = "cfg-if"
-version = "0.1.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
-
-[[package]]
-name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
@@ -169,27 +163,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
-name = "crossbeam-channel"
-version = "0.4.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87"
-dependencies = [
- "crossbeam-utils",
- "maybe-uninit",
-]
-
-[[package]]
-name = "crossbeam-utils"
-version = "0.7.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
-dependencies = [
- "autocfg",
- "cfg-if 0.1.10",
- "lazy_static",
-]
-
-[[package]]
name = "crypto-mac"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -246,35 +219,13 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "975ccf83d8d9d0d84682850a38c8169027be83368805971cc4f238c2b245bc98"
dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
"libc",
"redox_syscall",
- "winapi 0.3.9",
+ "winapi",
]
[[package]]
-name = "fnv"
-version = "1.0.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
-
-[[package]]
-name = "fuchsia-zircon"
-version = "0.3.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
-dependencies = [
- "bitflags",
- "fuchsia-zircon-sys",
-]
-
-[[package]]
-name = "fuchsia-zircon-sys"
-version = "0.3.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
-
-[[package]]
name = "generic-array"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -294,28 +245,6 @@ dependencies = [
]
[[package]]
-name = "getrandom"
-version = "0.1.16"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce"
-dependencies = [
- "cfg-if 1.0.0",
- "libc",
- "wasi 0.9.0+wasi-snapshot-preview1",
-]
-
-[[package]]
-name = "getrandom"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
-dependencies = [
- "cfg-if 1.0.0",
- "libc",
- "wasi 0.10.2+wasi-snapshot-preview1",
-]
-
-[[package]]
name = "handlebars"
version = "4.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -382,49 +311,12 @@ dependencies = [
]
[[package]]
-name = "iovec"
-version = "0.1.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
-dependencies = [
- "libc",
-]
-
-[[package]]
-name = "ipc-channel"
-version = "0.15.0"
-source = "git+https://github.com/servo/ipc-channel.git#483439c09c94e097577042a6ffc37931c2b15c5f"
-dependencies = [
- "bincode",
- "crossbeam-channel",
- "fnv",
- "lazy_static",
- "libc",
- "mio",
- "rand 0.7.3",
- "serde",
- "tempfile",
- "uuid",
- "winapi 0.3.9",
-]
-
-[[package]]
name = "itoa"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
-name = "kernel32-sys"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
-dependencies = [
- "winapi 0.2.8",
- "winapi-build",
-]
-
-[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -448,7 +340,7 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if",
]
[[package]]
@@ -458,12 +350,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
-name = "maybe-uninit"
-version = "2.0.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
-
-[[package]]
name = "memchr"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -479,48 +365,6 @@ dependencies = [
]
[[package]]
-name = "mio"
-version = "0.6.23"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4"
-dependencies = [
- "cfg-if 0.1.10",
- "fuchsia-zircon",
- "fuchsia-zircon-sys",
- "iovec",
- "kernel32-sys",
- "libc",
- "log",
- "miow",
- "net2",
- "slab",
- "winapi 0.2.8",
-]
-
-[[package]]
-name = "miow"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d"
-dependencies = [
- "kernel32-sys",
- "net2",
- "winapi 0.2.8",
- "ws2_32-sys",
-]
-
-[[package]]
-name = "net2"
-version = "0.2.37"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae"
-dependencies = [
- "cfg-if 0.1.10",
- "libc",
- "winapi 0.3.9",
-]
-
-[[package]]
name = "nix"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -528,7 +372,7 @@ checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188"
dependencies = [
"bitflags",
"cc",
- "cfg-if 1.0.0",
+ "cfg-if",
"libc",
"memoffset",
]
@@ -603,12 +447,6 @@ dependencies = [
]
[[package]]
-name = "ppv-lite86"
-version = "0.2.15"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ed0cfbc8191465bed66e1718596ee0b0b35d5ee1f41c5df2189d0fe8bde535ba"
-
-[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -657,87 +495,6 @@ dependencies = [
]
[[package]]
-name = "rand"
-version = "0.7.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
-dependencies = [
- "getrandom 0.1.16",
- "libc",
- "rand_chacha 0.2.2",
- "rand_core 0.5.1",
- "rand_hc 0.2.0",
-]
-
-[[package]]
-name = "rand"
-version = "0.8.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
-dependencies = [
- "libc",
- "rand_chacha 0.3.1",
- "rand_core 0.6.3",
- "rand_hc 0.3.1",
-]
-
-[[package]]
-name = "rand_chacha"
-version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
-dependencies = [
- "ppv-lite86",
- "rand_core 0.5.1",
-]
-
-[[package]]
-name = "rand_chacha"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
-dependencies = [
- "ppv-lite86",
- "rand_core 0.6.3",
-]
-
-[[package]]
-name = "rand_core"
-version = "0.5.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
-dependencies = [
- "getrandom 0.1.16",
-]
-
-[[package]]
-name = "rand_core"
-version = "0.6.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
-dependencies = [
- "getrandom 0.2.3",
-]
-
-[[package]]
-name = "rand_hc"
-version = "0.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
-dependencies = [
- "rand_core 0.5.1",
-]
-
-[[package]]
-name = "rand_hc"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
-dependencies = [
- "rand_core 0.6.3",
-]
-
-[[package]]
name = "rebel"
version = "0.1.0"
dependencies = [
@@ -745,8 +502,8 @@ dependencies = [
"enum-kinds",
"handlebars",
"indoc",
- "ipc-channel",
"lazy_static",
+ "nix",
"rebel-common",
"rebel-runner",
"regex",
@@ -768,10 +525,10 @@ dependencies = [
name = "rebel-runner"
version = "0.1.0"
dependencies = [
+ "bincode",
"blake3",
"capctl",
"digest 0.9.0",
- "ipc-channel",
"libc",
"nix",
"olpc-cjson",
@@ -780,6 +537,7 @@ dependencies = [
"serde_json",
"tar",
"tee_readwrite",
+ "uds",
]
[[package]]
@@ -809,15 +567,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
-name = "remove_dir_all"
-version = "0.5.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
-dependencies = [
- "winapi 0.3.9",
-]
-
-[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -894,12 +643,6 @@ dependencies = [
]
[[package]]
-name = "slab"
-version = "0.4.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
-
-[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -940,20 +683,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0642ac8a287350e5bd14ac50e7a137d49432ae625a1ae83668323ef3ca466af8"
[[package]]
-name = "tempfile"
-version = "3.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
-dependencies = [
- "cfg-if 1.0.0",
- "libc",
- "rand 0.8.4",
- "redox_syscall",
- "remove_dir_all",
- "winapi 0.3.9",
-]
-
-[[package]]
name = "termcolor"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -999,6 +728,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c"
[[package]]
+name = "uds"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "343758ccc8a17c1663182d780f68b52021d68b9a43d4b912b0a01f48b526e4f0"
+dependencies = [
+ "libc",
+]
+
+[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1041,15 +779,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7"
[[package]]
-name = "uuid"
-version = "0.8.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
-dependencies = [
- "getrandom 0.2.3",
-]
-
-[[package]]
name = "version_check"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1062,29 +791,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
- "winapi 0.3.9",
+ "winapi",
"winapi-util",
]
[[package]]
-name = "wasi"
-version = "0.9.0+wasi-snapshot-preview1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
-
-[[package]]
-name = "wasi"
-version = "0.10.2+wasi-snapshot-preview1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
-
-[[package]]
-name = "winapi"
-version = "0.2.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
-
-[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1095,12 +806,6 @@ dependencies = [
]
[[package]]
-name = "winapi-build"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
-
-[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1112,7 +817,7 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
- "winapi 0.3.9",
+ "winapi",
]
[[package]]
@@ -1122,16 +827,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
-name = "ws2_32-sys"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
-dependencies = [
- "winapi 0.2.8",
- "winapi-build",
-]
-
-[[package]]
name = "xattr"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/crates/executor/Cargo.toml b/crates/executor/Cargo.toml
index 821caa7..215c451 100644
--- a/crates/executor/Cargo.toml
+++ b/crates/executor/Cargo.toml
@@ -15,10 +15,10 @@ clap = "3.0.0-beta.2"
enum-kinds = "0.5.1"
handlebars = "4.1.3"
indoc = "1.0.3"
-ipc-channel = { git = "https://github.com/servo/ipc-channel.git" }
lazy_static = "1.4.0"
+nix = "0.23.0"
regex = "1.5.4"
scoped-tls-hkt = "0.1.2"
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1", features = ["derive", "rc"] }
serde_yaml = "0.8"
walkdir = "2"
diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs
index 1d6ee44..ad560e9 100644
--- a/crates/executor/src/executor.rs
+++ b/crates/executor/src/executor.rs
@@ -1,7 +1,10 @@
-use std::collections::{HashMap, HashSet};
+use std::{
+ collections::{HashMap, HashSet},
+ os::unix::{net::UnixStream, prelude::*},
+};
use indoc::indoc;
-use ipc_channel::ipc;
+use nix::poll;
use common::{error::*, string_hash::*, types::*};
use runner::{paths, Runner};
@@ -15,10 +18,9 @@ use crate::{
pub struct Executor<'ctx> {
ctx: &'ctx Context,
- receiver_set: ipc::IpcReceiverSet,
tasks_blocked: HashSet<TaskRef<'ctx>>,
tasks_runnable: Vec<TaskRef<'ctx>>,
- tasks_running: HashMap<u64, TaskRef<'ctx>>,
+ tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>,
tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>,
rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>,
tpl: TemplateEngine,
@@ -28,7 +30,6 @@ impl<'ctx> Executor<'ctx> {
pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> {
let mut exc = Executor {
ctx,
- receiver_set: ipc::IpcReceiverSet::new().expect("IpcReceiverSet::new()"),
tasks_blocked: HashSet::new(),
tasks_runnable: Vec::new(),
tasks_running: HashMap::new(),
@@ -205,11 +206,7 @@ impl<'ctx> Executor<'ctx> {
ret
}
- fn spawn_one(
- &self,
- task_ref: &TaskRef<'ctx>,
- runner: &Runner,
- ) -> Result<ipc::IpcReceiver<Result<TaskOutput>>> {
+ fn spawn_one(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<UnixStream> {
let task_def = &self.ctx[task_ref.id];
let task_deps = self.task_deps(task_ref)?;
let task_output = task_def
@@ -248,12 +245,11 @@ impl<'ctx> Executor<'ctx> {
fn run_tasks(&mut self, runner: &Runner) -> Result<()> {
while let Some(task_ref) = self.tasks_runnable.pop() {
- let channel = self.spawn_one(&task_ref, runner)?;
- let id = self
- .receiver_set
- .add(channel)
- .expect("Failed to add channel to receiver set");
- self.tasks_running.insert(id, task_ref);
+ let socket = self.spawn_one(&task_ref, runner)?;
+ assert!(self
+ .tasks_running
+ .insert(socket.as_raw_fd(), (socket, task_ref))
+ .is_none());
}
Ok(())
@@ -274,39 +270,32 @@ impl<'ctx> Executor<'ctx> {
}
fn wait_for_task(&mut self) -> Result<()> {
- let mut progress = false;
-
- while !progress {
- let events = self
- .receiver_set
- .select()
- .expect("Failed to get messages from receiver set");
- for event in events {
- match event {
- ipc::IpcSelectionResult::MessageReceived(id, msg) => {
- let task_ref = self
- .tasks_running
- .remove(&id)
- .expect("Received message for unknown task");
- let task_output = msg
- .to::<Result<TaskOutput>>()
- .expect("Failed to decode message from runner")?;
-
- self.tasks_done.insert(task_ref.clone(), task_output);
- self.update_runnable(&task_ref);
-
- progress = true;
- }
- ipc::IpcSelectionResult::ChannelClosed(id) => {
- if let Some(task) = self.tasks_running.remove(&id) {
- return Err(Error::new(format!(
- "Unexpectedly got no result for task {:#}",
- task
- )));
- }
- }
+ let mut pollfds: Box<[_]> = self
+ .tasks_running
+ .keys()
+ .copied()
+ .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN))
+ .collect();
+
+ while poll::poll(&mut pollfds, -1).context("poll()")? == 0 {}
+
+ for pollfd in &*pollfds {
+ let events = pollfd.revents().expect("Unknown events in poll() return");
+ if !events.contains(poll::PollFlags::POLLIN) {
+ if events.intersects(!poll::PollFlags::POLLIN) {
+ return Err(Error::new(
+ "Unexpected error status for socket file descriptor",
+ ));
}
+ continue;
}
+
+ let fd = pollfd.as_raw_fd();
+ let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap();
+
+ let task_output = Runner::result(&socket)?;
+ self.tasks_done.insert(task_ref.clone(), task_output);
+ self.update_runnable(&task_ref);
}
Ok(())
diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml
index 5d51739..d8fe562 100644
--- a/crates/runner/Cargo.toml
+++ b/crates/runner/Cargo.toml
@@ -13,7 +13,6 @@ common = { path = "../common", package = "rebel-common" }
blake3 = { version = "1.0.0", features = ["traits-preview"] }
capctl = "0.2.0"
digest = "0.9.0"
-ipc-channel = { git = "https://github.com/servo/ipc-channel.git" }
libc = "0.2.84"
nix = "0.23.0"
olpc-cjson = "0.1.0"
@@ -21,3 +20,5 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1.0.62"
tar = "0.4.32"
tee_readwrite = "0.1.0"
+uds = "0.2.6"
+bincode = "1.3.3"
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs
index 21289cf..bf052ac 100644
--- a/crates/runner/src/lib.rs
+++ b/crates/runner/src/lib.rs
@@ -6,16 +6,20 @@ mod tar;
mod task;
mod util;
-use std::fs::File;
+use std::{
+ fs::File,
+ net,
+ os::unix::{net::UnixStream, prelude::*},
+ slice,
+};
use capctl::prctl;
-use ipc_channel::ipc;
use nix::{
sched::CloneFlags,
sys::{signal, stat, wait},
unistd::{self, Gid, Uid},
};
-use serde::{Deserialize, Serialize};
+use uds::UnixSeqpacketConn;
use common::{error::*, types::*};
@@ -29,28 +33,27 @@ pub struct Options {
pub jobs: Option<usize>,
}
-#[derive(Debug, Deserialize, Serialize)]
-enum Message {
- Request(Task, ipc::IpcSender<Result<TaskOutput>>),
-}
-
fn handle_request(
jobserver: Jobserver,
- channel: ipc::IpcReceiver<Message>,
- task: Task,
- reply_channel: ipc::IpcSender<Result<TaskOutput>>,
-) -> (Jobserver, ipc::IpcReceiver<Message>) {
- let child = |(mut jobserver, channel): (Jobserver, ipc::IpcReceiver<Message>)| {
- drop(channel);
+ socket: UnixSeqpacketConn,
+ request_socket: UnixStream,
+) -> (Jobserver, UnixSeqpacketConn) {
+ let child = |(mut jobserver, socket): (Jobserver, UnixSeqpacketConn)| {
+ drop(socket);
unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap();
+ let task: Task =
+ bincode::deserialize_from(&request_socket).expect("Failed to decode task description");
+
prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)");
let token = jobserver.wait();
let (pid, mut jobserver) = unsafe {
clone::spawn(None, jobserver, |jobserver| {
let result = task::handle(task, jobserver);
- reply_channel.send(result).expect("IPC send() failed");
+ bincode::serialize_into(&request_socket, &result)
+ .expect("Failed to send task result");
+ drop(request_socket);
})
}
.expect("fork()");
@@ -59,35 +62,31 @@ fn handle_request(
wait_res.expect("waitpid()");
};
- unsafe { clone::spawn(None, (jobserver, channel), child) }
+ unsafe { clone::spawn(None, (jobserver, socket), child) }
.expect("fork()")
.1
}
-fn runner_loop(mut channel: ipc::IpcReceiver<Message>, options: &Options) {
+fn runner_loop(mut socket: UnixSeqpacketConn, options: &Options) {
let jobs = options
.jobs
.unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs"));
let mut jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe");
- while let Ok(msg) = channel.recv() {
- match msg {
- Message::Request(task, reply_channel) => {
- let ret = handle_request(jobserver, channel, task, reply_channel);
- jobserver = ret.0;
- channel = ret.1;
- }
- }
+ let mut fd = 0;
+
+ while let Ok((1, _, n_fd)) = socket.recv_fds(&mut [0], slice::from_mut(&mut fd)) {
+ assert!(n_fd == 1);
+
+ let request_socket = unsafe { UnixStream::from_raw_fd(fd) };
+
+ let ret = handle_request(jobserver, socket, request_socket);
+ jobserver = ret.0;
+ socket = ret.1;
}
}
-fn runner(
- uid: Uid,
- gid: Gid,
- channel: ipc::IpcReceiver<Message>,
- _lockfile: File,
- options: &Options,
-) {
+fn runner(uid: Uid, gid: Gid, socket: UnixSeqpacketConn, _lockfile: File, options: &Options) {
ns::mount_proc();
ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid);
@@ -98,7 +97,7 @@ fn runner(
let msg_handler = unsafe {
clone::spawn(None, (), |()| {
signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap();
- runner_loop(channel, options);
+ runner_loop(socket, options);
})
}
.expect("fork()")
@@ -113,7 +112,7 @@ fn runner(
}
pub struct Runner {
- channel: ipc::IpcSender<Message>,
+ socket: UnixSeqpacketConn,
}
impl Runner {
@@ -129,29 +128,36 @@ impl Runner {
let uid = unistd::geteuid();
let gid = unistd::getegid();
- let (tx, rx) = ipc::channel().expect("IPC channel creation failed");
+ let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()");
- let (tx, _rx) = clone::spawn(
+ let (local, _remote) = clone::spawn(
Some(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID),
- (tx, rx),
- |(tx, rx)| {
- drop(tx);
- runner(uid, gid, rx, lockfile, options);
+ (local, remote),
+ |(local, remote)| {
+ drop(local);
+ runner(uid, gid, remote, lockfile, options);
},
)
.expect("clone()")
.1;
- Ok(Runner { channel: tx })
+ Ok(Runner { socket: local })
}
- pub fn spawn(&self, task: &Task) -> ipc::IpcReceiver<Result<TaskOutput>> {
- let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed");
+ pub fn spawn(&self, task: &Task) -> UnixStream {
+ let (local, remote) = UnixStream::pair().expect("socketpair()");
+
+ self.socket
+ .send_fds(&[0], &[remote.as_raw_fd()])
+ .expect("send()");
- self.channel
- .send(Message::Request(task.clone(), reply_tx))
- .expect("ContainerRunner task submission failed");
+ bincode::serialize_into(&local, task).expect("Task submission failed");
+ local.shutdown(net::Shutdown::Write).expect("shutdown()");
+
+ local
+ }
- reply_rx
+ pub fn result(socket: &UnixStream) -> Result<TaskOutput> {
+ bincode::deserialize_from(socket).expect("Failed to read task result")
}
}