summaryrefslogtreecommitdiffstats
path: root/crates/executor/src/executor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/executor/src/executor.rs')
-rw-r--r--crates/executor/src/executor.rs83
1 files changed, 36 insertions, 47 deletions
diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs
index 1d6ee44..ad560e9 100644
--- a/crates/executor/src/executor.rs
+++ b/crates/executor/src/executor.rs
@@ -1,7 +1,10 @@
-use std::collections::{HashMap, HashSet};
+use std::{
+ collections::{HashMap, HashSet},
+ os::unix::{net::UnixStream, prelude::*},
+};
use indoc::indoc;
-use ipc_channel::ipc;
+use nix::poll;
use common::{error::*, string_hash::*, types::*};
use runner::{paths, Runner};
@@ -15,10 +18,9 @@ use crate::{
pub struct Executor<'ctx> {
ctx: &'ctx Context,
- receiver_set: ipc::IpcReceiverSet,
tasks_blocked: HashSet<TaskRef<'ctx>>,
tasks_runnable: Vec<TaskRef<'ctx>>,
- tasks_running: HashMap<u64, TaskRef<'ctx>>,
+ tasks_running: HashMap<RawFd, (UnixStream, TaskRef<'ctx>)>,
tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>,
rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>,
tpl: TemplateEngine,
@@ -28,7 +30,6 @@ impl<'ctx> Executor<'ctx> {
pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> {
let mut exc = Executor {
ctx,
- receiver_set: ipc::IpcReceiverSet::new().expect("IpcReceiverSet::new()"),
tasks_blocked: HashSet::new(),
tasks_runnable: Vec::new(),
tasks_running: HashMap::new(),
@@ -205,11 +206,7 @@ impl<'ctx> Executor<'ctx> {
ret
}
- fn spawn_one(
- &self,
- task_ref: &TaskRef<'ctx>,
- runner: &Runner,
- ) -> Result<ipc::IpcReceiver<Result<TaskOutput>>> {
+ fn spawn_one(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result<UnixStream> {
let task_def = &self.ctx[task_ref.id];
let task_deps = self.task_deps(task_ref)?;
let task_output = task_def
@@ -248,12 +245,11 @@ impl<'ctx> Executor<'ctx> {
fn run_tasks(&mut self, runner: &Runner) -> Result<()> {
while let Some(task_ref) = self.tasks_runnable.pop() {
- let channel = self.spawn_one(&task_ref, runner)?;
- let id = self
- .receiver_set
- .add(channel)
- .expect("Failed to add channel to receiver set");
- self.tasks_running.insert(id, task_ref);
+ let socket = self.spawn_one(&task_ref, runner)?;
+ assert!(self
+ .tasks_running
+ .insert(socket.as_raw_fd(), (socket, task_ref))
+ .is_none());
}
Ok(())
@@ -274,39 +270,32 @@ impl<'ctx> Executor<'ctx> {
}
fn wait_for_task(&mut self) -> Result<()> {
- let mut progress = false;
-
- while !progress {
- let events = self
- .receiver_set
- .select()
- .expect("Failed to get messages from receiver set");
- for event in events {
- match event {
- ipc::IpcSelectionResult::MessageReceived(id, msg) => {
- let task_ref = self
- .tasks_running
- .remove(&id)
- .expect("Received message for unknown task");
- let task_output = msg
- .to::<Result<TaskOutput>>()
- .expect("Failed to decode message from runner")?;
-
- self.tasks_done.insert(task_ref.clone(), task_output);
- self.update_runnable(&task_ref);
-
- progress = true;
- }
- ipc::IpcSelectionResult::ChannelClosed(id) => {
- if let Some(task) = self.tasks_running.remove(&id) {
- return Err(Error::new(format!(
- "Unexpectedly got no result for task {:#}",
- task
- )));
- }
- }
+ let mut pollfds: Box<[_]> = self
+ .tasks_running
+ .keys()
+ .copied()
+ .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN))
+ .collect();
+
+ while poll::poll(&mut pollfds, -1).context("poll()")? == 0 {}
+
+ for pollfd in &*pollfds {
+ let events = pollfd.revents().expect("Unknown events in poll() return");
+ if !events.contains(poll::PollFlags::POLLIN) {
+ if events.intersects(!poll::PollFlags::POLLIN) {
+ return Err(Error::new(
+ "Unexpected error status for socket file descriptor",
+ ));
}
+ continue;
}
+
+ let fd = pollfd.as_raw_fd();
+ let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap();
+
+ let task_output = Runner::result(&socket)?;
+ self.tasks_done.insert(task_ref.clone(), task_output);
+ self.update_runnable(&task_ref);
}
Ok(())