diff options
Diffstat (limited to 'crates/executor/src/executor.rs')
-rw-r--r-- | crates/executor/src/executor.rs | 83 |
1 files changed, 36 insertions, 47 deletions
diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs index 1d6ee44..ad560e9 100644 --- a/crates/executor/src/executor.rs +++ b/crates/executor/src/executor.rs @@ -1,7 +1,10 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + os::unix::{net::UnixStream, prelude::*}, +}; use indoc::indoc; -use ipc_channel::ipc; +use nix::poll; use common::{error::*, string_hash::*, types::*}; use runner::{paths, Runner}; @@ -15,10 +18,9 @@ use crate::{ pub struct Executor<'ctx> { ctx: &'ctx Context, - receiver_set: ipc::IpcReceiverSet, tasks_blocked: HashSet<TaskRef<'ctx>>, tasks_runnable: Vec<TaskRef<'ctx>>, - tasks_running: HashMap<u64, TaskRef<'ctx>>, + tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>, tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>, rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>, tpl: TemplateEngine, @@ -28,7 +30,6 @@ impl<'ctx> Executor<'ctx> { pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> { let mut exc = Executor { ctx, - receiver_set: ipc::IpcReceiverSet::new().expect("IpcReceiverSet::new()"), tasks_blocked: HashSet::new(), tasks_runnable: Vec::new(), tasks_running: HashMap::new(), @@ -205,11 +206,7 @@ impl<'ctx> Executor<'ctx> { ret } - fn spawn_one( - &self, - task_ref: &TaskRef<'ctx>, - runner: &Runner, - ) -> Result<ipc::IpcReceiver<Result<TaskOutput>>> { + fn spawn_one(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<UnixStream> { let task_def = &self.ctx[task_ref.id]; let task_deps = self.task_deps(task_ref)?; let task_output = task_def @@ -248,12 +245,11 @@ impl<'ctx> Executor<'ctx> { fn run_tasks(&mut self, runner: &Runner) -> Result<()> { while let Some(task_ref) = self.tasks_runnable.pop() { - let channel = self.spawn_one(&task_ref, runner)?; - let id = self - .receiver_set - .add(channel) - .expect("Failed to add channel to receiver set"); - self.tasks_running.insert(id, task_ref); + let socket = self.spawn_one(&task_ref, runner)?; + assert!(self + .tasks_running + .insert(socket.as_raw_fd(), (socket, task_ref)) + .is_none()); } Ok(()) @@ -274,39 +270,32 @@ impl<'ctx> Executor<'ctx> { } fn wait_for_task(&mut self) -> Result<()> { - let mut progress = false; - - while !progress { - let events = self - .receiver_set - .select() - .expect("Failed to get messages from receiver set"); - for event in events { - match event { - ipc::IpcSelectionResult::MessageReceived(id, msg) => { - let task_ref = self - .tasks_running - .remove(&id) - .expect("Received message for unknown task"); - let task_output = msg - .to::<Result<TaskOutput>>() - .expect("Failed to decode message from runner")?; - - self.tasks_done.insert(task_ref.clone(), task_output); - self.update_runnable(&task_ref); - - progress = true; - } - ipc::IpcSelectionResult::ChannelClosed(id) => { - if let Some(task) = self.tasks_running.remove(&id) { - return Err(Error::new(format!( - "Unexpectedly got no result for task {:#}", - task - ))); - } - } + let mut pollfds: Box<[_]> = self + .tasks_running + .keys() + .copied() + .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) + .collect(); + + while poll::poll(&mut pollfds, -1).context("poll()")? == 0 {} + + for pollfd in &*pollfds { + let events = pollfd.revents().expect("Unknown events in poll() return"); + if !events.contains(poll::PollFlags::POLLIN) { + if events.intersects(!poll::PollFlags::POLLIN) { + return Err(Error::new( + "Unexpected error status for socket file descriptor", + )); } + continue; } + + let fd = pollfd.as_raw_fd(); + let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); + + let task_output = Runner::result(&socket)?; + self.tasks_done.insert(task_ref.clone(), task_output); + self.update_runnable(&task_ref); } Ok(()) |