diff options
author | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-25 00:19:45 +0200 |
---|---|---|
committer | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-25 00:19:45 +0200 |
commit | 34ac18d20c13a78914d447fee83204811a27b1e4 (patch) | |
tree | 56763d4ea46927105fcc6a71e03d5bd75a6947a6 /crates/runner/src/lib.rs | |
parent | a1a185370da27f2cc3df84d3a8d7141f9ce7db16 (diff) | |
download | rebel-34ac18d20c13a78914d447fee83204811a27b1e4.tar rebel-34ac18d20c13a78914d447fee83204811a27b1e4.zip |
Move runner into separate crate
Diffstat (limited to 'crates/runner/src/lib.rs')
-rw-r--r-- | crates/runner/src/lib.rs | 139 |
1 files changed, 139 insertions, 0 deletions
diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs new file mode 100644 index 0000000..658a833 --- /dev/null +++ b/crates/runner/src/lib.rs @@ -0,0 +1,139 @@ +mod init; +mod jobserver; +mod ns; +pub mod paths; +mod tar; +mod task; +mod util; + +use capctl::prctl; +use ipc_channel::ipc; +use nix::{ + sched::CloneFlags, + sys::{signal, stat, wait}, + unistd::{self, Gid, Uid}, +}; +use serde::{Deserialize, Serialize}; + +use common::{error::*, types::*}; + +use self::{ + jobserver::Jobserver, + util::{clone, unix}, +}; + +#[derive(Debug, Deserialize, Serialize)] +enum Message { + Request(Task, ipc::IpcSender<Result<TaskOutput>>), +} + +fn handle_request( + jobserver: Jobserver, + channel: ipc::IpcReceiver<Message>, + task: Task, + reply_channel: ipc::IpcSender<Result<TaskOutput>>, +) -> (Jobserver, ipc::IpcReceiver<Message>) { + let child = |(mut jobserver, channel): (Jobserver, ipc::IpcReceiver<Message>)| { + drop(channel); + unsafe { signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigDfl) }.unwrap(); + + prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)"); + + let token = jobserver.wait(); + let (pid, mut jobserver) = unsafe { + clone::spawn(None, jobserver, |jobserver| { + let result = task::handle(task, jobserver); + reply_channel.send(result).expect("IPC send() failed"); + }) + } + .expect("fork()"); + let wait_res = wait::waitpid(pid, None); + jobserver.post(token); + wait_res.expect("waitpid()"); + }; + + unsafe { clone::spawn(None, (jobserver, channel), child) } + .expect("fork()") + .1 +} + +fn runner_loop(mut channel: ipc::IpcReceiver<Message>) { + let nproc = unix::nproc().expect("Failed to get number of available CPUs"); + let mut jobserver = Jobserver::new(nproc).expect("Failed to initialize jobserver pipe"); + + while let Ok(msg) = channel.recv() { + match msg { + Message::Request(task, reply_channel) => { + let ret = handle_request(jobserver, channel, task, reply_channel); + jobserver = ret.0; + channel = ret.1; + } + } + } +} + +fn runner(uid: Uid, gid: Gid, channel: ipc::IpcReceiver<Message>) { + ns::mount_proc(); + ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid); + + stat::umask(stat::Mode::from_bits_truncate(0o022)); + + init::init_runner().unwrap(); + + let msg_handler = unsafe { + clone::spawn(None, (), |()| { + signal::signal(signal::Signal::SIGCHLD, signal::SigHandler::SigIgn).unwrap(); + runner_loop(channel); + }) + } + .expect("fork()") + .0; + + loop { + let status = wait::wait().expect("wait()"); + if status.pid() == Some(msg_handler) { + break; + } + } +} + +pub struct Runner { + channel: ipc::IpcSender<Message>, +} + +impl Runner { + /// Creates a new container runner + /// + /// Unsafe: Do not call in multithreaded processes + pub unsafe fn new() -> Result<Self> { + let uid = unistd::geteuid(); + let gid = unistd::getegid(); + + let (tx, rx) = ipc::channel().expect("IPC channel creation failed"); + + let (tx, _rx) = clone::spawn( + Some(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID), + (tx, rx), + |(tx, rx)| { + drop(tx); + runner(uid, gid, rx); + }, + ) + .expect("clone()") + .1; + + Ok(Runner { channel: tx }) + } +} + +impl Runner { + pub fn spawn(&self, task: &Task) -> ipc::IpcReceiver<Result<TaskOutput>> { + let (reply_tx, reply_rx) = ipc::channel().expect("IPC channel creation failed"); + + self.channel + .send(Message::Request(task.clone(), reply_tx)) + .expect("ContainerRunner task submission failed"); + + reply_rx + } +} |