summaryrefslogtreecommitdiffstats
path: root/crates/runner/src/lib.rs
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2021-10-25 00:19:45 +0200
committerMatthias Schiffer <mschiffer@universe-factory.net>2021-10-25 00:19:45 +0200
commit34ac18d20c13a78914d447fee83204811a27b1e4 (patch)
tree56763d4ea46927105fcc6a71e03d5bd75a6947a6 /crates/runner/src/lib.rs
parenta1a185370da27f2cc3df84d3a8d7141f9ce7db16 (diff)
downloadrebel-34ac18d20c13a78914d447fee83204811a27b1e4.tar
rebel-34ac18d20c13a78914d447fee83204811a27b1e4.zip
Move runner into separate crate
Diffstat (limited to 'crates/runner/src/lib.rs')
-rw-r--r--crates/runner/src/lib.rs139
1 files changed, 139 insertions, 0 deletions
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs
new file mode 100644
index 0000000..658a833
--- /dev/null
+++ b/crates/runner/src/lib.rs
@@ -0,0 +1,139 @@
+mod init;
+mod jobserver;
+mod ns;
+pub mod paths;
+mod tar;
+mod task;
+mod util;
+
+use capctl::prctl;
+use ipc_channel::ipc;
+use nix::{
+ sched::CloneFlags,
+ sys::{signal, stat, wait},
+ unistd::{self, Gid, Uid},
+};
+use serde::{Deserialize, Serialize};
+
+use common::{error::*, types::*};
+
+use self::{
+ jobserver::Jobserver,
+ util::{clone, unix},
+};
+
+#[derive(Debug, Deserialize, Serialize)]
+enum Message {
+ Request(Task, ipc::IpcSender<Result<TaskOutput>>),
+}
+
+fn handle_request(
+ jobserver: Jobserver,
+ channel: ipc::IpcReceiver<Message>,
+ task: Task,
+ reply_channel: ipc::IpcSender<Result<TaskOutput>>,
+) -> (Jobserver, ipc::IpcReceiver<Message>) {
+ let child = |(mut jobserver, channel): (Jobserver, ipc::IpcReceiver<Message>)| {
+ drop(channel);
+ unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap();
+
+ prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)");
+
+ let token = jobserver.wait();
+ let (pid, mut jobserver) = unsafe {
+ clone::spawn(None, jobserver, |jobserver| {
+ let result = task::handle(task, jobserver);
+ reply_channel.send(result).expect("IPC send() failed");
+ })
+ }
+ .expect("fork()");
+ let wait_res = wait::waitpid(pid, None);
+ jobserver.post(token);
+ wait_res.expect("waitpid()");
+ };
+
+ unsafe { clone::spawn(None, (jobserver, channel), child) }
+ .expect("fork()")
+ .1
+}
+
+fn runner_loop(mut channel: ipc::IpcReceiver<Message>) {
+ let nproc = unix::nproc().expect("Failed to get number of available CPUs");
+ let mut jobserver = Jobserver::new(nproc).expect("Failed to initialize jobserver pipe");
+
+ while let Ok(msg) = channel.recv() {
+ match msg {
+ Message::Request(task, reply_channel) => {
+ let ret = handle_request(jobserver, channel, task, reply_channel);
+ jobserver = ret.0;
+ channel = ret.1;
+ }
+ }
+ }
+}
+
+fn runner(uid: Uid, gid: Gid, channel: ipc::IpcReceiver<Message>) {
+ ns::mount_proc();
+ ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid);
+
+ stat::umask(stat::Mode::from_bits_truncate(0o022));
+
+ init::init_runner().unwrap();
+
+ let msg_handler = unsafe {
+ clone::spawn(None, (), |()| {
+ signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap();
+ runner_loop(channel);
+ })
+ }
+ .expect("fork()")
+ .0;
+
+ loop {
+ let status = wait::wait().expect("wait()");
+ if status.pid() == Some(msg_handler) {
+ break;
+ }
+ }
+}
+
+pub struct Runner {
+ channel: ipc::IpcSender<Message>,
+}
+
+impl Runner {
+ /// Creates a new container runner
+ ///
+ /// Unsafe: Do not call in multithreaded processes
+ pub unsafe fn new() -> Result<Self> {
+ let uid = unistd::geteuid();
+ let gid = unistd::getegid();
+
+ let (tx, rx) = ipc::channel().expect("IPC channel creation failed");
+
+ let (tx, _rx) = clone::spawn(
+ Some(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID),
+ (tx, rx),
+ |(tx, rx)| {
+ drop(tx);
+ runner(uid, gid, rx);
+ },
+ )
+ .expect("clone()")
+ .1;
+
+ Ok(Runner { channel: tx })
+ }
+}
+
+impl Runner {
+ pub fn spawn(&self, task: &Task) -> ipc::IpcReceiver<Result<TaskOutput>> {
+ let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed");
+
+ self.channel
+ .send(Message::Request(task.clone(), reply_tx))
+ .expect("ContainerRunner task submission failed");
+
+ reply_rx
+ }
+}