summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Schiffer <mschiffer@universe-factory.net>2021-10-21 21:34:07 +0200
committerMatthias Schiffer <mschiffer@universe-factory.net>2021-10-21 21:34:07 +0200
commit57ad0ecfd81a8c65aa21a7320dc63831c424ec0e (patch)
treeec4342988a041d68f2169a5b521660ef7fa10047
parent94e299928560b58791e02d5143530e7f534e843c (diff)
downloadrebel-57ad0ecfd81a8c65aa21a7320dc63831c424ec0e.tar
rebel-57ad0ecfd81a8c65aa21a7320dc63831c424ec0e.zip
executor: run ready tasks in parallel
-rw-r--r--src/executor.rs66
1 files changed, 58 insertions, 8 deletions
diff --git a/src/executor.rs b/src/executor.rs
index 853d2be..99a2804 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -1,6 +1,7 @@
use std::collections::{HashMap, HashSet};
use indoc::indoc;
+use ipc_channel::ipc;
use crate::{
context::{Context, TaskRef},
@@ -12,11 +13,12 @@ use crate::{
util::error::*,
};
-#[derive(Debug)]
pub struct Executor<'ctx> {
ctx: &'ctx Context,
+ receiver_set: ipc::IpcReceiverSet,
tasks_blocked: HashSet<TaskRef<'ctx>>,
tasks_runnable: Vec<TaskRef<'ctx>>,
+ tasks_running: HashMap<u64, TaskRef<'ctx>>,
tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>,
rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>,
tpl: TemplateEngine,
@@ -26,8 +28,10 @@ impl<'ctx> Executor<'ctx> {
pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> {
let mut exc = Executor {
ctx,
+ receiver_set: ipc::IpcReceiverSet::new().expect("IpcReceiverSet::new()"),
tasks_blocked: HashSet::new(),
tasks_runnable: Vec::new(),
+ tasks_running: HashMap::new(),
tasks_done: HashMap::new(),
rdeps: HashMap::new(),
tpl: TemplateEngine::new(),
@@ -202,11 +206,11 @@ impl<'ctx> Executor<'ctx> {
ret
}
- fn run_one(
+ fn spawn_one(
&self,
task_ref: &TaskRef<'ctx>,
runner: &impl runner::Runner,
- ) -> Result<TaskOutput> {
+ ) -> Result<ipc::IpcReceiver<Result<TaskOutput>>> {
let task_def = &self.ctx[task_ref.id];
let task_deps = self.task_deps(task_ref)?;
let task_output = task_def
@@ -244,7 +248,20 @@ impl<'ctx> Executor<'ctx> {
input,
};
- runner.spawn(&task).recv().expect("IPC recv() error")
+ Ok(runner.spawn(&task))
+ }
+
+ fn run_tasks(&mut self, runner: &impl runner::Runner) -> Result<()> {
+ while let Some(task_ref) = self.tasks_runnable.pop() {
+ let channel = self.spawn_one(&task_ref, runner)?;
+ let id = self
+ .receiver_set
+ .add(channel)
+ .expect("Failed to add channel to receiver set");
+ self.tasks_running.insert(id, task_ref);
+ }
+
+ Ok(())
}
fn update_runnable(&mut self, task_ref: &TaskRef) {
@@ -261,11 +278,44 @@ impl<'ctx> Executor<'ctx> {
}
}
+ fn wait_for_task(&mut self) -> Result<()> {
+ let mut progress = false;
+
+ while !progress {
+ let events = self
+ .receiver_set
+ .select()
+ .expect("Failed to get messages from receiver set");
+ for event in events {
+ match event {
+ ipc::IpcSelectionResult::MessageReceived(id, msg) => {
+ let task_ref = self
+ .tasks_running
+ .remove(&id)
+ .expect("Received message for unknown task");
+ let task_output = msg
+ .to::<Result<TaskOutput>>()
+ .expect("Failed to decode message from runner")?;
+
+ self.tasks_done.insert(task_ref.clone(), task_output);
+ self.update_runnable(&task_ref);
+
+ progress = true;
+ }
+ ipc::IpcSelectionResult::ChannelClosed(id) => {
+ self.tasks_running.remove(&id);
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
pub fn run(&mut self, runner: &impl runner::Runner) -> Result<()> {
- while let Some(task_ref) = self.tasks_runnable.pop() {
- let task_done = self.run_one(&task_ref, runner)?;
- self.tasks_done.insert(task_ref.clone(), task_done);
- self.update_runnable(&task_ref);
+ while !(self.tasks_runnable.is_empty() && self.tasks_running.is_empty()) {
+ self.run_tasks(runner)?;
+ self.wait_for_task()?;
}
assert!(self.tasks_blocked.is_empty(), "No runnable tasks left");