diff options
author | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-21 21:34:07 +0200 |
---|---|---|
committer | Matthias Schiffer <mschiffer@universe-factory.net> | 2021-10-21 21:34:07 +0200 |
commit | 57ad0ecfd81a8c65aa21a7320dc63831c424ec0e (patch) | |
tree | ec4342988a041d68f2169a5b521660ef7fa10047 | |
parent | 94e299928560b58791e02d5143530e7f534e843c (diff) | |
download | rebel-57ad0ecfd81a8c65aa21a7320dc63831c424ec0e.tar rebel-57ad0ecfd81a8c65aa21a7320dc63831c424ec0e.zip |
executor: run ready tasks in parallel
-rw-r--r-- | src/executor.rs | 66 |
1 files changed, 58 insertions, 8 deletions
diff --git a/src/executor.rs b/src/executor.rs index 853d2be..99a2804 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, HashSet}; use indoc::indoc; +use ipc_channel::ipc; use crate::{ context::{Context, TaskRef}, @@ -12,11 +13,12 @@ use crate::{ util::error::*, }; -#[derive(Debug)] pub struct Executor<'ctx> { ctx: &'ctx Context, + receiver_set: ipc::IpcReceiverSet, tasks_blocked: HashSet<TaskRef<'ctx>>, tasks_runnable: Vec<TaskRef<'ctx>>, + tasks_running: HashMap<u64, TaskRef<'ctx>>, tasks_done: HashMap<TaskRef<'ctx>, TaskOutput>, rdeps: HashMap<TaskRef<'ctx>, Vec<TaskRef<'ctx>>>, tpl: TemplateEngine, @@ -26,8 +28,10 @@ impl<'ctx> Executor<'ctx> { pub fn new(ctx: &'ctx Context, taskset: HashSet<TaskRef<'ctx>>) -> Result<Self> { let mut exc = Executor { ctx, + receiver_set: ipc::IpcReceiverSet::new().expect("IpcReceiverSet::new()"), tasks_blocked: HashSet::new(), tasks_runnable: Vec::new(), + tasks_running: HashMap::new(), tasks_done: HashMap::new(), rdeps: HashMap::new(), tpl: TemplateEngine::new(), @@ -202,11 +206,11 @@ impl<'ctx> Executor<'ctx> { ret } - fn run_one( + fn spawn_one( &self, task_ref: &TaskRef<'ctx>, runner: &impl runner::Runner, - ) -> Result<TaskOutput> { + ) -> Result<ipc::IpcReceiver<Result<TaskOutput>>> { let task_def = &self.ctx[task_ref.id]; let task_deps = self.task_deps(task_ref)?; let task_output = task_def @@ -244,7 +248,20 @@ impl<'ctx> Executor<'ctx> { input, }; - runner.spawn(&task).recv().expect("IPC recv() error") + Ok(runner.spawn(&task)) + } + + fn run_tasks(&mut self, runner: &impl runner::Runner) -> Result<()> { + while let Some(task_ref) = self.tasks_runnable.pop() { + let channel = self.spawn_one(&task_ref, runner)?; + let id = self + .receiver_set + .add(channel) + .expect("Failed to add channel to receiver set"); + self.tasks_running.insert(id, task_ref); + } + + Ok(()) } fn update_runnable(&mut self, task_ref: &TaskRef) { @@ -261,11 +278,44 @@ impl<'ctx> Executor<'ctx> { } } + fn wait_for_task(&mut self) -> Result<()> { + let mut progress = false; + + while !progress { + let events = self + .receiver_set + .select() + .expect("Failed to get messages from receiver set"); + for event in events { + match event { + ipc::IpcSelectionResult::MessageReceived(id, msg) => { + let task_ref = self + .tasks_running + .remove(&id) + .expect("Received message for unknown task"); + let task_output = msg + .to::<Result<TaskOutput>>() + .expect("Failed to decode message from runner")?; + + self.tasks_done.insert(task_ref.clone(), task_output); + self.update_runnable(&task_ref); + + progress = true; + } + ipc::IpcSelectionResult::ChannelClosed(id) => { + self.tasks_running.remove(&id); + } + } + } + } + + Ok(()) + } + pub fn run(&mut self, runner: &impl runner::Runner) -> Result<()> { - while let Some(task_ref) = self.tasks_runnable.pop() { - let task_done = self.run_one(&task_ref, runner)?; - self.tasks_done.insert(task_ref.clone(), task_done); - self.update_runnable(&task_ref); + while !(self.tasks_runnable.is_empty() && self.tasks_running.is_empty()) { + self.run_tasks(runner)?; + self.wait_for_task()?; } assert!(self.tasks_blocked.is_empty(), "No runnable tasks left"); |