From e9bf0fc40c0eb7e9d4228b804d62f31b0a136528 Mon Sep 17 00:00:00 2001
From: Matthias Schiffer
Date: Sat, 20 Apr 2024 14:28:05 +0200
Subject: Rename directories to match crate names

---
 crates/{common => rebel-common}/Cargo.toml            | 0
 crates/{common => rebel-common}/src/error.rs          | 0
 crates/{common => rebel-common}/src/lib.rs            | 0
 crates/{common => rebel-common}/src/string_hash.rs    | 0
 crates/{common => rebel-common}/src/types.rs          | 0
 crates/{driver => rebel}/Cargo.toml                   | 0
 crates/{driver => rebel}/src/args.rs                  | 0
 crates/{driver => rebel}/src/context.rs               | 0
 crates/{driver => rebel}/src/driver.rs                | 0
 crates/{driver => rebel}/src/main.rs                  | 0
 crates/{driver => rebel}/src/parse.rs                 | 0
 crates/{driver => rebel}/src/paths.rs                 | 0
 crates/{driver => rebel}/src/pin.rs                   | 0
 crates/{driver => rebel}/src/recipe.rs                | 0
 crates/{driver => rebel}/src/resolve.rs               | 0
 crates/{driver => rebel}/src/task.rs                  | 0
 crates/{driver => rebel}/src/template.rs              | 0
 crates/{runner => rebel-runner}/Cargo.toml            | 0
 crates/{runner => rebel-runner}/src/init.rs           | 0
 crates/{runner => rebel-runner}/src/jobserver.rs      | 0
 crates/{runner => rebel-runner}/src/lib.rs            | 0
 crates/{runner => rebel-runner}/src/ns.rs             | 0
 crates/{runner => rebel-runner}/src/paths.rs          | 0
 crates/{runner => rebel-runner}/src/tar.rs            | 0
 crates/{runner => rebel-runner}/src/task.rs           | 0
 crates/{runner => rebel-runner}/src/util/checkable.rs | 0
 crates/{runner => rebel-runner}/src/util/cjson.rs     | 0
 crates/{runner => rebel-runner}/src/util/clone.rs     | 0
 crates/{runner => rebel-runner}/src/util/fs.rs        | 0
 crates/{runner => rebel-runner}/src/util/mod.rs       | 0
 crates/{runner => rebel-runner}/src/util/stack.rs     | 0
 crates/{runner => rebel-runner}/src/util/steal.rs     | 0
 crates/{runner => rebel-runner}/src/util/unix.rs      | 0
 33 files changed, 0 insertions(+), 0 deletions(-)
 rename crates/{common => rebel-common}/Cargo.toml (100%)
 rename crates/{common => rebel-common}/src/error.rs (100%)
 rename crates/{common => rebel-common}/src/lib.rs (100%)
 rename crates/{common => rebel-common}/src/string_hash.rs (100%)
 rename crates/{common => rebel-common}/src/types.rs (100%)
 rename crates/{driver => rebel}/Cargo.toml (100%)
 rename crates/{driver => rebel}/src/args.rs (100%)
 rename crates/{driver => rebel}/src/context.rs (100%)
 rename crates/{driver => rebel}/src/driver.rs (100%)
 rename crates/{driver => rebel}/src/main.rs (100%)
 rename crates/{driver => rebel}/src/parse.rs (100%)
 rename crates/{driver => rebel}/src/paths.rs (100%)
 rename crates/{driver => rebel}/src/pin.rs (100%)
 rename crates/{driver => rebel}/src/recipe.rs (100%)
 rename crates/{driver => rebel}/src/resolve.rs (100%)
 rename crates/{driver => rebel}/src/task.rs (100%)
 rename crates/{driver => rebel}/src/template.rs (100%)
 rename crates/{runner => rebel-runner}/Cargo.toml (100%)
 rename crates/{runner => rebel-runner}/src/init.rs (100%)
 rename crates/{runner => rebel-runner}/src/jobserver.rs (100%)
 rename crates/{runner => rebel-runner}/src/lib.rs (100%)
 rename crates/{runner => rebel-runner}/src/ns.rs (100%)
 rename crates/{runner => rebel-runner}/src/paths.rs (100%)
 rename crates/{runner => rebel-runner}/src/tar.rs (100%)
 rename crates/{runner => rebel-runner}/src/task.rs (100%)
 rename crates/{runner => rebel-runner}/src/util/checkable.rs (100%)
 rename crates/{runner => rebel-runner}/src/util/cjson.rs (100%)
 rename crates/{runner => rebel-runner}/src/util/clone.rs (100%)
 rename crates/{runner => rebel-runner}/src/util/fs.rs (100%)
 rename crates/{runner => rebel-runner}/src/util/mod.rs (100%)
 rename crates/{runner => rebel-runner}/src/util/stack.rs (100%)
 rename crates/{runner => rebel-runner}/src/util/steal.rs (100%)
 rename crates/{runner => rebel-runner}/src/util/unix.rs (100%)
Serializable errors with context - -use std::{error::Error as _, fmt::Display, io, result}; - -use serde::{Deserialize, Serialize}; - -pub trait Contextualizable: Sized { - type Output; - - fn with_context C>(self, f: F) -> Self::Output; - - fn context(self, c: C) -> Self::Output { - self.with_context(|| c) - } -} - -#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] -pub enum ErrorCause { - Code(i32), - String(String), -} - -impl Display for ErrorCause { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ErrorCause::Code(code) => io::Error::from_raw_os_error(*code).fmt(f), - ErrorCause::String(string) => f.write_str(string), - } - } -} - -#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] -pub struct Error { - pub cause: ErrorCause, - pub context: Vec, -} - -impl Error { - pub fn new(cause: D) -> Self { - Error { - cause: ErrorCause::String(cause.to_string()), - context: Vec::new(), - } - } - - pub fn from_io(err: &io::Error) -> Self { - if let Some(source) = err - .source() - .and_then(|source| source.downcast_ref::()) - { - return Error::from_io(source).context(err.to_string()); - } - - let cause = match err.raw_os_error() { - Some(code) => ErrorCause::Code(code), - None => ErrorCause::String(err.to_string()), - }; - Error { - cause, - context: vec![], - } - } -} - -impl Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("Error: ")?; - - let mut it = self.context.iter().rev(); - if let Some(ctx) = it.next() { - write!(f, "{}\n\nCaused by:\n ", ctx)?; - - for ctx in it { - write!(f, "{}\n ", ctx)?; - } - } - - self.cause.fmt(f) - } -} - -impl Contextualizable for Error { - type Output = Error; - fn with_context C>(self, f: F) -> Self::Output { - let Error { cause, mut context } = self; - context.push(f().to_string()); - Error { cause, context } - } -} - -impl From for Error -where - E: Into, -{ - fn from(err: E) -> Self { - Error::from_io(&err.into()) - } -} - -pub type Result = result::Result; - -impl Contextualizable for result::Result -where - E: Into, -{ - type Output = Result; - - fn with_context C>(self, f: F) -> Self::Output { - self.map_err(|err| err.into().with_context(f)) - } -} - -impl Contextualizable for Option { - type Output = Result; - - fn with_context C>(self, f: F) -> Self::Output { - self.ok_or_else(|| Error::new(f())) - } -} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs deleted file mode 100644 index 8d630dd..0000000 --- a/crates/common/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod error; -pub mod string_hash; -pub mod types; diff --git a/crates/common/src/string_hash.rs b/crates/common/src/string_hash.rs deleted file mode 100644 index a2b00db..0000000 --- a/crates/common/src/string_hash.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::fmt::Display; - -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Copy, PartialEq, Eq, Hash)] -pub struct StringHash(pub [u8; 32]); - -impl Display for StringHash { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(&hex::encode(self.0)) - } -} - -impl std::fmt::Debug for StringHash { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "\"{}\"", self) - } -} - -impl Serialize for StringHash { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - hex::serialize(self.0, serializer) - } -} -impl<'de> Deserialize<'de> for StringHash { - fn deserialize(deserializer: D) -> Result - where - D: 
serde::Deserializer<'de>, - { - Ok(StringHash(hex::deserialize(deserializer)?)) - } -} - -macro_rules! stringhash_newtype { - ($id:ident) => { - #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)] - pub struct $id(pub StringHash); - - impl Display for $id { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } - } - }; -} - -stringhash_newtype!(InputHash); -stringhash_newtype!(DependencyHash); -stringhash_newtype!(LayerHash); -stringhash_newtype!(ArchiveHash); diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs deleted file mode 100644 index 2a06275..0000000 --- a/crates/common/src/types.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - fmt::Display, -}; - -use serde::{Deserialize, Serialize}; - -use crate::string_hash::*; - -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)] -pub struct TaskID { - pub recipe: String, - pub task: String, -} - -impl Display for TaskID { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}::{}", self.recipe, self.task) - } -} - -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)] -#[serde(rename_all = "snake_case")] -pub enum Dependency { - Fetch { - name: String, - target_dir: String, - sha256: StringHash, - }, - Task { - output: ArchiveHash, - path: String, - }, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Task { - pub label: String, - pub command: String, - pub workdir: String, - pub rootfs: ArchiveHash, - pub ancestors: Vec, - pub depends: HashSet, - pub outputs: HashMap, - pub pins: HashMap, - pub force_run: bool, -} - -#[derive(Clone, Debug, Deserialize, Serialize, Default)] -pub struct TaskOutput { - pub input_hash: Option, - pub layer: Option, - pub outputs: HashMap, -} diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml deleted file mode 100644 index df1fb19..0000000 --- a/crates/driver/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "rebel" -version = "0.1.0" -authors = ["Matthias Schiffer "] -license = "MIT" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -common = { path = "../common", package = "rebel-common" } -runner = { path = "../runner", package = "rebel-runner" } - -clap = { version = "4.0.0", features = ["derive"] } -deb-version = "0.1.1" -enum-kinds = "0.5.1" -handlebars = "5.1.2" -indoc = "2.0.4" -lazy_static = "1.4.0" -nix = { version = "0.28.0", features = ["poll", "signal"] } -scoped-tls-hkt = "0.1.2" -serde = { version = "1", features = ["derive", "rc"] } -serde_yaml = "0.9" -walkdir = "2" -peg = "0.8.2" diff --git a/crates/driver/src/args.rs b/crates/driver/src/args.rs deleted file mode 100644 index 805646a..0000000 --- a/crates/driver/src/args.rs +++ /dev/null @@ -1,122 +0,0 @@ -use std::{ - collections::{hash_map, HashMap}, - hash, - rc::Rc, -}; - -use enum_kinds::EnumKind; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Serialize, PartialEq, Eq)] -pub struct Platform { - #[serde(skip)] - pub short: String, - pub gnu_triplet: String, - pub karch: String, - pub prefix: String, -} - -#[derive(Debug, Serialize, PartialEq, Eq)] -pub struct PlatformRelation { - pub is_same: bool, - pub sysroot: String, - pub cross_compile: String, -} - -#[derive(Clone, Debug, Serialize, PartialEq, Eq, EnumKind)] -#[serde(untagged)] -#[enum_kind(ArgType, derive(Deserialize), serde(rename_all = "snake_case"))] -pub enum Arg { - 
String(Rc), - Platform(Rc), - PlatformRelation(Rc), -} - -impl From<&Arg> for Arg { - fn from(value: &Arg) -> Self { - value.clone() - } -} - -impl From for Arg { - fn from(value: String) -> Self { - Arg::String(Rc::new(value)) - } -} - -impl From for Arg { - fn from(value: Platform) -> Self { - Arg::Platform(Rc::new(value)) - } -} - -impl From for Arg { - fn from(value: PlatformRelation) -> Self { - Arg::PlatformRelation(Rc::new(value)) - } -} - -#[derive(Clone, Debug, Serialize, PartialEq, Eq, Default)] -pub struct TaskArgs(HashMap); - -impl TaskArgs { - pub fn contains_key(&self, key: &str) -> bool { - self.0.contains_key(key) - } - - pub fn get(&self, key: &str) -> Option<&Arg> { - self.0.get(key) - } - - pub fn set(&mut self, key: &str, value: Option) - where - T: Into, - { - if let Some(v) = value { - self.0.insert(key.to_string(), v.into()); - } else { - self.0.remove(key); - } - } - - pub fn iter(&self) -> hash_map::Iter { - self.into_iter() - } -} - -impl FromIterator<(String, Arg)> for TaskArgs { - fn from_iter>(iter: T) -> Self { - TaskArgs(HashMap::from_iter(iter)) - } -} - -impl<'a> IntoIterator for &'a TaskArgs { - type Item = (&'a String, &'a Arg); - - type IntoIter = hash_map::Iter<'a, String, Arg>; - - fn into_iter(self) -> Self::IntoIter { - self.0.iter() - } -} - -#[allow(clippy::derived_hash_with_manual_eq)] -impl hash::Hash for TaskArgs { - fn hash(&self, _state: &mut H) { - // Don't do anything: Properly hashing the task args is likely to cost - // much more performance than the hash collisions caused by TaskRefs - // that only differ by the args - } -} - -pub fn arg>(key: &str, value: A) -> (String, Arg) { - (key.to_string(), value.into()) -} - -#[derive(Clone, Debug, Deserialize, Default, PartialEq, Eq)] -pub struct ArgMapping(pub HashMap); - -#[allow(clippy::derived_hash_with_manual_eq)] -impl hash::Hash for ArgMapping { - fn hash(&self, _state: &mut H) {} -} diff --git a/crates/driver/src/context.rs b/crates/driver/src/context.rs deleted file mode 100644 index be98813..0000000 --- a/crates/driver/src/context.rs +++ /dev/null @@ -1,533 +0,0 @@ -use std::{ - borrow::Cow, - cmp::Ordering, - collections::{HashMap, HashSet}, - fmt::Display, - hash::Hash, - ops::Index, - rc::Rc, - result, -}; - -use common::{ - error::{self, Contextualizable}, - string_hash::ArchiveHash, - types::TaskID, -}; - -use crate::{ - args::*, - parse::{self, TaskFlags}, - paths, - pin::{self, Pins}, - task::*, -}; - -#[derive(Debug, Clone, Copy)] -pub enum ErrorKind<'a> { - TaskNotFound, - InvalidArgument(&'a str), - InvalidArgRef(&'a str), -} - -#[derive(Debug, Clone, Copy)] -pub struct Error<'a> { - pub task: &'a TaskID, - pub kind: ErrorKind<'a>, -} - -impl<'a> Display for Error<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Error { task, kind } = self; - match kind { - ErrorKind::TaskNotFound => write!(f, "Task '{}' not found", task), - ErrorKind::InvalidArgument(arg) => write!( - f, - "Invalid or missing argument '{}' for task '{}'", - arg, task - ), - ErrorKind::InvalidArgRef(arg) => write!( - f, - "Invalid reference for argument '{}' of task '{}'", - arg, task - ), - } - } -} - -impl<'a> From> for error::Error { - fn from(err: Error) -> Self { - error::Error::new(err) - } -} - -pub type Result<'a, T> = result::Result>; - -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct TaskRef<'ctx> { - pub id: &'ctx TaskID, - pub args: Rc, -} - -impl<'ctx> Display for TaskRef<'ctx> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if 
!f.alternate() { - return self.id.fmt(f); - } - - let version_arg = match self.args.get("version") { - Some(Arg::String(s)) => Some(s), - _ => None, - }; - let host_arg = match self.args.get("host") { - Some(Arg::Platform(platform)) => Some(platform), - _ => None, - }; - let target_arg = match self.args.get("target") { - Some(Arg::Platform(platform)) => Some(platform), - _ => None, - }; - - write!(f, "{}", self.id.recipe)?; - if let Some(version) = version_arg { - write!(f, "#{}", version)?; - } - write!(f, "::{}", self.id.task)?; - - if host_arg.is_some() || target_arg.is_some() { - write!(f, "@")?; - } - - if let Some(host) = host_arg { - write!(f, "{}", host.short)?; - } - if let Some(target) = target_arg { - write!(f, ":{}", target.short)?; - } - Ok(()) - } -} - -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct OutputRef<'ctx> { - pub task: TaskRef<'ctx>, - pub output: &'ctx str, -} - -fn platform_relation(args: &TaskArgs, from: &str, to: &str) -> Option { - let plat_from = match args.get(from)? { - Arg::Platform(plat) => plat, - _ => return None, - }; - let plat_to = match args.get(to)? { - Arg::Platform(plat) => plat, - _ => return None, - }; - - let plat_rel = if plat_from == plat_to { - PlatformRelation { - is_same: true, - sysroot: "".to_string(), - cross_compile: "".to_string(), - } - } else { - PlatformRelation { - is_same: false, - sysroot: paths::TASK_SYSROOT.to_string(), - cross_compile: format!("{}/bin/{}-", plat_from.prefix, plat_to.gnu_triplet), - } - }; - Some(plat_rel) -} - -#[derive(Debug)] -pub struct Context { - platforms: HashMap, - globals: TaskArgs, - tasks: HashMap>, - rootfs: (ArchiveHash, String), -} - -impl Context { - pub fn new(mut tasks: HashMap>, pins: Pins) -> error::Result { - let platforms: HashMap<_, _> = [ - arg( - "build", - Platform { - short: "build".to_string(), - gnu_triplet: "x86_64-linux-gnu".to_string(), - karch: "x86_64".to_string(), - prefix: "/opt/toolchain".to_string(), - }, - ), - arg( - "aarch64", - Platform { - short: "aarch64".to_string(), - gnu_triplet: "aarch64-linux-gnu".to_string(), - karch: "arm64".to_string(), - prefix: "/usr".to_string(), - }, - ), - ] - .into_iter() - .collect(); - - let globals = TaskArgs::from_iter([ - ("build".to_string(), platforms["build"].clone()), - arg("workdir", paths::TASK_WORKDIR.to_string()), - arg("dldir", paths::TASK_DLDIR.to_string()), - arg("destdir", paths::TASK_DESTDIR.to_string()), - arg("sysroot", paths::TASK_SYSROOT.to_string()), - ]); - let (rootfs, rootfs_provides) = - Context::handle_pins(pins).context("Failed to process pin list")?; - - Context::add_rootfs_tasks(&mut tasks, rootfs_provides, &globals) - .context("Failed to determine rootfs-provided tasks from pin list")?; - - Ok(Context { - platforms, - globals, - tasks, - rootfs, - }) - } - - fn handle_pins(pins: Pins) -> error::Result<((ArchiveHash, String), Vec)> { - let mut ret = None; - - for (name, pin) in pins { - if pin.is_rootfs { - if ret.is_some() { - return Err(error::Error::new("Multiple is-rootfs pins")); - } - let hash = pin.hash.context("is-rootfs pin without hash")?; - - ret = Some(((hash, name), pin.provides)); - } - } - - ret.context("No is-rootfs pins") - } - - fn add_rootfs_tasks( - tasks: &mut HashMap>, - provides: Vec, - globals: &TaskArgs, - ) -> error::Result<()> { - let build = globals.get("build").unwrap(); - - for pin::Provides { - recipe, - task, - output, - args, - } in provides - { - let mut task_def = TaskDef::default(); - - if let Some(host) = args.host { - if host != "build" { - return 
Err(error::Error::new(format!("Invalid host value '{}'", host))); - } - task_def.args.insert("host".to_string(), build.into()); - task_def.arg_match.set("host", Some(build)); - } - - if let Some(target) = args.target { - if target != "build" { - return Err(error::Error::new(format!( - "Invalid target value '{}'", - target - ))); - } - task_def.args.insert("target".to_string(), build.into()); - task_def.arg_match.set("target", Some(build)); - } - - for output_entry in output { - task_def - .output - .insert(output_entry.to_string(), Output::default()); - } - - task_def.priority = i32::MAX; - - tasks - .entry(TaskID { - recipe: recipe.to_string(), - task: task.to_string(), - }) - .or_default() - .push(task_def); - } - - Ok(()) - } - - pub fn get_rootfs(&self) -> &(ArchiveHash, String) { - &self.rootfs - } - - fn match_task(task: &TaskDef, args: &TaskArgs) -> bool { - task.arg_match - .iter() - .all(|(key, value)| args.get(key) == Some(value)) - } - - fn compare_tasks(task1: &TaskDef, task2: &TaskDef) -> Ordering { - task1 - .priority - .cmp(&task2.priority) - .then(deb_version::compare_versions( - task1.meta.version.as_deref().unwrap_or_default(), - task2.meta.version.as_deref().unwrap_or_default(), - )) - } - - fn select_task<'ctx>(tasks: &'ctx [TaskDef], args: &TaskArgs) -> Option<&'ctx TaskDef> { - tasks - .iter() - .filter(|task| Self::match_task(task, args)) - .max_by(|task1, task2| Self::compare_tasks(task1, task2)) - } - - fn get_with_args<'a>(&self, id: &'a TaskID, args: &TaskArgs) -> Result<'a, &TaskDef> { - self.tasks - .get(id) - .and_then(|tasks| Self::select_task(tasks, args)) - .ok_or(Error { - task: id, - kind: ErrorKind::TaskNotFound, - }) - } - - pub fn get<'a>(&self, task: &TaskRef<'a>) -> Result<'a, &TaskDef> { - self.get_with_args(task.id, task.args.as_ref()) - } - - fn task_ref<'ctx>(&'ctx self, id: &'ctx TaskID, args: &TaskArgs) -> Result { - let task_def = self.get_with_args(id, args)?; - - let mut arg_def: HashMap<_, _> = task_def.args.iter().map(|(k, &v)| (k, v)).collect(); - for (key, arg) in &self.globals { - // TODO: Handle conflicts between explicit args and globals - arg_def.insert(key, ArgType::from(arg)); - } - - let mut new_args = TaskArgs::default(); - - for (key, typ) in arg_def { - if let Some(arg) = args.get(key) { - if ArgType::from(arg) == typ { - new_args.set(key, Some(arg)); - continue; - } - } - return Err(Error { - task: id, - kind: ErrorKind::InvalidArgument(key), - }); - } - - let build_to_host = platform_relation(&new_args, "build", "host"); - let host_to_target = platform_relation(&new_args, "host", "target"); - let build_to_target = platform_relation(&new_args, "build", "target"); - - let cross_compile = build_to_host - .as_ref() - .map(|build_to_host| build_to_host.cross_compile.clone()); - - new_args.set("build_to_host", build_to_host); - new_args.set("host_to_target", host_to_target); - new_args.set("build_to_target", build_to_target); - - new_args.set("cross_compile", cross_compile); - - new_args.set("basename", Some(task_def.meta.basename.clone())); - new_args.set("recipename", Some(task_def.meta.recipename.clone())); - new_args.set("recipe", Some(task_def.meta.recipe.clone())); - new_args.set("name", Some(task_def.meta.name.clone())); - new_args.set("version", task_def.meta.version.clone()); - - Ok(TaskRef { - id, - args: Rc::new(new_args), - }) - } - - pub fn parse(&self, s: &str) -> error::Result<(TaskRef, TaskFlags)> { - let (parsed, flags) = parse::task_with_flags(s) - .ok() - .context("Invalid task syntax")?; - - let recipe = 
parsed.id.recipe.to_string(); - let task = parsed.id.task.to_string(); - - let id = TaskID { recipe, task }; - let (ctx_id, _) = self - .tasks - .get_key_value(&id) - .with_context(|| format!("Task {} not found", id))?; - - let mut args = self.globals.clone(); - - if let Some(host) = parsed.args.host { - let plat = self - .platforms - .get(host) - .with_context(|| format!("Platform '{}' not found", host))?; - args.set("host", Some(plat)); - args.set("target", Some(plat)); - } - if let Some(target) = parsed.args.target { - let plat = self - .platforms - .get(target) - .with_context(|| format!("Platform '{}' not found", target))?; - args.set("target", Some(plat)); - } - - let task_ref = self - .task_ref(ctx_id, &args) - .with_context(|| format!("Failed to instantiate task {}", id))?; - - Ok((task_ref, flags)) - } - - fn map_args<'ctx, 'args>( - task: &'ctx TaskID, - mapping: &'ctx ArgMapping, - args: &'args TaskArgs, - build_dep: bool, - ) -> Result<'ctx, Cow<'args, TaskArgs>> { - if mapping.0.is_empty() && !build_dep { - return Ok(Cow::Borrowed(args)); - } - - let mut ret = args.clone(); - - if build_dep { - ret.set("host", args.get("build")); - ret.set("target", args.get("host")); - } - - for (to, from) in &mapping.0 { - let value = args.get(from).ok_or(Error { - task, - kind: ErrorKind::InvalidArgRef(to), - })?; - ret.set(to, Some(value.clone())); - } - - Ok(Cow::Owned(ret)) - } - - fn parent_ref<'ctx>(&'ctx self, dep: &'ctx ParentDep, args: &TaskArgs) -> Result { - let mapped_args = Context::map_args(&dep.dep.id, &dep.dep.args, args, false)?; - self.task_ref(&dep.dep.id, mapped_args.as_ref()) - } - - pub fn output_ref<'ctx>( - &'ctx self, - dep: &'ctx OutputDep, - args: &TaskArgs, - build_dep: bool, - ) -> Result> { - let mapped_args = Context::map_args(&dep.dep.id, &dep.dep.args, args, build_dep)?; - Ok(OutputRef { - task: self.task_ref(&dep.dep.id, mapped_args.as_ref())?, - output: &dep.output, - }) - } - - pub fn get_parent_depend<'ctx>( - &'ctx self, - task_ref: &TaskRef<'ctx>, - ) -> Result> { - let task = self.get(task_ref)?; - let Some(parent) = &task.parent else { - return Ok(None); - }; - Some(self.parent_ref(parent, &task_ref.args)).transpose() - } - - fn ancestor_iter<'ctx>( - &'ctx self, - task_ref: &TaskRef<'ctx>, - ) -> impl Iterator> { - struct Iter<'ctx>(&'ctx Context, Option>>); - - impl<'ctx> Iterator for Iter<'ctx> { - type Item = Result<'ctx, TaskRef<'ctx>>; - - fn next(&mut self) -> Option { - let task_ref = match self.1.take()? 
{ - Ok(task_ref) => task_ref, - Err(err) => return Some(Err(err)), - }; - self.1 = self.0.get_parent_depend(&task_ref).transpose(); - Some(Ok(task_ref)) - } - } - - Iter(self, Some(Ok(task_ref.clone()))) - } - - pub fn get_build_depends<'ctx>( - &'ctx self, - task_ref: &TaskRef<'ctx>, - ) -> Result> { - let mut ret = HashSet::new(); - let mut allow_noinherit = true; - - for current in self.ancestor_iter(task_ref) { - let current_ref = current?; - let task = self.get(¤t_ref)?; - let entries = task - .build_depends - .iter() - .filter(|dep| allow_noinherit || !dep.noinherit) - .map(|dep| self.output_ref(dep, ¤t_ref.args, true)) - .collect::>>()?; - ret.extend(entries); - - allow_noinherit = false; - } - - Ok(ret) - } - - pub fn get_host_depends<'ctx>( - &'ctx self, - task_ref: &TaskRef<'ctx>, - ) -> Result> { - let mut ret = HashSet::new(); - let mut allow_noinherit = true; - - for current in self.ancestor_iter(task_ref) { - let current_ref = current?; - let task = self.get(¤t_ref)?; - let entries = task - .depends - .iter() - .filter(|dep| allow_noinherit || !dep.noinherit) - .map(|dep| self.output_ref(dep, ¤t_ref.args, false)) - .collect::>>()?; - ret.extend(entries); - - allow_noinherit = false; - } - - Ok(ret) - } -} - -impl Index<&TaskRef<'_>> for Context { - type Output = TaskDef; - - fn index(&self, index: &TaskRef) -> &TaskDef { - self.get(index).expect("Invalid TaskRef") - } -} diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs deleted file mode 100644 index b2655c6..0000000 --- a/crates/driver/src/driver.rs +++ /dev/null @@ -1,480 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - iter, - os::unix::{net::UnixStream, prelude::*}, -}; - -use indoc::indoc; -use nix::{ - poll, - sys::{ - signal, - signalfd::{SfdFlags, SignalFd}, - }, -}; - -use common::{error::*, string_hash::*, types::*}; -use runner::Runner; - -use crate::{ - context::{Context, OutputRef, TaskRef}, - paths, resolve, - task::*, - template, -}; - -#[derive(Debug)] -pub struct CompletionState<'ctx> { - ctx: &'ctx Context, - tasks_done: HashMap, TaskOutput>, -} - -impl<'ctx> CompletionState<'ctx> { - pub fn new(ctx: &'ctx Context) -> Self { - CompletionState { - ctx, - tasks_done: Default::default(), - } - } - - // Treats both "depends" and "parent" as dependencies - fn deps_satisfied(&self, task_ref: &TaskRef) -> bool { - resolve::get_dependent_tasks(self.ctx, task_ref) - .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref))) - .unwrap() - .into_iter() - .all(|dep| self.tasks_done.contains_key(&dep)) - } - - fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result> { - let task_def = &self.ctx[task]; - task_def - .fetch - .iter() - .map(|Fetch { name, sha256 }| { - Ok(Dependency::Fetch { - name: template::ENGINE.eval(name, &task.args).with_context(|| { - format!("Failed to evaluate fetch filename for task {}", task) - })?, - target_dir: paths::TASK_DLDIR.to_string(), - sha256: *sha256, - }) - }) - .collect() - } - - fn dep_closure(&self, deps: I, path: &'ctx str) -> impl Iterator + '_ - where - I: IntoIterator>, - { - resolve::runtime_depends(self.ctx, deps) - .expect("invalid runtime depends") - .into_iter() - .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) - .map(|&output| Dependency::Task { - output, - path: path.to_string(), - }) - } - - fn build_deps(&self, task: &TaskRef<'ctx>) -> Result + '_> { - Ok(self.dep_closure( - self.ctx - .get_build_depends(task) - .with_context(|| format!("invalid build depends for {}", task))?, - "", - )) - } - - fn 
host_deps(&self, task: &TaskRef<'ctx>) -> Result + '_> { - Ok(self.dep_closure( - self.ctx - .get_host_depends(task) - .with_context(|| format!("invalid depends for {}", task))?, - paths::TASK_SYSROOT, - )) - } - - fn task_deps(&self, task: &TaskRef<'ctx>) -> Result> { - let fetch_deps = self.fetch_deps(task)?.into_iter(); - let build_deps = self.build_deps(task)?; - let host_deps = self.host_deps(task)?; - - Ok(fetch_deps.chain(build_deps).chain(host_deps).collect()) - } - - fn task_ancestors(&self, task_ref: &TaskRef<'ctx>) -> Vec { - let Some(parent) = self - .ctx - .get_parent_depend(task_ref) - .expect("invalid parent depends") - else { - return vec![]; - }; - - let mut chain = self.task_ancestors(&parent); - if let Some(layer) = self.tasks_done[&parent].layer { - chain.push(layer); - } - chain - } - - fn print_summary(&self) { - println!(); - println!("Summary:"); - - let mut tasks: Box<[_]> = self.tasks_done.iter().collect(); - tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task)); - for (task_ref, task) in tasks.iter() { - println!(); - println!("{:#}", task_ref); - if let Some(hash) = task.input_hash { - println!(" input: {}", hash); - } - if let Some(hash) = task.layer { - println!(" layer: {}", hash); - } - if !task.outputs.is_empty() { - println!(" outputs:"); - - let mut outputs: Box<[_]> = task.outputs.iter().collect(); - outputs.sort_by_key(|(output, _)| *output); - for (output, hash) in outputs.iter() { - println!(" {}: {}", output, hash); - } - } - } - } -} - -#[derive(Debug)] -enum SpawnResult { - Spawned(UnixStream), - Skipped(TaskOutput), -} - -#[derive(Debug, PartialEq, Eq, Hash)] -enum TaskWaitResult { - Failed, - Interrupted, -} - -#[derive(Debug)] -pub struct Driver<'ctx> { - rdeps: HashMap, Vec>>, - force_run: HashSet>, - tasks_blocked: HashSet>, - tasks_runnable: Vec>, - tasks_running: HashMap)>, - state: CompletionState<'ctx>, -} - -impl<'ctx> Driver<'ctx> { - pub fn new( - ctx: &'ctx Context, - taskset: HashSet>, - force_run: HashSet>, - ) -> Result { - let mut driver = Driver { - rdeps: Default::default(), - force_run, - tasks_blocked: Default::default(), - tasks_runnable: Default::default(), - tasks_running: Default::default(), - state: CompletionState::new(ctx), - }; - - for task in taskset { - let mut has_depends = false; - for dep in resolve::get_dependent_tasks(ctx, &task) - .map_err(|_| Error::new(format!("invalid dependency for {}", task)))? - { - let rdep = driver.rdeps.entry(dep.clone()).or_default(); - rdep.push(task.clone()); - has_depends = true; - } - - if has_depends { - driver.tasks_blocked.insert(task); - } else { - driver.tasks_runnable.push(task); - } - } - - Ok(driver) - } - - const PREAMBLE: &'static str = indoc! {" - export PATH={{build.prefix}}/sbin:{{build.prefix}}/bin:$PATH - cd {{workdir}} - - export SOURCE_DATE_EPOCH=1 - - export AR_FOR_BUILD=ar - export AS_FOR_BUILD=as - export DLLTOOL_FOR_BUILD=dlltool - export CC_FOR_BUILD=gcc - export CXX_FOR_BUILD=g++ - export GCC_FOR_BUILD=gcc - export GFORTRAN_FOR_BUILD=gfortran - export GOC_FOR_BUILD=goc - export LD_FOR_BUILD=ld - export LIPO_FOR_BUILD=lipo - export NM_FOR_BUILD=nm - export OBJCOPY_FOR_BUILD=objcopy - export OBJDUMP_FOR_BUILD=objdump - export RANLIB_FOR_BUILD=ranlib - export STRIP_FOR_BUILD=strip - export WINDRES_FOR_BUILD=windres - export WINDMC_FOR_BUILD=windmc - "}; - const PREAMBLE_HOST: &'static str = indoc! 
{" - export AR={{build_to_host.cross_compile}}ar - export AS={{build_to_host.cross_compile}}as - export DLLTOOL={{build_to_host.cross_compile}}dlltool - export CC={{build_to_host.cross_compile}}gcc - export CXX={{build_to_host.cross_compile}}g++ - export GCC={{build_to_host.cross_compile}}gcc - export GFORTRAN={{build_to_host.cross_compile}}gfortran - export GOC={{build_to_host.cross_compile}}goc - export LD={{build_to_host.cross_compile}}ld - export LIPO={{build_to_host.cross_compile}}lipo - export NM={{build_to_host.cross_compile}}nm - export OBJCOPY={{build_to_host.cross_compile}}objcopy - export OBJDUMP={{build_to_host.cross_compile}}objdump - export RANLIB={{build_to_host.cross_compile}}ranlib - export STRIP={{build_to_host.cross_compile}}strip - export WINDRES={{build_to_host.cross_compile}}windres - export WINDMC={{build_to_host.cross_compile}}windmc - "}; - const PREAMBLE_TARGET: &'static str = indoc! {" - export AR_FOR_TARGET={{build_to_target.cross_compile}}ar - export AS_FOR_TARGET={{build_to_target.cross_compile}}as - export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool - export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc - export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++ - export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc - export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran - export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc - export LD_FOR_TARGET={{build_to_target.cross_compile}}ld - export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo - export NM_FOR_TARGET={{build_to_target.cross_compile}}nm - export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy - export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump - export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib - export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip - export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres - export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc - "}; - - fn task_preamble(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> { - let mut ret = vec![Self::PREAMBLE]; - - if task_ref.args.contains_key("build_to_host") { - ret.push(Self::PREAMBLE_HOST); - } - if task_ref.args.contains_key("build_to_target") { - ret.push(Self::PREAMBLE_TARGET); - } - ret - } - - fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) { - let rdeps = self.rdeps.get(&task_ref); - - self.state.tasks_done.insert(task_ref, task_output); - - for rdep in rdeps.unwrap_or(&Vec::new()) { - if !self.tasks_blocked.contains(rdep) { - continue; - } - if self.state.deps_satisfied(rdep) { - self.tasks_blocked.remove(rdep); - self.tasks_runnable.push(rdep.clone()); - } - } - } - - fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result { - let task_def = &self.state.ctx[task_ref]; - if task_def.action.is_empty() { - println!("Skipping empty task {:#}", task_ref); - return Ok(SpawnResult::Skipped(TaskOutput::default())); - } - - let task_deps = self.state.task_deps(task_ref)?; - let task_output = task_def - .output - .iter() - .map(|(name, Output { path, .. 
})| { - let output_path = if let Some(path) = path { - format!("{}/{}", paths::TASK_DESTDIR, path) - } else { - paths::TASK_DESTDIR.to_string() - }; - (name.clone(), output_path) - }) - .collect(); - - let ancestors = self.state.task_ancestors(task_ref); - - let mut run = Self::task_preamble(task_ref); - run.push(&task_def.action.run); - - let command = template::ENGINE - .eval_sh(&run.concat(), &task_ref.args) - .with_context(|| { - format!("Failed to evaluate command template for task {}", task_ref) - })?; - - let rootfs = self.state.ctx.get_rootfs(); - let task = Task { - label: format!("{:#}", task_ref), - command, - workdir: paths::TASK_WORKDIR.to_string(), - rootfs: rootfs.0, - ancestors, - depends: task_deps, - outputs: task_output, - pins: HashMap::from([rootfs.clone()]), - force_run: self.force_run.contains(task_ref), - }; - - Ok(SpawnResult::Spawned(runner.spawn(&task))) - } - - fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> { - match self.spawn_task(&task_ref, runner)? { - SpawnResult::Spawned(socket) => { - assert!(self - .tasks_running - .insert(socket.as_raw_fd(), (socket, task_ref)) - .is_none()); - } - SpawnResult::Skipped(result) => { - self.update_runnable(task_ref, result); - } - } - Ok(()) - } - - fn run_tasks(&mut self, runner: &Runner) -> Result<()> { - while let Some(task_ref) = self.tasks_runnable.pop() { - self.run_task(task_ref, runner)?; - } - Ok(()) - } - - fn wait_for_task(&mut self, signal_fd: &mut SignalFd) -> Result> { - let mut pollfds: Vec<_> = self - .tasks_running - .values() - .map(|(socket, _)| socket.as_fd()) - .chain(iter::once(signal_fd.as_fd())) - .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) - .collect(); - - while poll::poll(&mut pollfds, poll::PollTimeout::NONE).context("poll()")? 
== 0 {} - - let pollevents: Vec<_> = pollfds - .into_iter() - .map(|pollfd| { - ( - pollfd.as_fd().as_raw_fd(), - pollfd.revents().expect("Unknown events in poll() return"), - ) - }) - .collect(); - - for (fd, events) in pollevents { - if !events.contains(poll::PollFlags::POLLIN) { - if events.intersects(!poll::PollFlags::POLLIN) { - return Err(Error::new( - "Unexpected error status for socket file descriptor", - )); - } - continue; - } - - if fd == signal_fd.as_raw_fd() { - let _signal = signal_fd.read_signal().expect("read_signal()").unwrap(); - return Ok(Some(TaskWaitResult::Interrupted)); - } - - let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); - - match Runner::result(&socket) { - Ok(task_output) => { - self.update_runnable(task_ref, task_output); - } - Err(error) => { - eprintln!("{}", error); - return Ok(Some(TaskWaitResult::Failed)); - } - } - } - - Ok(None) - } - - fn is_done(&self) -> bool { - self.tasks_blocked.is_empty() - && self.tasks_runnable.is_empty() - && self.tasks_running.is_empty() - } - - fn setup_signalfd() -> Result { - let mut signals = signal::SigSet::empty(); - signals.add(signal::Signal::SIGINT); - signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None) - .expect("pthread_sigmask()"); - SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC) - .context("Failed to create signal file descriptor") - } - - fn raise_sigint() { - let mut signals = signal::SigSet::empty(); - signals.add(signal::Signal::SIGINT); - signal::pthread_sigmask(signal::SigmaskHow::SIG_UNBLOCK, Some(&signals), None) - .expect("pthread_sigmask()"); - signal::raise(signal::Signal::SIGINT).expect("raise()"); - unreachable!(); - } - - pub fn run(&mut self, runner: &Runner, keep_going: bool) -> Result { - let mut success = true; - let mut interrupted = false; - - let mut signal_fd = Self::setup_signalfd()?; - - self.run_tasks(runner)?; - - while !self.tasks_running.is_empty() { - match self.wait_for_task(&mut signal_fd)? { - Some(TaskWaitResult::Failed) => { - success = false; - } - Some(TaskWaitResult::Interrupted) => { - if interrupted { - Self::raise_sigint(); - } - eprintln!("Interrupt received, not spawning new tasks. Interrupt again to stop immediately."); - interrupted = true; - } - None => {} - } - if !interrupted && (success || keep_going) { - self.run_tasks(runner)?; - } - } - - if interrupted || !success { - return Ok(false); - } - - assert!(self.is_done(), "No runnable tasks left"); - self.state.print_summary(); - - Ok(true) - } -} diff --git a/crates/driver/src/main.rs b/crates/driver/src/main.rs deleted file mode 100644 index bd08f18..0000000 --- a/crates/driver/src/main.rs +++ /dev/null @@ -1,79 +0,0 @@ -mod args; -mod context; -mod driver; -mod parse; -mod paths; -mod pin; -mod recipe; -mod resolve; -mod task; -mod template; - -use std::collections::HashSet; - -use clap::Parser; - -use runner::Runner; - -#[derive(Parser)] -#[clap(version, about)] -struct Opts { - /// Allow N jobs at once. 
- /// Defaults to the number of available CPUs - #[clap(short, long)] - jobs: Option, - /// Keep going after some tasks have failed - #[clap(short, long)] - keep_going: bool, - /// The tasks to run - #[clap(name = "task", required = true)] - tasks: Vec, -} - -fn main() { - let opts: Opts = Opts::parse(); - - let runner = unsafe { Runner::new(&runner::Options { jobs: opts.jobs }) }.unwrap(); - - let ctx = context::Context::new( - recipe::read_recipes("examples/recipes").unwrap(), - pin::read_pins("examples/pins.yml").unwrap(), - ) - .unwrap(); - - let mut rsv = resolve::Resolver::new(&ctx); - let mut force_run = HashSet::new(); - - for task in opts.tasks { - let (task_ref, flags) = match ctx.parse(&task) { - Ok(task_ref) => task_ref, - Err(err) => { - eprintln!("{}", err); - std::process::exit(1); - } - }; - let errors = rsv.add_goal(&task_ref); - if !errors.is_empty() { - for error in errors { - eprintln!("{}", error); - } - std::process::exit(1); - } - if flags.force_run { - force_run.insert(task_ref); - } - } - let taskset = rsv.into_taskset(); - let mut driver = driver::Driver::new(&ctx, taskset, force_run).unwrap(); - match driver.run(&runner, opts.keep_going) { - Ok(success) => { - if !success { - std::process::exit(1); - } - } - Err(error) => { - eprintln!("{}", error); - std::process::exit(1); - } - } -} diff --git a/crates/driver/src/parse.rs b/crates/driver/src/parse.rs deleted file mode 100644 index 5857efb..0000000 --- a/crates/driver/src/parse.rs +++ /dev/null @@ -1,72 +0,0 @@ -#[derive(Debug, Clone, Copy)] -pub struct TaskID<'a> { - pub recipe: &'a str, - pub task: &'a str, -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct TaskArgs<'a> { - pub host: Option<&'a str>, - pub target: Option<&'a str>, -} - -#[derive(Debug, Clone, Copy)] -pub struct Task<'a> { - pub id: TaskID<'a>, - pub args: TaskArgs<'a>, -} - -#[derive(Debug, Clone, Copy)] -pub struct TaskFlags { - pub force_run: bool, -} - -peg::parser! { - grammar rules() for str { - rule t(tag: rule<()>, value: rule) -> T - = tag() v:value() { v } - - rule name_char() - = ['a'..='z' | 'A' ..='Z' | '0'..='9' | '_' | '-'] - - rule name() -> &'input str - = $(name_char()+) - - rule recipe_id() -> &'input str - = $(name() ("/" name())?) - - rule task_id() -> TaskID<'input> - = recipe:recipe_id() "::" task:name() { - TaskID { recipe, task } - } - - rule task_args() -> TaskArgs<'input> - = "@" host:name()? target:t(<":">, )? 
{ - TaskArgs { - host, - target, - } - } - / { Default::default() } - - pub rule task() -> Task<'input> - = id:task_id() args:task_args() { - Task { - id, - args, - } - } - - rule force_run() -> bool - = "+" { true } - / { false } - - rule task_flags() -> TaskFlags - = force_run:force_run() { TaskFlags { force_run } } - - pub rule task_with_flags() -> (Task<'input>, TaskFlags) - = task:task() flags:task_flags() { (task, flags) } - } -} - -pub use rules::*; diff --git a/crates/driver/src/paths.rs b/crates/driver/src/paths.rs deleted file mode 100644 index 274dda1..0000000 --- a/crates/driver/src/paths.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub const TASK_DESTDIR: &str = "/build/dest"; -pub const TASK_DLDIR: &str = "/build/downloads"; -pub const TASK_WORKDIR: &str = "/build/work"; -pub const TASK_SYSROOT: &str = "/opt/toolchain/sysroot"; diff --git a/crates/driver/src/pin.rs b/crates/driver/src/pin.rs deleted file mode 100644 index 26e445c..0000000 --- a/crates/driver/src/pin.rs +++ /dev/null @@ -1,39 +0,0 @@ -use std::{collections::HashMap, fs::File, path::Path}; - -use serde::{Deserialize, Serialize}; - -use common::{error::*, string_hash::*}; - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Args { - pub host: Option, - pub target: Option, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Provides { - pub recipe: String, - pub task: String, - pub output: Vec, - pub args: Args, -} - -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(rename_all = "kebab-case")] -pub struct Pin { - pub hash: Option, - #[serde(default)] - pub provides: Vec, - #[serde(default)] - pub is_rootfs: bool, -} - -pub type Pins = HashMap; - -pub fn read_pins>(path: P) -> Result { - let f = File::open(path)?; - let pins: Pins = serde_yaml::from_reader(f) - .map_err(Error::new) - .context("YAML error")?; - Ok(pins) -} diff --git a/crates/driver/src/recipe.rs b/crates/driver/src/recipe.rs deleted file mode 100644 index 16d3751..0000000 --- a/crates/driver/src/recipe.rs +++ /dev/null @@ -1,188 +0,0 @@ -use std::{collections::HashMap, ffi::OsStr, fs::File, path::Path, result}; - -use scoped_tls_hkt::scoped_thread_local; -use serde::{de::DeserializeOwned, Deserialize, Deserializer}; -use walkdir::WalkDir; - -use common::{error::*, types::*}; - -use crate::task::{TaskDef, TaskMeta}; - -scoped_thread_local!(static CURRENT_RECIPE: str); - -fn current_recipe() -> String { - CURRENT_RECIPE.with(|current| current.to_string()) -} - -pub fn deserialize_task_id<'de, D>(deserializer: D) -> result::Result -where - D: Deserializer<'de>, -{ - #[derive(Deserialize)] - struct RecipeTaskID { - recipe: Option, - task: String, - } - let RecipeTaskID { recipe, task } = RecipeTaskID::deserialize(deserializer)?; - Ok(TaskID { - recipe: recipe.unwrap_or_else(current_recipe), - task, - }) -} - -#[derive(Clone, Debug, Deserialize, Default)] -pub struct RecipeMeta { - pub name: Option, - pub version: Option, -} - -#[derive(Debug, Deserialize)] -struct Recipe { - #[serde(default)] - pub meta: RecipeMeta, - pub tasks: HashMap, -} - -#[derive(Debug, Deserialize)] -struct Subrecipe { - pub tasks: HashMap, -} - -fn read_yaml(path: &Path) -> Result { - let f = File::open(path).context("IO error")?; - - let value: T = serde_yaml::from_reader(f) - .map_err(Error::new) - .context("YAML error")?; - - Ok(value) -} - -const RECIPE_NAME: &str = "build"; -const RECIPE_PREFIX: &str = "build."; - -fn recipe_name(path: &Path) -> Option<&str> { - if path.extension() != Some("yml".as_ref()) { - return None; - } - - let stem = 
path.file_stem()?.to_str()?; - if stem == RECIPE_NAME { - return Some(""); - } - stem.strip_prefix(RECIPE_PREFIX) -} - -fn handle_recipe_tasks( - tasks: &mut HashMap>, - recipe_tasks: HashMap, - meta: &TaskMeta, -) { - for (label, mut task) in recipe_tasks { - let task_id = TaskID { - recipe: meta.recipe.clone(), - task: label, - }; - task.meta = meta.clone(); - tasks.entry(task_id).or_default().push(task); - } -} - -fn read_recipe_tasks( - path: &Path, - basename: &str, - tasks: &mut HashMap>, -) -> Result { - let recipe_def = CURRENT_RECIPE.set(basename, || read_yaml::(path))?; - - let name = recipe_def - .meta - .name - .as_deref() - .unwrap_or(basename) - .to_string(); - - let meta = TaskMeta { - basename: basename.to_string(), - recipename: "".to_string(), - recipe: basename.to_string(), - name, - version: recipe_def.meta.version.clone(), - }; - - handle_recipe_tasks(tasks, recipe_def.tasks, &meta); - - Ok(recipe_def.meta) -} - -fn read_subrecipe_tasks( - path: &Path, - basename: &str, - recipename: &str, - recipe_meta: &RecipeMeta, - tasks: &mut HashMap>, -) -> Result<()> { - let recipe = format!("{basename}/{recipename}"); - let recipe_def = CURRENT_RECIPE.set(&recipe, || read_yaml::(path))?; - - let name = recipe_meta.name.as_deref().unwrap_or(basename).to_string(); - - let meta = TaskMeta { - basename: basename.to_string(), - recipename: recipename.to_string(), - recipe: recipe.clone(), - name, - version: recipe_meta.version.clone(), - }; - - handle_recipe_tasks(tasks, recipe_def.tasks, &meta); - - Ok(()) -} - -pub fn read_recipes>(path: P) -> Result>> { - let mut tasks = HashMap::>::new(); - let mut recipe_metas = HashMap::::new(); - - for entry in WalkDir::new(path) - .sort_by(|a, b| { - // Files are sorted first by stem, then by extension, so that - // recipe.yml will always be read before recipe.NAME.yml - let stem_cmp = a.path().file_stem().cmp(&b.path().file_stem()); - let ext_cmp = a.path().extension().cmp(&b.path().extension()); - stem_cmp.then(ext_cmp) - }) - .into_iter() - .filter_map(|e| e.ok()) - { - let path = entry.path(); - if !path.is_file() { - continue; - } - - let Some(recipename) = recipe_name(path) else { - continue; - }; - let Some(basename) = path - .parent() - .and_then(Path::file_name) - .and_then(OsStr::to_str) - else { - continue; - }; - - if recipename.is_empty() { - recipe_metas.insert( - basename.to_string(), - read_recipe_tasks(path, basename, &mut tasks)?, - ); - } else { - let Some(recipe_meta) = recipe_metas.get(basename) else { - continue; - }; - read_subrecipe_tasks(path, basename, recipename, recipe_meta, &mut tasks)?; - } - } - - Ok(tasks) -} diff --git a/crates/driver/src/resolve.rs b/crates/driver/src/resolve.rs deleted file mode 100644 index 102c483..0000000 --- a/crates/driver/src/resolve.rs +++ /dev/null @@ -1,334 +0,0 @@ -use std::collections::{HashMap, HashSet}; -use std::fmt; -use std::rc::Rc; - -use common::types::TaskID; - -use crate::args::TaskArgs; -use crate::context::{self, Context, OutputRef, TaskRef}; - -#[derive(Debug, Default)] -pub struct DepChain<'ctx>(pub Vec>); - -impl<'ctx> fmt::Display for DepChain<'ctx> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut first = true; - for task in self.0.iter().rev() { - if !first { - write!(f, " -> ")?; - } - write!(f, "{}", task)?; - - first = false; - } - - Ok(()) - } -} - -impl<'ctx> From> for DepChain<'ctx> { - fn from(task: TaskRef<'ctx>) -> Self { - DepChain(vec![task]) - } -} - -impl<'ctx> From<&TaskRef<'ctx>> for DepChain<'ctx> { - fn from(task: 
&TaskRef<'ctx>) -> Self { - task.clone().into() - } -} - -impl<'ctx> From<&'ctx TaskID> for DepChain<'ctx> { - fn from(id: &'ctx TaskID) -> Self { - TaskRef { - id, - args: Rc::new(TaskArgs::default()), - } - .into() - } -} - -const MAX_ERRORS: usize = 100; - -#[derive(Debug)] -pub enum ErrorKind<'ctx> { - Context(context::Error<'ctx>), - OutputNotFound(&'ctx str), - DependencyCycle, - TooManyErrors, -} - -#[derive(Debug)] -pub struct Error<'ctx> { - pub dep_chain: DepChain<'ctx>, - pub kind: ErrorKind<'ctx>, -} - -impl<'ctx> Error<'ctx> { - fn output_not_found(task: &TaskRef<'ctx>, output: &'ctx str) -> Self { - Error { - dep_chain: task.into(), - kind: ErrorKind::OutputNotFound(output), - } - } - - fn dependency_cycle(task: &TaskRef<'ctx>) -> Self { - Error { - dep_chain: task.into(), - kind: ErrorKind::DependencyCycle, - } - } - - fn too_many_errors() -> Self { - Error { - dep_chain: DepChain::default(), - kind: ErrorKind::TooManyErrors, - } - } - - fn extend(&mut self, task: &TaskRef<'ctx>) { - self.dep_chain.0.push(task.clone()); - } -} - -impl<'ctx> fmt::Display for Error<'ctx> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let Error { dep_chain, kind } = self; - match kind { - ErrorKind::Context(err) => { - write!(f, "{}: ", err)?; - } - ErrorKind::OutputNotFound(output) => { - write!(f, "Output '{}' not found: ", output)?; - } - ErrorKind::DependencyCycle => { - write!(f, "Dependency Cycle: ")?; - } - ErrorKind::TooManyErrors => { - write!(f, "Too many errors, stopping.")?; - } - } - dep_chain.fmt(f) - } -} - -impl<'ctx> From> for Error<'ctx> { - fn from(err: context::Error<'ctx>) -> Self { - Error { - dep_chain: err.task.into(), - kind: ErrorKind::Context(err), - } - } -} - -impl<'ctx> std::error::Error for Error<'ctx> {} - -#[derive(Debug, PartialEq)] -enum ResolveState { - Resolving, - Resolved, -} - -pub fn runtime_depends<'ctx, I>( - ctx: &'ctx Context, - deps: I, -) -> Result, Vec> -where - I: IntoIterator>, -{ - fn add_dep<'ctx>( - ret: &mut HashSet>, - ctx: &'ctx Context, - dep: OutputRef<'ctx>, - ) -> Vec> { - if ret.contains(&dep) { - return Vec::new(); - } - - let task = &dep.task; - let task_def = match ctx.get(task) { - Ok(task) => task, - Err(err) => return vec![err.into()], - }; - - let output = match task_def.output.get(dep.output) { - Some(output) => output, - None => { - return vec![Error::output_not_found(task, dep.output)]; - } - }; - - ret.insert(dep.clone()); - - let mut errors = Vec::new(); - for runtime_dep in &output.runtime_depends { - match ctx.output_ref(runtime_dep, &task.args, false) { - Ok(output_ref) => { - for mut error in add_dep(ret, ctx, output_ref) { - error.extend(task); - errors.push(error); - } - } - Err(err) => { - let mut err: Error = err.into(); - err.extend(task); - errors.push(err); - } - }; - } - errors - } - - let mut ret = HashSet::new(); - let mut errors = Vec::new(); - - for dep in deps { - errors.extend(add_dep(&mut ret, ctx, dep)); - } - - if !errors.is_empty() { - return Err(errors); - } - - Ok(ret) -} - -pub fn get_dependent_outputs<'ctx>( - ctx: &'ctx Context, - task_ref: &TaskRef<'ctx>, -) -> Result>, Vec>> { - let deps: HashSet<_> = ctx - .get_build_depends(task_ref) - .map_err(|err| vec![err.into()])? 
- .into_iter() - .chain( - ctx.get_host_depends(task_ref) - .map_err(|err| vec![err.into()])?, - ) - .collect(); - runtime_depends(ctx, deps) -} - -pub fn get_dependent_tasks<'ctx>( - ctx: &'ctx Context, - task_ref: &TaskRef<'ctx>, -) -> Result>, Vec>> { - Ok(ctx - .get_parent_depend(task_ref) - .map_err(|err| vec![err.into()])? - .into_iter() - .chain( - get_dependent_outputs(ctx, task_ref)? - .into_iter() - .map(|dep| dep.task), - ) - .collect()) -} - -#[derive(Debug)] -pub struct Resolver<'ctx> { - ctx: &'ctx Context, - resolve_state: HashMap, ResolveState>, -} - -impl<'ctx> Resolver<'ctx> { - pub fn new(ctx: &'ctx Context) -> Self { - Resolver { - ctx, - resolve_state: HashMap::new(), - } - } - - fn tasks_resolved(&self) -> bool { - self.resolve_state - .values() - .all(|resolved| *resolved == ResolveState::Resolved) - } - - fn add_task(&mut self, task: &TaskRef<'ctx>, output: Option<&'ctx str>) -> Vec> { - match self.resolve_state.get(task) { - Some(ResolveState::Resolving) => return vec![Error::dependency_cycle(task)], - Some(ResolveState::Resolved) => return vec![], - None => (), - } - - let task_def = match self.ctx.get(task) { - Ok(task_def) => task_def, - Err(err) => return vec![err.into()], - }; - - if let Some(task_output) = output { - if !task_def.output.contains_key(task_output) { - return vec![Error::output_not_found(task, task_output)]; - } - } - - self.resolve_state - .insert(task.clone(), ResolveState::Resolving); - - let mut ret = Vec::new(); - let mut handle_errors = |errors: Vec>| -> Result<(), ()> { - for mut error in errors { - error.extend(task); - ret.push(error); - - if ret.len() > MAX_ERRORS { - ret.push(Error::too_many_errors()); - return Err(()); - } - } - Ok(()) - }; - - let _ = (|| -> Result<(), ()> { - match self.ctx.get_parent_depend(task) { - Ok(Some(parent)) => { - handle_errors(self.add_task(&parent, None))?; - } - Ok(None) => {} - Err(err) => { - handle_errors(vec![err.into()])?; - } - } - - match get_dependent_outputs(self.ctx, task) { - Ok(rdeps) => { - for rdep in rdeps { - handle_errors(self.add_task(&rdep.task, Some(rdep.output)))?; - } - } - Err(errors) => { - handle_errors(errors)?; - } - } - - Ok(()) - })(); - - if ret.is_empty() { - *self - .resolve_state - .get_mut(task) - .expect("Missing resolve_state") = ResolveState::Resolved; - } else { - self.resolve_state.remove(task); - } - - ret - } - - pub fn add_goal(&mut self, task: &TaskRef<'ctx>) -> Vec> { - let ret = self.add_task(task, None); - debug_assert!(self.tasks_resolved()); - ret - } - - pub fn into_taskset(self) -> HashSet> { - debug_assert!(self.tasks_resolved()); - - self.resolve_state - .into_iter() - .map(|entry| entry.0) - .collect() - } -} diff --git a/crates/driver/src/task.rs b/crates/driver/src/task.rs deleted file mode 100644 index e84766e..0000000 --- a/crates/driver/src/task.rs +++ /dev/null @@ -1,96 +0,0 @@ -use std::collections::{HashMap, HashSet}; - -use serde::Deserialize; - -use common::{string_hash::StringHash, types::TaskID}; - -use crate::{ - args::{ArgMapping, ArgType, TaskArgs}, - recipe, -}; - -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Hash)] -pub struct TaskDep { - #[serde(flatten, deserialize_with = "recipe::deserialize_task_id")] - pub id: TaskID, - #[serde(default)] - pub args: ArgMapping, -} - -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Hash)] -pub struct Fetch { - pub name: String, - pub sha256: StringHash, -} - -fn default_output_name() -> String { - "default".to_string() -} - -#[derive(Clone, Debug, Deserialize)] -pub struct ParentDep { - 
-    #[serde(flatten)]
-    pub dep: TaskDep,
-}
-
-#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Hash)]
-pub struct OutputDep {
-    #[serde(flatten)]
-    pub dep: TaskDep,
-    #[serde(default)]
-    pub noinherit: bool,
-    #[serde(default = "default_output_name")]
-    pub output: String,
-}
-
-#[derive(Clone, Debug, Deserialize, Default)]
-pub struct Output {
-    pub path: Option<String>,
-    #[serde(default)]
-    pub runtime_depends: HashSet<OutputDep>,
-}
-
-#[derive(Clone, Debug, Deserialize, Default)]
-pub struct Action {
-    #[serde(default)]
-    pub run: String,
-}
-
-impl Action {
-    pub fn is_empty(&self) -> bool {
-        self.run.is_empty()
-    }
-}
-
-#[derive(Clone, Debug, Default)]
-pub struct TaskMeta {
-    pub basename: String,
-    pub recipename: String,
-    pub recipe: String,
-    pub name: String,
-    pub version: Option<String>,
-}
-
-#[derive(Clone, Debug, Deserialize, Default)]
-pub struct TaskDef {
-    #[serde(skip)]
-    pub meta: TaskMeta,
-    #[serde(default)]
-    pub args: HashMap<String, ArgType>,
-    #[serde(default)]
-    pub parent: Option<ParentDep>,
-    #[serde(default)]
-    pub fetch: HashSet<Fetch>,
-    #[serde(default)]
-    pub build_depends: HashSet<OutputDep>,
-    #[serde(default)]
-    pub depends: HashSet<OutputDep>,
-    #[serde(default)]
-    pub output: HashMap<String, Output>,
-    #[serde(flatten)]
-    pub action: Action,
-    #[serde(default)]
-    pub priority: i32,
-    #[serde(skip)]
-    pub arg_match: TaskArgs,
-}
diff --git a/crates/driver/src/template.rs b/crates/driver/src/template.rs
deleted file mode 100644
index 1a091ed..0000000
--- a/crates/driver/src/template.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-use handlebars::Handlebars;
-use lazy_static::lazy_static;
-
-use common::error::*;
-
-use crate::args::TaskArgs;
-
-fn escape_sh(s: &str) -> String {
-    format!("'{}'", s.replace('\'', "'\\''"))
-}
-
-#[derive(Debug)]
-pub struct TemplateEngine {
-    tpl: Handlebars<'static>,
-    tpl_sh: Handlebars<'static>,
-}
-
-impl TemplateEngine {
-    pub fn new() -> Self {
-        let mut tpl = Handlebars::new();
-        tpl.set_strict_mode(true);
-        tpl.register_escape_fn(handlebars::no_escape);
-
-        let mut tpl_sh = Handlebars::new();
-        tpl_sh.set_strict_mode(true);
-        tpl_sh.register_escape_fn(escape_sh);
-
-        TemplateEngine { tpl, tpl_sh }
-    }
-
-    pub fn eval(&self, input: &str, args: &TaskArgs) -> Result<String> {
-        self.tpl.render_template(input, args).map_err(Error::new)
-    }
-
-    pub fn eval_sh(&self, input: &str, args: &TaskArgs) -> Result<String> {
-        self.tpl_sh.render_template(input, args).map_err(Error::new)
-    }
-}
-
-lazy_static! {
-    pub static ref ENGINE: TemplateEngine = TemplateEngine::new();
-}
diff --git a/crates/rebel-common/Cargo.toml b/crates/rebel-common/Cargo.toml
new file mode 100644
index 0000000..954ebe5
--- /dev/null
+++ b/crates/rebel-common/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "rebel-common"
+version = "0.1.0"
+authors = ["Matthias Schiffer "]
+license = "MIT"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+hex = { version = "0.4.3", features = ["std", "serde"] }
+serde = { version = "1", features = ["derive"] }
diff --git a/crates/rebel-common/src/error.rs b/crates/rebel-common/src/error.rs
new file mode 100644
index 0000000..ba25af4
--- /dev/null
+++ b/crates/rebel-common/src/error.rs
@@ -0,0 +1,119 @@
+//! Serializable errors with context
+
+use std::{error::Error as _, fmt::Display, io, result};
+
+use serde::{Deserialize, Serialize};
+
+pub trait Contextualizable: Sized {
+    type Output;
+
+    fn with_context<C: Display, F: FnOnce() -> C>(self, f: F) -> Self::Output;
+
+    fn context<C: Display>(self, c: C) -> Self::Output {
+        self.with_context(|| c)
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub enum ErrorCause {
+    Code(i32),
+    String(String),
+}
+
+impl Display for ErrorCause {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ErrorCause::Code(code) => io::Error::from_raw_os_error(*code).fmt(f),
+            ErrorCause::String(string) => f.write_str(string),
+        }
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub struct Error {
+    pub cause: ErrorCause,
+    pub context: Vec<String>,
+}
+
+impl Error {
+    pub fn new<D: Display>(cause: D) -> Self {
+        Error {
+            cause: ErrorCause::String(cause.to_string()),
+            context: Vec::new(),
+        }
+    }
+
+    pub fn from_io(err: &io::Error) -> Self {
+        if let Some(source) = err
+            .source()
+            .and_then(|source| source.downcast_ref::<io::Error>())
+        {
+            return Error::from_io(source).context(err.to_string());
+        }
+
+        let cause = match err.raw_os_error() {
+            Some(code) => ErrorCause::Code(code),
+            None => ErrorCause::String(err.to_string()),
+        };
+        Error {
+            cause,
+            context: vec![],
+        }
+    }
+}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("Error: ")?;
+
+        let mut it = self.context.iter().rev();
+        if let Some(ctx) = it.next() {
+            write!(f, "{}\n\nCaused by:\n    ", ctx)?;
+
+            for ctx in it {
+                write!(f, "{}\n    ", ctx)?;
+            }
+        }
+
+        self.cause.fmt(f)
+    }
+}
+
+impl Contextualizable for Error {
+    type Output = Error;
+    fn with_context<C: Display, F: FnOnce() -> C>(self, f: F) -> Self::Output {
+        let Error { cause, mut context } = self;
+        context.push(f().to_string());
+        Error { cause, context }
+    }
+}
+
+impl<E> From<E> for Error
+where
+    E: Into<io::Error>,
+{
+    fn from(err: E) -> Self {
+        Error::from_io(&err.into())
+    }
+}
+
+pub type Result<T> = result::Result<T, Error>;
+
+impl<T, E> Contextualizable for result::Result<T, E>
+where
+    E: Into<Error>,
+{
+    type Output = Result<T>;
+
+    fn with_context<C: Display, F: FnOnce() -> C>(self, f: F) -> Self::Output {
+        self.map_err(|err| err.into().with_context(f))
+    }
+}
+
+impl<T> Contextualizable for Option<T> {
+    type Output = Result<T>;
+
+    fn with_context<C: Display, F: FnOnce() -> C>(self, f: F) -> Self::Output {
+        self.ok_or_else(|| Error::new(f()))
+    }
+}
diff --git a/crates/rebel-common/src/lib.rs b/crates/rebel-common/src/lib.rs
new file mode 100644
index 0000000..8d630dd
--- /dev/null
+++ b/crates/rebel-common/src/lib.rs
@@ -0,0 +1,3 @@
+pub mod error;
+pub mod string_hash;
+pub mod types;
diff --git a/crates/rebel-common/src/string_hash.rs b/crates/rebel-common/src/string_hash.rs
new file mode 100644
index 0000000..a2b00db
--- /dev/null
+++ b/crates/rebel-common/src/string_hash.rs
@@ -0,0 +1,53 @@
+use std::fmt::Display;
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Copy, PartialEq, Eq, Hash)]
+pub struct StringHash(pub [u8; 32]);
+
+impl Display for StringHash {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(&hex::encode(self.0))
+    }
+}
+
+impl std::fmt::Debug for StringHash {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "\"{}\"", self)
+    }
+}
+
+impl Serialize for StringHash {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        hex::serialize(self.0, serializer)
+    }
+}
+impl<'de> Deserialize<'de> for StringHash {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where + D: serde::Deserializer<'de>, + { + Ok(StringHash(hex::deserialize(deserializer)?)) + } +} + +macro_rules! stringhash_newtype { + ($id:ident) => { + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)] + pub struct $id(pub StringHash); + + impl Display for $id { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } + } + }; +} + +stringhash_newtype!(InputHash); +stringhash_newtype!(DependencyHash); +stringhash_newtype!(LayerHash); +stringhash_newtype!(ArchiveHash); diff --git a/crates/rebel-common/src/types.rs b/crates/rebel-common/src/types.rs new file mode 100644 index 0000000..2a06275 --- /dev/null +++ b/crates/rebel-common/src/types.rs @@ -0,0 +1,54 @@ +use std::{ + collections::{HashMap, HashSet}, + fmt::Display, +}; + +use serde::{Deserialize, Serialize}; + +use crate::string_hash::*; + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)] +pub struct TaskID { + pub recipe: String, + pub task: String, +} + +impl Display for TaskID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}::{}", self.recipe, self.task) + } +} + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)] +#[serde(rename_all = "snake_case")] +pub enum Dependency { + Fetch { + name: String, + target_dir: String, + sha256: StringHash, + }, + Task { + output: ArchiveHash, + path: String, + }, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Task { + pub label: String, + pub command: String, + pub workdir: String, + pub rootfs: ArchiveHash, + pub ancestors: Vec, + pub depends: HashSet, + pub outputs: HashMap, + pub pins: HashMap, + pub force_run: bool, +} + +#[derive(Clone, Debug, Deserialize, Serialize, Default)] +pub struct TaskOutput { + pub input_hash: Option, + pub layer: Option, + pub outputs: HashMap, +} diff --git a/crates/rebel-runner/Cargo.toml b/crates/rebel-runner/Cargo.toml new file mode 100644 index 0000000..076e47a --- /dev/null +++ b/crates/rebel-runner/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "rebel-runner" +version = "0.1.0" +authors = ["Matthias Schiffer "] +license = "MIT" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +common = { path = "../rebel-common", package = "rebel-common" } + +bincode = "1.3.3" +blake3 = { version = "1.3.0", features = ["traits-preview"] } +capctl = "0.2.0" +digest = "0.10.1" +libc = "0.2.84" +nix = { version = "0.28.0", features = ["user", "fs", "process", "mount", "sched", "poll", "signal", "hostname", "resource"] } +olpc-cjson = "0.1.0" +serde = { version = "1", features = ["derive"] } +serde_json = "1.0.62" +tar = "0.4.32" +tee_readwrite = "0.2.0" +uds = "0.4.1" +walkdir = "2.3.2" diff --git a/crates/rebel-runner/src/init.rs b/crates/rebel-runner/src/init.rs new file mode 100644 index 0000000..ede8fd8 --- /dev/null +++ b/crates/rebel-runner/src/init.rs @@ -0,0 +1,68 @@ +use nix::mount::{self, MsFlags}; + +use common::error::*; + +use crate::{paths, util::fs}; + +fn prepare_dev(path: &str) -> Result<()> { + fs::mkdir(path)?; + mount::mount::<_, _, str, str>(Some(path), path, None, MsFlags::MS_BIND, None) + .context("Failed to bind mount container /dev")?; + + for dir in ["pts", "shm"] { + fs::mkdir(paths::join(&[path, dir]))?; + } + + for (link, target) in [ + ("fd", "/proc/self/fd"), + ("stdin", "/proc/self/fd/0"), + ("stdout", "/proc/self/fd/1"), + ("stderr", "/proc/self/fd/2"), + ("ptmx", "pts/ptmx"), + ] { + let path = 
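+        // e.g. ("fd", "/proc/self/fd") yields the symlink build/tmp/dev/fd ->
+        // /proc/self/fd (prepare_dev is called with paths::DEV_DIR below).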
paths::join(&[path, link]); + std::os::unix::fs::symlink(target, &path) + .with_context(|| format!("Failed to create link {}", path))?; + } + + for dev in ["null", "zero", "full", "random", "urandom", "tty"] { + let source = paths::join(&["/dev", dev]); + let target = paths::join(&[path, dev]); + fs::create(&target)?; + mount::mount::(Some(&source), &target, None, MsFlags::MS_BIND, None) + .with_context(|| format!("Failed to bind mount {}", source))?; + } + + mount::mount::( + None, + path, + None, + MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, + None, + ) + .context("Failed to mount container /dev read-only")?; + + Ok(()) +} + +pub fn init_runner() -> Result<()> { + fs::mkdir(paths::LAYER_STATE_DIR)?; + fs::mkdir(paths::OUTPUT_STATE_DIR)?; + + fs::ensure_removed(paths::TMP_DIR)?; + fs::mkdir(paths::TMP_DIR)?; + mount::mount::<_, _, str, str>( + Some(paths::TMP_DIR), + paths::TMP_DIR, + None, + MsFlags::MS_BIND, + None, + ) + .context("Failed to bind mount build tmpdir")?; + mount::mount::(None, paths::TMP_DIR, None, MsFlags::MS_PRIVATE, None) + .context("Failed to set MS_PRIVATE for build tmpdir")?; + + prepare_dev(paths::DEV_DIR)?; + + Ok(()) +} diff --git a/crates/rebel-runner/src/jobserver.rs b/crates/rebel-runner/src/jobserver.rs new file mode 100644 index 0000000..b0b88cd --- /dev/null +++ b/crates/rebel-runner/src/jobserver.rs @@ -0,0 +1,79 @@ +use std::{ + os::fd::{AsFd, AsRawFd, OwnedFd}, + slice, +}; + +use nix::{errno::Errno, fcntl::OFlag, poll, unistd}; + +use common::error::*; + +use super::util::unix; + +#[derive(Debug)] +pub struct Jobserver { + tokens: usize, + r: OwnedFd, + w: OwnedFd, +} + +impl Jobserver { + pub fn new(tokens: usize) -> Result { + let (r, w) = unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK).context("pipe()")?; + + for _ in 0..tokens { + if unistd::write(w.as_fd(), b"+").is_err() { + break; + } + } + unix::set_blocking(&w, true)?; + + Ok(Jobserver { tokens, r, w }) + } + + pub fn wait(&mut self) -> u8 { + loop { + poll::poll( + &mut [poll::PollFd::new(self.r.as_fd(), poll::PollFlags::POLLIN)], + poll::PollTimeout::NONE, + ) + .expect("poll()"); + + let mut token = 0; + match unistd::read(self.r.as_raw_fd(), slice::from_mut(&mut token)) { + Ok(n) => { + assert!(n == 1); + return token; + } + Err(Errno::EAGAIN) => { + // Token was sniped by another task + continue; + } + error @ Err(_) => { + error.expect("read()"); + } + } + } + } + + pub fn post(&mut self, token: u8) { + let n = unistd::write(self.w.as_fd(), slice::from_ref(&token)).expect("write()"); + assert!(n == 1); + } + + pub fn set_cloexec(&mut self, cloexec: bool) -> Result<()> { + unix::set_cloexec(&self.r, cloexec)?; + unix::set_cloexec(&self.w, cloexec)?; + Ok(()) + } + + pub fn to_makeflags(&self) -> String { + format!( + " -j{} --jobserver-auth={},{}", + self.tokens, + self.r.as_raw_fd(), + self.w.as_raw_fd(), + ) + } +} + +// FIXME Log lost tokens on drop diff --git a/crates/rebel-runner/src/lib.rs b/crates/rebel-runner/src/lib.rs new file mode 100644 index 0000000..ab90420 --- /dev/null +++ b/crates/rebel-runner/src/lib.rs @@ -0,0 +1,217 @@ +mod init; +mod jobserver; +mod ns; +mod paths; +mod tar; +mod task; +mod util; + +use std::{ + collections::HashSet, + fs::File, + net, + os::unix::{net::UnixStream, prelude::*}, + process, slice, +}; + +use capctl::prctl; +use nix::{ + errno::Errno, + fcntl::Flock, + poll, + sched::CloneFlags, + sys::{ + signal, + signalfd::{SfdFlags, SignalFd}, + stat, wait, + }, + unistd::{self, Gid, Pid, Uid}, +}; +use uds::UnixSeqpacketConn; + 
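+// Rough usage sketch for the jobserver above (wait()/post() are the real
+// methods from jobserver.rs; `build()` is a hypothetical stand-in):
+//
+//     let token = jobserver.wait();  // blocks until a job slot is free
+//     let result = build();          // child `make`s share the pipe via MAKEFLAGS
+//     jobserver.post(token);         // always return the token afterwards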
+use common::{error::*, types::*}; + +use jobserver::Jobserver; +use util::{checkable::Checkable, clone, steal::Steal, unix}; + +#[derive(Debug, Clone)] +pub struct Options { + pub jobs: Option, +} + +#[derive(Debug)] +struct RunnerContext { + socket: Steal, + jobserver: Jobserver, + tasks: HashSet, +} + +fn handle_sigchld(ctx: &mut RunnerContext) -> Result<()> { + loop { + let status = match wait::waitpid(Pid::from_raw(-1), Some(wait::WaitPidFlag::WNOHANG)) { + Ok(wait::WaitStatus::StillAlive) | Err(Errno::ECHILD) => return Ok(()), + res => res.expect("waitpid()"), + }; + let pid = status.pid().unwrap(); + if ctx.tasks.remove(&pid) { + status.check()?; + } + } +} + +fn handle_request(ctx: &mut RunnerContext, request_socket: UnixStream) { + let run = || { + ctx.socket.steal(); + + let task: Task = + bincode::deserialize_from(&request_socket).expect("Failed to decode task description"); + + prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)"); + + let result = task::handle(task, &mut ctx.jobserver); + bincode::serialize_into(&request_socket, &result).expect("Failed to send task result"); + drop(request_socket); + }; + + let pid = unsafe { clone::spawn(None, run) }.expect("fork()"); + assert!(ctx.tasks.insert(pid)); +} + +fn handle_socket(ctx: &mut RunnerContext) -> bool { + let mut fd = 0; + + match ctx + .socket + .recv_fds(&mut [0], slice::from_mut(&mut fd)) + .expect("recv_fds()") + { + (1, _, n_fd) => { + assert!(n_fd == 1); + } + _ => return false, + } + + let request_socket = unsafe { UnixStream::from_raw_fd(fd) }; + handle_request(ctx, request_socket); + true +} + +fn borrow_socket_fd(socket: &UnixSeqpacketConn) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(socket.as_raw_fd()) } +} + +fn runner( + uid: Uid, + gid: Gid, + socket: UnixSeqpacketConn, + _lockfile: Flock, + options: &Options, +) -> ! 
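+// runner() never returns: it serves requests in a poll() loop and calls
+// process::exit(0) once the control socket is closed by the client side.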
{ + unistd::setsid().expect("setsid()"); + ns::mount_proc(); + ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid); + + stat::umask(stat::Mode::from_bits_truncate(0o022)); + + init::init_runner().unwrap(); + + let jobs = options + .jobs + .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs")); + let jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe"); + let mut ctx = RunnerContext { + socket: socket.into(), + jobserver, + tasks: HashSet::new(), + }; + + let mut signals = signal::SigSet::empty(); + signals.add(signal::Signal::SIGCHLD); + signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None) + .expect("pthread_sigmask()"); + let mut signal_fd = SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC) + .expect("Failed to create signal file descriptor"); + + loop { + let socket_fd = borrow_socket_fd(&ctx.socket); + let mut pollfds = [ + poll::PollFd::new(signal_fd.as_fd(), poll::PollFlags::POLLIN), + poll::PollFd::new(socket_fd.as_fd(), poll::PollFlags::POLLIN), + ]; + poll::poll(&mut pollfds, poll::PollTimeout::NONE).expect("poll()"); + + let signal_events = pollfds[0] + .revents() + .expect("Unknown events in poll() return"); + let socket_events = pollfds[1] + .revents() + .expect("Unknown events in poll() return"); + + if signal_events.contains(poll::PollFlags::POLLIN) { + let _signal = signal_fd.read_signal().expect("read_signal()").unwrap(); + handle_sigchld(&mut ctx).expect("Task process exited abnormally"); + } else if signal_events.intersects(!poll::PollFlags::POLLIN) { + panic!("Unexpected error status for signal file descriptor"); + } + + if socket_events.contains(poll::PollFlags::POLLIN) { + if !handle_socket(&mut ctx) { + break; + } + } else if socket_events.intersects(!poll::PollFlags::POLLIN) { + panic!("Unexpected error status for socket file descriptor"); + } + } + + process::exit(0); +} + +pub struct Runner { + socket: UnixSeqpacketConn, +} + +impl Runner { + /// Creates a new container runner + /// + /// # Safety + /// + /// Do not call in multithreaded processes. + pub unsafe fn new(options: &Options) -> Result { + let lockfile = unix::lock(paths::LOCKFILE, true, false) + .context("Failed to get lock on build directory, is another instance running?")?; + + let uid = unistd::geteuid(); + let gid = unistd::getegid(); + + let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()"); + + match clone::clone( + CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID, + ) + .expect("clone()") + { + unistd::ForkResult::Parent { .. 
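+            // Parent: keep `local` as the request socket; the child end
+            // never returns from runner().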
} => Ok(Runner { socket: local }), + unistd::ForkResult::Child => { + drop(local); + runner(uid, gid, remote, lockfile, options); + } + } + } + + pub fn spawn(&self, task: &Task) -> UnixStream { + let (local, remote) = UnixStream::pair().expect("socketpair()"); + + self.socket + .send_fds(&[0], &[remote.as_raw_fd()]) + .expect("send()"); + + bincode::serialize_into(&local, task).expect("Task submission failed"); + local.shutdown(net::Shutdown::Write).expect("shutdown()"); + + local + } + + pub fn result(socket: &UnixStream) -> Result { + bincode::deserialize_from(socket).expect("Failed to read task result") + } +} diff --git a/crates/rebel-runner/src/ns.rs b/crates/rebel-runner/src/ns.rs new file mode 100644 index 0000000..4a8e3e7 --- /dev/null +++ b/crates/rebel-runner/src/ns.rs @@ -0,0 +1,84 @@ +use nix::{ + mount::{self, MntFlags, MsFlags}, + sched::CloneFlags, + unistd::{self, Gid, Pid, Uid}, +}; + +use common::error::*; + +use super::util::clone; + +pub fn mount_proc() { + mount::mount::<_, _, _, str>(Some("proc"), "/proc", Some("proc"), MsFlags::empty(), None) + .expect("Failed to mount /proc"); +} + +pub fn setup_userns(inner_uid: Uid, inner_gid: Gid, outer_uid: Uid, outer_gid: Gid) { + std::fs::write("/proc/self/setgroups", "deny").expect("Failed to write /proc/self/setgroups"); + std::fs::write( + "/proc/self/uid_map", + format!("{} {} 1", inner_uid, outer_uid), + ) + .expect("Failed to write /proc/self/uid_map"); + std::fs::write( + "/proc/self/gid_map", + format!("{} {} 1", inner_gid, outer_gid), + ) + .expect("Failed to write /proc/self/gid_map"); +} + +pub unsafe fn spawn(flags: CloneFlags, f: F) -> nix::Result +where + F: FnOnce(), +{ + assert!(flags.contains(CloneFlags::CLONE_NEWNS) || !flags.contains(CloneFlags::CLONE_NEWPID)); + + clone::spawn(Some(flags), || { + if flags.contains(CloneFlags::CLONE_NEWPID) { + mount_proc(); + } + f() + }) +} + +pub fn pivot_root(path: &str) { + (|| -> Result<()> { + unistd::chdir(path).context("chdir()")?; + mount::mount::<_, _, str, str>(Some("/proc"), "proc", None, MsFlags::MS_BIND, None) + .context("Failed to bind mount /proc")?; + unistd::pivot_root(".", ".").context("pivot_root()")?; + mount::umount2(".", MntFlags::MNT_DETACH).context("umount2()")?; + unistd::chdir("/").context("chdir(\"/\")")?; + Ok(()) + })() + .expect("Failed to pivot root"); +} + +pub fn container_mounts() -> Result<()> { + mount::mount( + Some("tmp"), + "/tmp", + Some("tmpfs"), + MsFlags::MS_NODEV | MsFlags::MS_NOSUID, + Some("mode=1777,size=1048576k"), + ) + .context("Failed to mount /tmp")?; + mount::mount( + Some("devpts"), + "/dev/pts", + Some("devpts"), + MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC, + Some("newinstance,ptmxmode=0666,mode=0620"), + ) + .context("Failed to mount /dev/pts")?; + mount::mount( + Some("shm"), + "/dev/shm", + Some("tmpfs"), + MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC | MsFlags::MS_NODEV, + Some("mode=1777,size=65536k"), + ) + .context("Failed to mount /dev/shm")?; + + Ok(()) +} diff --git a/crates/rebel-runner/src/paths.rs b/crates/rebel-runner/src/paths.rs new file mode 100644 index 0000000..4b3a126 --- /dev/null +++ b/crates/rebel-runner/src/paths.rs @@ -0,0 +1,118 @@ +//! Build directory structure used through rebel +//! +//! # Current structure +//! +//! ```text +//! build/ +//! ├── build.lock +//! ├── downloads/ +//! │   └── ... +//! ├── state/ +//! │   ├── output/ +//! │   │  ├── .tar.tmp # during packing +//! │   │  ├── .tar # files are renamed when packing is finished +//! │   │   └── ... +//! │   ├── layer/ +//! 
│   │   ├── <layer hash>/      # overlayfs layer dir of finished tasks
+//! │   │   └── ...
+//! │   └── task/
+//! │       ├── <input hash>/
+//! │       │   ├── layer/           # overlayfs layer dir (moved to layer/ after build)
+//! │       │   ├── work/            # overlayfs work dir (discarded after build)
+//! │       │   ├── task.json.tmp    # during write
+//! │       │   ├── task.json        # after write
+//! │       │   ├── task.log         # stdout/stderr output of the task
+//! │       │   └── task.lock        # task lockfile
+//! │       └── ...
+//! └── tmp/                         # temporary files (cleaned on start)
+//!     ├── dev/                     # container /dev
+//!     ├── depends/                 # unpacked dependencies
+//!     └── task/
+//!         └── <input hash>/
+//!             ├── build/           # mount point for /build directory
+//!             │   ├── downloads/   # downloaded sources
+//!             │   ├── task/        # internal runner files
+//!             │   └── work/        # build overlay mountpoint
+//!             └── rootfs/          # rootfs overlay mountpoint
+//! ```
+
+use common::string_hash::*;
+
+pub const DOWNLOADS_DIR: &str = "build/downloads";
+pub const PIN_DIR: &str = "build/pinned";
+
+pub const TMP_DIR: &str = "build/tmp";
+pub const DEV_DIR: &str = "build/tmp/dev";
+pub const DEPENDS_TMP_DIR: &str = "build/tmp/depends";
+pub const TASK_TMP_DIR: &str = "build/tmp/task";
+
+pub const TASK_TMP_ROOTFS_SUBDIR: &str = "rootfs";
+
+pub const LOCKFILE: &str = "build/build.lock";
+pub const OUTPUT_STATE_DIR: &str = "build/state/output";
+pub const TASK_STATE_DIR: &str = "build/state/task";
+pub const LAYER_STATE_DIR: &str = "build/state/layer";
+
+pub const TASK_STATE_LAYER_SUBDIR: &str = "layer";
+pub const TASK_STATE_WORK_SUBDIR: &str = "work";
+
+pub const TASK_BUILDDIR: &str = "build";
+pub const TASK_TASKDIR: &str = "build/task";
+
+pub const TASK_RUN: &str = "run";
+
+pub fn join(paths: &[&str]) -> String {
+    paths.join("/")
+}
+
+pub fn task_tmp_dir(hash: &InputHash) -> String {
+    join(&[TASK_TMP_DIR, &hash.to_string()])
+}
+
+pub fn task_state_dir(hash: &InputHash) -> String {
+    join(&[TASK_STATE_DIR, &hash.to_string()])
+}
+
+pub fn task_cache_tmp_filename(hash: &InputHash) -> String {
+    join(&[TASK_STATE_DIR, &hash.to_string(), "task.json.tmp"])
+}
+
+pub fn task_cache_filename(hash: &InputHash) -> String {
+    join(&[TASK_STATE_DIR, &hash.to_string(), "task.json"])
+}
+
+pub fn task_log_filename(hash: &InputHash) -> String {
+    join(&[TASK_STATE_DIR, &hash.to_string(), "task.log"])
+}
+
+pub fn task_lock_filename(hash: &InputHash) -> String {
+    join(&[TASK_STATE_DIR, &hash.to_string(), "task.lock"])
+}
+
+pub fn layer_dir(hash: &LayerHash) -> String {
+    join(&[LAYER_STATE_DIR, &hash.to_string()])
+}
+
+pub fn depend_tmp_dir(hash: &ArchiveHash) -> String {
+    join(&[DEPENDS_TMP_DIR, &hash.to_string()])
+}
+
+pub fn depend_dir(hash: &ArchiveHash) -> String {
+    join(&[DEPENDS_TMP_DIR, &hash.to_string()])
+}
+
+pub fn depend_lock_filename(hash: &ArchiveHash) -> String {
+    join(&[DEPENDS_TMP_DIR, &format!("{}.lock", hash)])
+}
+
+pub fn archive_tmp_filename(hash: &InputHash) -> String {
+    join(&[OUTPUT_STATE_DIR, &format!("{}.tar.tmp", hash)])
+}
+
+pub fn archive_filename(hash: &ArchiveHash) -> String {
+    join(&[OUTPUT_STATE_DIR, &format!("{}.tar", hash)])
+}
+
+pub fn pinned_archive_filename(name: &str) -> String {
+    join(&[PIN_DIR, &format!("{}.tar", name)])
+}
diff --git a/crates/rebel-runner/src/tar.rs b/crates/rebel-runner/src/tar.rs
new file mode 100644
index 0000000..1a66408
--- /dev/null
+++ b/crates/rebel-runner/src/tar.rs
@@ -0,0 +1,105 @@
+use std::{
+    io::{self, Read, Write},
+    os::unix::prelude::CommandExt,
+    path::Path,
+    process::{self, Command, Stdio},
+};
+
+use nix::{
+    mount::{self, MsFlags},
+    sched::CloneFlags,
+ sys::wait, +}; + +use common::{error::*, string_hash::ArchiveHash}; + +use super::{ + ns, + util::{checkable::Checkable, fs}, +}; +use crate::paths; + +pub fn pack>( + rootfs_hash: &ArchiveHash, + archive: &mut W, + source: P, +) -> Result<()> { + let rootfs = paths::depend_dir(rootfs_hash); + let _rootfs_mount = fs::mount(&rootfs, &rootfs, None, MsFlags::MS_BIND, None) + .with_context(|| format!("Failed to bind mount rootfs to {:?}", rootfs))?; + mount::mount::( + None, + &rootfs, + None, + MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, + None, + ) + .context("Failed to mount container rootfs read-only")?; + + let (mut piper, pipew) = fs::pipe()?; + + let exec_tar = || -> Result<()> { + // We are in our own mount namespace, so mounting into the shared rootfs is fine + let dev_target = paths::join(&[&rootfs, "dev"]); + mount::mount::<_, _, str, str>( + Some(paths::DEV_DIR), + dev_target.as_str(), + None, + MsFlags::MS_BIND | MsFlags::MS_REC, + None, + )?; + let mount_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]); + mount::mount::<_, _, str, str>( + Some(source.as_ref()), + mount_target.as_str(), + None, + MsFlags::MS_BIND, + None, + )?; + + ns::pivot_root(&rootfs); + + let err = Command::new("tar") + .args([ + "-c", + "--sort=name", + "--numeric-owner", + "--owner=0", + "--group=0", + "--mtime=@0", + ".", + ]) + .stdin(Stdio::null()) + .stdout(pipew) + .current_dir(paths::TASK_BUILDDIR) + .env_clear() + .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin") + .exec(); + eprintln!("{}", err); + process::exit(127); + }; + + let pid = unsafe { ns::spawn(CloneFlags::CLONE_NEWNS, || exec_tar().unwrap()) } + .context("Failed to run tar")?; + + let result = io::copy(&mut piper, archive).context("Failed to write TAR archive"); + + wait::waitpid(pid, None)? 
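+    // Checkable::check (see util/checkable.rs) turns a non-zero exit
+    // status into an Err, so a failing tar aborts the pack operation.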
+ .check() + .context("tar did not exit successfully")?; + + result?; + Ok(()) +} + +pub fn unpack>(archive: R, dest: P) -> Result<()> { + fs::mkdir(&dest)?; + + let mut ar = tar::Archive::new(archive); + ar.set_preserve_permissions(true); + ar.set_preserve_mtime(true); + ar.set_unpack_xattrs(true); + ar.set_overwrite(false); + + ar.unpack(dest).context("Failed to unpack TAR archive") +} diff --git a/crates/rebel-runner/src/task.rs b/crates/rebel-runner/src/task.rs new file mode 100644 index 0000000..19b74f4 --- /dev/null +++ b/crates/rebel-runner/src/task.rs @@ -0,0 +1,638 @@ +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + io::{self, BufWriter}, + os::unix::prelude::CommandExt, + path::{Path, PathBuf}, + process::{self, Command, Stdio}, + time::Instant, +}; + +use capctl::prctl; +use nix::{ + mount::{self, MsFlags}, + sched::{unshare, CloneFlags}, + sys::{resource, time::TimeVal, wait}, + unistd::{self, Gid, Uid}, +}; +use serde::Serialize; +use tee_readwrite::{TeeReader, TeeWriter}; + +use common::{error::*, string_hash::*, types::*}; +use walkdir::WalkDir; + +use super::{ + jobserver::Jobserver, + ns, tar, + util::{checkable::Checkable, cjson, fs}, +}; +use crate::{ + paths, + util::{stack::Stack, unix}, +}; + +const BUILD_UID: Uid = Uid::from_raw(1000); +const BUILD_GID: Gid = Gid::from_raw(1000); + +type InputHasher = blake3::Hasher; +type DependencyHasher = blake3::Hasher; +type LayerHasher = blake3::Hasher; +type ArchiveHasher = blake3::Hasher; + +type DependMap = BTreeMap>; + +fn dependency_hash(dep: &Dependency) -> DependencyHash { + DependencyHash(StringHash( + cjson::digest::(dep).unwrap().into(), + )) +} + +fn input_hash(task: &Task) -> InputHash { + #[derive(Debug, Serialize)] + struct HashInput<'a> { + pub command: &'a str, + pub workdir: &'a str, + pub rootfs: &'a ArchiveHash, + pub ancestors: &'a [LayerHash], + pub depends: HashMap, + pub outputs: &'a HashMap, + } + let input = HashInput { + command: &task.command, + workdir: &task.workdir, + rootfs: &task.rootfs, + ancestors: &task.ancestors, + depends: task + .depends + .iter() + .map(|dep| (dependency_hash(dep), dep)) + .collect(), + outputs: &task.outputs, + }; + + InputHash(StringHash( + cjson::digest::(&input).unwrap().into(), + )) +} + +fn init_task(input_hash: &InputHash, task: &Task) -> Result { + // Remove metadata first to ensure task invalidation + fs::ensure_removed(paths::task_cache_filename(input_hash))?; + + let task_state_dir = paths::task_state_dir(input_hash); + + let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); + fs::ensure_removed(&task_layer_dir)?; + fs::mkdir(&task_layer_dir)?; + fs::fixup_permissions(&task_layer_dir)?; + + let task_tmp_dir = paths::task_tmp_dir(input_hash); + + let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]); + fs::mkdir(&taskdir)?; + let runfile = paths::join(&[&taskdir, paths::TASK_RUN]); + std::fs::write(&runfile, &task.command) + .with_context(|| format!("Failed to write {}", runfile))?; + + let mount_target = paths::join(&[&task_tmp_dir, &task.workdir]); + let mount = if task.ancestors.is_empty() { + fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None) + .with_context(|| format!("Failed to bind mount to {:?}", mount_target))? 
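+        // No ancestor layers to overlay: the fresh layer dir itself is
+        // bind-mounted as the task workdir.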
+ } else { + let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]); + fs::ensure_removed(&task_work_dir)?; + fs::mkdir(&task_work_dir)?; + fs::fixup_permissions(&task_work_dir)?; + + let lower = task + .ancestors + .iter() + .rev() + .map(paths::layer_dir) + .collect::>() + .join(":"); + let options = format!( + "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}", + lower = lower, + upper = task_layer_dir, + work = task_work_dir + ); + fs::mount( + "overlay", + &mount_target, + Some("overlay"), + MsFlags::empty(), + Some(&options), + ) + .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))? + }; + + Ok(mount) +} + +fn get_contents, P2: AsRef>( + path: P1, + prefix: P2, +) -> Result<(HashSet, HashSet)> { + let mut dirs = HashSet::new(); + let mut files = HashSet::new(); + + let root: PathBuf = Path::new("/").join(prefix.as_ref()); + + for result in WalkDir::new(path.as_ref()).min_depth(1).into_iter() { + let entry = result + .with_context(|| format!("Failed to list contents of directory {:?}", path.as_ref()))?; + let is_dir = entry.file_type().is_dir(); + let entry_path = root.join(entry.into_path().strip_prefix(path.as_ref()).unwrap()); + if is_dir { + dirs.insert(entry_path); + } else { + files.insert(entry_path); + } + } + + dirs.insert(root); + + Ok((dirs, files)) +} + +fn check_conflicts( + dirs1: &HashSet, + files1: &HashSet, + dirs2: &HashSet, + files2: &HashSet, +) -> Result<()> { + let mut conflicts = Vec::new(); + + conflicts.extend(files1.intersection(files2)); + conflicts.extend(dirs1.intersection(files2)); + conflicts.extend(files1.intersection(dirs2)); + + if !conflicts.is_empty() { + let mut conflict_strings: Box<[_]> = conflicts + .into_iter() + .map(|path| path.to_string_lossy().to_string()) + .collect(); + conflict_strings.sort(); + return Err(Error::new(format!( + "Found the following file conflicts in dependencies:\n{}", + conflict_strings.join("\n") + ))); + } + + Ok(()) +} + +fn init_task_rootfs(input_hash: &InputHash, task: &Task) -> Result> { + let task_tmp_dir = paths::task_tmp_dir(input_hash); + let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]); + let rootfs = paths::depend_dir(&task.rootfs); + + let depends = unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?; + + let mut mounts = Stack::new(); + + mounts.push( + fs::mount(rootfs, &mount_target, None, MsFlags::MS_BIND, None) + .with_context(|| format!("Failed to bind mount rootfs to {:?}", mount_target))?, + ); + mount::mount::( + None, + &mount_target, + None, + MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, + None, + ) + .context("Failed to mount container rootfs read-only")?; + + let (mut dirs, mut files) = get_contents(&mount_target, "")?; + + for (path, dep_hashes) in depends { + assert!(!dep_hashes.is_empty()); + + if !path.is_empty() && !path.starts_with('/') { + return Err(Error::new(format!( + "Dependency path {:?} must be absolute", + path + ))); + } + + let dep_target = mount_target.clone() + &path; + let dep_paths: Box<[_]> = dep_hashes.iter().map(paths::depend_dir).collect(); + + for dep in dep_paths.iter() { + let (dep_dirs, dep_files) = get_contents(dep, &path)?; + check_conflicts(&dirs, &files, &dep_dirs, &dep_files)?; + dirs.extend(dep_dirs); + files.extend(dep_files); + } + + let options = format!( + "xino=off,index=off,metacopy=off,lowerdir={lower}:{base}", + lower = dep_paths.join(":"), + base = dep_target, + ); + + mounts.push( + 
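+            // Each dependency is stacked as a read-only overlayfs lowerdir
+            // over the existing rootfs content at dep_target.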
fs::mount( + "overlay", + dep_target.as_str(), + Some("overlay"), + MsFlags::MS_RDONLY, + Some(&options), + ) + .with_context(|| format!("Failed to mount overlay to {:?}", dep_target))?, + ); + } + + Ok(mounts) +} + +fn cleanup_task(input_hash: &InputHash) -> Result<()> { + let task_tmp_dir = paths::task_tmp_dir(input_hash); + + fs::ensure_removed(task_tmp_dir).context("Failed to remove task tmp dir")?; + + let task_state_dir = paths::task_state_dir(input_hash); + let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]); + fs::ensure_removed(task_work_dir).context("Failed to remove overlayfs workdir")?; + + Ok(()) +} + +fn get_archive_filename(task: &Task, hash: &ArchiveHash) -> String { + if let Some(pinned_name) = task.pins.get(hash) { + paths::pinned_archive_filename(pinned_name) + } else { + paths::archive_filename(hash) + } +} + +fn unpack_dependency(task: &Task, hash: &ArchiveHash) -> Result<()> { + let _lock = unix::lock(paths::depend_lock_filename(hash), true, true); + + let filename = get_archive_filename(task, hash); + + let dest = paths::depend_dir(hash); + if Path::new(&dest).is_dir() { + return Ok(()); + } + + (|| -> Result<()> { + let tmp_dest = paths::depend_tmp_dir(hash); + fs::ensure_removed(&tmp_dest)?; + + let file = fs::open(&filename)?; + let hasher = ArchiveHasher::new(); + let buffered_hasher = BufWriter::with_capacity(16 * 1024 * 1024, hasher); + let mut reader = TeeReader::new(file, buffered_hasher, false); + + tar::unpack(&mut reader, &tmp_dest)?; + + // Read file to end to get the correct hash + io::copy(&mut reader, &mut io::sink())?; + + let (_, buffered_hasher) = reader.into_inner(); + let hasher = buffered_hasher.into_inner()?; + + let actual_hash = ArchiveHash(StringHash(hasher.finalize().into())); + + if &actual_hash != hash { + return Err(Error::new(format!( + "Incorrect file hash for {:?} (expected: {}, actual: {})", + filename, hash, actual_hash, + ))); + } + + fs::rename(&tmp_dest, &dest)?; + + Ok(()) + })() + .with_context(|| format!("Failed to unpack {:?}", filename)) +} + +fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result { + let task_tmp_dir = paths::task_tmp_dir(input_hash); + + unpack_dependency(task, &task.rootfs)?; + + let mut ret = DependMap::new(); + + for dep in &task.depends { + match dep { + Dependency::Fetch { + name, target_dir, .. 
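+                // The sha256 field is ignored by this pattern: it is already
+                // part of the task input hash, and the copy below is not
+                // re-verified here.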
+ } => { + let path = paths::join(&[&task_tmp_dir, target_dir]); + fs::mkdir(&path)?; + fs::copy( + paths::join(&[paths::DOWNLOADS_DIR, name]), + paths::join(&[&path, name]), + )?; + } + Dependency::Task { output, path } => { + unpack_dependency(task, output)?; + ret.entry(path.clone()).or_default().push(*output); + } + } + } + + Ok(ret) +} + +fn collect_output(input_hash: &InputHash, task: &Task, path: &str) -> Result> { + let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]); + if !Path::new(&source).is_dir() { + return Ok(None); + } + + let filename = paths::archive_tmp_filename(input_hash); + + let hash = (|| -> Result { + let file = fs::create(&filename)?; + let hasher = ArchiveHasher::new(); + let writer = TeeWriter::new(file, hasher); + let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer); + + super::tar::pack(&task.rootfs, &mut buffered_writer, &source)?; + + let writer = buffered_writer.into_inner()?; + let (file, hasher) = writer.into_inner(); + file.sync_all()?; + drop(file); + + Ok(ArchiveHash(StringHash(hasher.finalize().into()))) + })() + .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?; + + fs::rename(filename, paths::archive_filename(&hash))?; + + Ok(Some(hash)) +} + +fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result> { + let mut ret = HashMap::new(); + + for (name, path) in &task.outputs { + if let Some(hash) = collect_output(input_hash, task, path)? { + ret.insert(name.clone(), hash); + } + } + + Ok(ret) +} + +fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> { + let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?; + let _rootfs_mounts = + init_task_rootfs(input_hash, task).context("Failed to initialize task rootfs")?; + + let task_tmp_dir = paths::task_tmp_dir(input_hash); + let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]); + + let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]); + let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]); + + let log_filename = paths::task_log_filename(input_hash); + + let mut exec_cmd = || -> Result<()> { + let log = fs::create(&log_filename)?; + + let log_stdout = log + .try_clone() + .context("Failed to duplicate log file descriptor")?; + let log_stderr = log + .try_clone() + .context("Failed to duplicate log file descriptor")?; + + mount::mount::<_, _, str, str>( + Some(paths::DEV_DIR), + paths::join(&[&rootfs, "dev"]).as_str(), + None, + MsFlags::MS_BIND | MsFlags::MS_REC, + None, + ) + .expect("Failed to bind mount /dev directory"); + mount::mount::<_, _, str, str>( + Some(builddir_source.as_str()), + builddir_target.as_str(), + None, + MsFlags::MS_BIND | MsFlags::MS_REC, + None, + ) + .expect("Failed to bind mount build directory"); + + ns::pivot_root(&rootfs); + mount::mount::( + None, + "/", + None, + MsFlags::MS_PRIVATE | MsFlags::MS_REC, + None, + ) + .context("Failed to set MS_PRIVATE for container root")?; + ns::container_mounts().context("Failed to set up container mounts")?; + + unistd::sethostname("rebel-builder").context("Failed to set hostname")?; + + prctl::set_no_new_privs().context("set_no_new_privs()")?; + + unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS) + .context("Failed to create user namespace")?; + ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0)); + + jobserver + .set_cloexec(false) + .context("Failed to unset O_CLOEXEC on jobserver pipe")?; + + let err = 
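+        // exec() only returns on failure, so falling through to the
+        // eprintln!/exit(127) path below always means the spawn failed.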
Command::new("sh") + .args(["-ex", &paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])]) + .stdin(Stdio::null()) + .stdout(log_stdout) + .stderr(log_stderr) + .env_clear() + .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin") + .env("HOME", "/build") + .env("MAKEFLAGS", jobserver.to_makeflags()) + .exec(); + eprintln!("{}", err); + process::exit(127); + }; + + let pid = unsafe { + ns::spawn( + CloneFlags::CLONE_NEWNS + | CloneFlags::CLONE_NEWPID + | CloneFlags::CLONE_NEWIPC + | CloneFlags::CLONE_NEWNET + | CloneFlags::CLONE_NEWUTS, + || exec_cmd().unwrap(), + ) + } + .context("Failed to run task container")?; + + let status = wait::waitpid(pid, None)?; + + if let Err(err) = status.check() { + return Err(Error::new(format!( + "Task failed: {}\nOutput: {}", + err, log_filename + ))); + } + + Ok(()) +} + +fn hash_layer(input_hash: &InputHash, task: &Task) -> Result> { + let task_state_dir = paths::task_state_dir(input_hash); + let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); + + (|| -> Result> { + if fs::is_dir_empty(&task_layer_dir)? { + return Ok(None); + } + + let hasher = LayerHasher::new(); + let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher); + + tar::pack(&task.rootfs, &mut buffered_writer, &task_layer_dir)?; + + let hasher = buffered_writer.into_inner()?; + Ok(Some(LayerHash(StringHash(hasher.finalize().into())))) + })() + .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir)) +} + +fn move_layer(input_hash: &InputHash, hash: &Option) -> Result<()> { + let task_state_dir = paths::task_state_dir(input_hash); + let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); + + if let Some(hash) = hash { + let layer_dir = paths::layer_dir(hash); + + let err = match std::fs::rename(&task_layer_dir, &layer_dir) { + Ok(_) => return Ok(()), + Err(err) => err, + }; + + if !matches!( + err.raw_os_error(), + Some(libc::EEXIST) | Some(libc::ENOTEMPTY) + ) { + return Err(err).with_context(|| { + format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir) + }); + } + } + + fs::ensure_removed(&task_layer_dir) +} + +fn run_and_hash_task( + input_hash: &InputHash, + task: &Task, + jobserver: &mut Jobserver, +) -> Result { + run_task(input_hash, task, jobserver)?; + + let outputs = collect_outputs(input_hash, task)?; + + let layer = hash_layer(input_hash, task)?; + move_layer(input_hash, &layer)?; + + Ok(TaskOutput { + input_hash: Some(*input_hash), + layer, + outputs, + }) +} + +fn load_cached(input_hash: &InputHash) -> Result { + let filename = paths::task_cache_filename(input_hash); + let file = fs::open(&filename)?; + + serde_json::from_reader(file) + .with_context(|| format!("Failed to read task cache data from {}", filename)) +} + +fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> { + fs::mkdir(paths::task_state_dir(input_hash))?; + + let tmp_filename = paths::task_cache_tmp_filename(input_hash); + let filename = paths::task_cache_filename(input_hash); + + cjson::to_file(&tmp_filename, output) + .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?; + + fs::rename(tmp_filename, filename)?; + + Ok(()) +} + +trait AsSecsF32 { + fn as_secs_f32(&self) -> f32; +} + +impl AsSecsF32 for TimeVal { + fn as_secs_f32(&self) -> f32 { + self.tv_sec() as f32 + 1e-6 * (self.tv_usec() as f32) + } +} + +fn get_usage(total: f32) -> String { + let usage = resource::getrusage(resource::UsageWho::RUSAGE_CHILDREN).expect("getrusage()"); + + let user = 
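+    // RUSAGE_CHILDREN aggregates all reaped children, i.e. the complete
+    // process tree spawned for the task so far.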
usage.user_time().as_secs_f32(); + let system = usage.system_time().as_secs_f32(); + let cpu = (100.0 * (user + system) / total).round(); + + format!("{user:.2}s user {system:.2}s system {cpu:.0}% cpu") +} + +pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result { + let input_hash = input_hash(&task); + + let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true) + .context("Failed to get task lock")?; + + let cached_output = load_cached(&input_hash); + if !task.force_run { + if let Ok(task_output) = cached_output { + return Ok(task_output); + } + } + + let token = jobserver.wait(); + + let start_time = Instant::now(); + println!("Starting task {} ({})", task.label, input_hash); + + let task_ret = run_and_hash_task(&input_hash, &task, jobserver); + let cleanup_ret = cleanup_task(&input_hash); + + jobserver.post(token); + + let task_output = task_ret?; + cleanup_ret.context("Failed to clean up after task")?; + + save_cached(&input_hash, &task_output)?; + + let duration = Instant::now().duration_since(start_time).as_secs_f32(); + let usage = get_usage(duration); + println!( + "Finished task {} ({}) in {:.2}s ({})", + task.label, input_hash, duration, usage, + ); + + if let Ok(cached_output) = cached_output { + if cached_output.outputs != task_output.outputs { + println!( + "WARNING: Output hashes for task {} do not match cached result\n Cached output: {}\n New output: {}", + task.label, + cjson::to_string(&cached_output.outputs).unwrap(), + cjson::to_string(&task_output.outputs).unwrap(), + ); + } + } + + Ok(task_output) +} diff --git a/crates/rebel-runner/src/util/checkable.rs b/crates/rebel-runner/src/util/checkable.rs new file mode 100644 index 0000000..8528d29 --- /dev/null +++ b/crates/rebel-runner/src/util/checkable.rs @@ -0,0 +1,37 @@ +use std::{ + io::{Error, ErrorKind, Result}, + process::ExitStatus, +}; + +use nix::sys::wait; + +pub trait Checkable { + fn check(&self) -> Result<()>; +} + +impl Checkable for ExitStatus { + fn check(&self) -> Result<()> { + if self.success() { + Ok(()) + } else { + Err(Error::new( + ErrorKind::Other, + format!("Process exited with {}", self), + )) + } + } +} + +impl Checkable for wait::WaitStatus { + fn check(&self) -> Result<()> { + let message = match self { + wait::WaitStatus::Exited(_, 0) => return Ok(()), + wait::WaitStatus::Exited(_, code) => format!("Process exited with exit code: {}", code), + wait::WaitStatus::Signaled(_, signal, _) => { + format!("Process exited with signal: {}", signal) + } + _ => format!("Process in unexpected status: {:?}", self), + }; + Err(Error::new(ErrorKind::Other, message)) + } +} diff --git a/crates/rebel-runner/src/util/cjson.rs b/crates/rebel-runner/src/util/cjson.rs new file mode 100644 index 0000000..e3840ce --- /dev/null +++ b/crates/rebel-runner/src/util/cjson.rs @@ -0,0 +1,37 @@ +use std::{ + fs::File, + io::{self, Write}, + path::Path, +}; + +use digest::{self, Digest}; +use olpc_cjson::CanonicalFormatter; +use serde::Serialize; +use serde_json::error::Result; + +pub fn new_serializer(writer: W) -> serde_json::Serializer { + serde_json::Serializer::with_formatter(writer, CanonicalFormatter::new()) +} + +pub fn to_writer(writer: W, value: &T) -> Result<()> { + let mut ser = new_serializer(writer); + value.serialize(&mut ser) +} + +pub fn to_file, T: ?Sized + Serialize>(path: P, value: &T) -> io::Result<()> { + let file = File::create(path)?; + to_writer(&file, value)?; + file.sync_all() +} + +pub fn to_string(value: &T) -> Result { + let mut ret = Vec::new(); + to_writer(&mut ret, 
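+    // &mut Vec<u8> implements io::Write, so the canonical JSON is collected
+    // in `ret` before the UTF-8 conversion.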
value)?; + Ok(String::from_utf8(ret).unwrap()) +} + +pub fn digest(value: &T) -> Result> { + let mut digest = ::new(); + to_writer(&mut digest, value)?; + Ok(digest.finalize()) +} diff --git a/crates/rebel-runner/src/util/clone.rs b/crates/rebel-runner/src/util/clone.rs new file mode 100644 index 0000000..51a31c3 --- /dev/null +++ b/crates/rebel-runner/src/util/clone.rs @@ -0,0 +1,59 @@ +use std::{mem, process}; + +use nix::{ + errno, sched, + unistd::{self, Pid}, +}; + +#[repr(C)] +#[derive(Debug, Default)] +struct CloneArgs { + flags: u64, + pidfd: u64, + child_tid: u64, + parent_tid: u64, + exit_signal: u64, + stack: u64, + stack_size: u64, + tls: u64, +} + +pub unsafe fn clone(flags: sched::CloneFlags) -> nix::Result { + let mut args = CloneArgs { + flags: flags.bits() as u64, + exit_signal: libc::SIGCHLD as u64, + ..CloneArgs::default() + }; + let size = mem::size_of_val(&args) as libc::size_t; + + let pid = libc::syscall(libc::SYS_clone3, &mut args, size); + + #[allow(clippy::comparison_chain)] + if pid < 0 { + Err(errno::Errno::last()) + } else if pid == 0 { + Ok(unistd::ForkResult::Child) + } else { + Ok(unistd::ForkResult::Parent { + child: Pid::from_raw(pid as libc::pid_t), + }) + } +} + +pub unsafe fn spawn(flags: Option, f: F) -> nix::Result +where + F: FnOnce(), +{ + let res = if let Some(flags) = flags { + clone(flags) + } else { + unistd::fork() + }; + match res? { + unistd::ForkResult::Parent { child } => Ok(child), + unistd::ForkResult::Child => { + f(); + process::exit(0) + } + } +} diff --git a/crates/rebel-runner/src/util/fs.rs b/crates/rebel-runner/src/util/fs.rs new file mode 100644 index 0000000..5efd159 --- /dev/null +++ b/crates/rebel-runner/src/util/fs.rs @@ -0,0 +1,127 @@ +use std::{ + fs::{self, File}, + io, + os::unix::prelude::*, + path::{Path, PathBuf}, +}; + +use nix::{ + fcntl::OFlag, + mount::{self, MsFlags}, + unistd, +}; + +use common::error::*; + +pub fn open>(path: P) -> Result { + fs::File::open(path.as_ref()) + .with_context(|| format!("Failed to open file {:?} for reading", path.as_ref())) +} + +pub fn create>(path: P) -> Result { + fs::File::create(path.as_ref()) + .with_context(|| format!("Failed to open file {:?} for writing", path.as_ref())) +} + +pub fn rename, P2: AsRef>(from: P1, to: P2) -> Result<()> { + fs::rename(from.as_ref(), to.as_ref()) + .with_context(|| format!("Failed to rename {:?} to {:?}", from.as_ref(), to.as_ref())) +} + +// Unlike fs::copy, this doesn't preserve file flags +pub fn copy, P2: AsRef>(from: P1, to: P2) -> Result<()> { + (|| -> Result<()> { + let mut src = open(from.as_ref())?; + let mut dest = create(to.as_ref())?; + io::copy(&mut src, &mut dest)?; + dest.sync_all()?; + Ok(()) + })() + .with_context(|| format!("Failed to copy {:?} to {:?}", from.as_ref(), to.as_ref())) +} + +pub fn mkdir>(path: P) -> Result<()> { + let mut builder = fs::DirBuilder::new(); + builder.recursive(true); + builder + .create(path.as_ref()) + .with_context(|| format!("Failed to create directory {:?}", path.as_ref())) +} + +pub fn ensure_removed>(path: P) -> Result<()> { + let result = if path.as_ref().is_dir() { + fs::remove_dir_all(path.as_ref()) + } else { + fs::remove_file(path.as_ref()) + }; + result + .or_else(|err| match err.kind() { + io::ErrorKind::NotFound => Ok(()), + _ => Err(err), + }) + .with_context(|| format!("Failed to delete {:?}", path.as_ref())) +} + +pub fn is_dir_empty>(path: P) -> Result { + Ok(fs::read_dir(path)?.next().is_none()) +} + +/// Fixes up weirdness of set-group-ID or bsdgroups +pub fn 
fixup_permissions>(path: P) -> Result<()> { + let path = path.as_ref(); + let gid = unistd::getegid(); + + let metadata = path + .metadata() + .with_context(|| format!("Failed to get metadata of {:?}", path))?; + + if metadata.gid() != gid.as_raw() { + unistd::chown(path, None, Some(gid)) + .with_context(|| format!("Failed to set group of {:?}", path))?; + } + + let mut perms = metadata.permissions(); + let mode = perms.mode(); + if (mode & 0o777) != mode { + perms.set_mode(mode & 0o777); + std::fs::set_permissions(path, perms) + .with_context(|| format!("Failed to set mode of {:?}", path))?; + } + + Ok(()) +} + +#[must_use] +pub struct Mount(PathBuf); + +impl Drop for Mount { + fn drop(&mut self) { + mount::umount(&self.0) + .with_context(|| format!("Failed to unmount {:?}", self.0)) + .unwrap(); + } +} + +pub fn mount, P2: AsRef>( + source: P1, + target: P2, + fstype: Option<&str>, + flags: MsFlags, + data: Option<&str>, +) -> Result { + mkdir(target.as_ref()).with_context(|| format!("Failed to create {:?}", target.as_ref()))?; + + let canon_target = target + .as_ref() + .canonicalize() + .with_context(|| format!("Failed to get absolute path for {:?}", target.as_ref()))?; + mount::mount(Some(source.as_ref()), &canon_target, fstype, flags, data) + .with_context(|| format!("Failed to mount {:?}", canon_target))?; + Ok(Mount(canon_target)) +} + +pub fn pipe() -> Result<(File, File)> { + unistd::pipe2(OFlag::O_CLOEXEC) + .context("pipe2()") + .map(|(piper, pipew)| (File::from(piper), File::from(pipew))) +} diff --git a/crates/rebel-runner/src/util/mod.rs b/crates/rebel-runner/src/util/mod.rs new file mode 100644 index 0000000..0fbe3b5 --- /dev/null +++ b/crates/rebel-runner/src/util/mod.rs @@ -0,0 +1,7 @@ +pub mod checkable; +pub mod cjson; +pub mod clone; +pub mod fs; +pub mod stack; +pub mod steal; +pub mod unix; diff --git a/crates/rebel-runner/src/util/stack.rs b/crates/rebel-runner/src/util/stack.rs new file mode 100644 index 0000000..15d5daf --- /dev/null +++ b/crates/rebel-runner/src/util/stack.rs @@ -0,0 +1,25 @@ +use std::mem; + +/// Simple inefficient datastructure with guaranteed drop order +#[derive(Debug)] +pub enum Stack { + Cons(Box<(T, Stack)>), + Empty, +} + +impl Default for Stack { + fn default() -> Self { + Self::Empty + } +} + +impl Stack { + pub fn new() -> Self { + Self::Empty + } + + pub fn push(&mut self, value: T) { + let tmp = mem::take(self); + *self = Stack::Cons(Box::new((value, tmp))); + } +} diff --git a/crates/rebel-runner/src/util/steal.rs b/crates/rebel-runner/src/util/steal.rs new file mode 100644 index 0000000..91b2cdf --- /dev/null +++ b/crates/rebel-runner/src/util/steal.rs @@ -0,0 +1,40 @@ +use std::ops::{Deref, DerefMut}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct Steal(pub Option); + +impl Steal { + pub fn new(value: T) -> Steal { + Steal(Some(value)) + } + + pub fn steal(&mut self) -> T { + self.0 + .take() + .expect("Attempted to steal already stoken value") + } +} + +impl From for Steal { + fn from(value: T) -> Self { + Steal::new(value) + } +} + +impl Deref for Steal { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.0 + .as_ref() + .expect("Attempted to dereference stolen value") + } +} + +impl DerefMut for Steal { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0 + .as_mut() + .expect("Attempted to dereference stolen value") + } +} diff --git a/crates/rebel-runner/src/util/unix.rs b/crates/rebel-runner/src/util/unix.rs new file mode 100644 index 0000000..08884ec --- /dev/null +++ 
b/crates/rebel-runner/src/util/unix.rs @@ -0,0 +1,86 @@ +use std::{fs::File, os::unix::prelude::*, path::Path}; + +use nix::{ + fcntl::{self, FcntlArg, FdFlag, Flock, OFlag}, + sched, + unistd::Pid, +}; + +use common::error::*; + +use super::fs; + +pub fn set_blocking(fd: &Fd, blocking: bool) -> Result<()> { + let raw_fd = fd.as_fd().as_raw_fd(); + + let flags = + OFlag::from_bits_retain(fcntl::fcntl(raw_fd, FcntlArg::F_GETFL).context("fcntl(F_GETFL)")?); + + let new_flags = if blocking { + flags & !OFlag::O_NONBLOCK + } else { + flags | OFlag::O_NONBLOCK + }; + + if new_flags != flags { + fcntl::fcntl(raw_fd, FcntlArg::F_SETFL(new_flags)).context("fcntl(F_SETFL)")?; + } + + Ok(()) +} + +pub fn set_cloexec(fd: &Fd, cloexec: bool) -> Result<()> { + let raw_fd = fd.as_fd().as_raw_fd(); + + let flags = FdFlag::from_bits_retain( + fcntl::fcntl(raw_fd, FcntlArg::F_GETFD).context("fcntl(F_GETFD)")?, + ); + + let new_flags = if cloexec { + flags | FdFlag::FD_CLOEXEC + } else { + flags & !FdFlag::FD_CLOEXEC + }; + + if new_flags != flags { + fcntl::fcntl(raw_fd, FcntlArg::F_SETFD(new_flags)).context("fcntl(F_SETFD)")?; + } + + Ok(()) +} + +pub fn nproc() -> Result { + const MAXCPU: usize = sched::CpuSet::count(); + + let affinity = sched::sched_getaffinity(Pid::from_raw(0)).context("sched_getaffinity()")?; + + let mut count = 0; + + for cpu in 0..MAXCPU { + if affinity.is_set(cpu).unwrap() { + count += 1; + } + } + + Ok(count) +} + +pub fn lock>(path: P, exclusive: bool, blocking: bool) -> Result> { + use fcntl::FlockArg::*; + + if let Some(parent) = path.as_ref().parent() { + fs::mkdir(parent)?; + } + + let arg = match (exclusive, blocking) { + (true, true) => LockExclusive, + (true, false) => LockExclusiveNonblock, + (false, true) => LockShared, + (false, false) => LockSharedNonblock, + }; + + let file = fs::create(path.as_ref())?; + fcntl::Flock::lock(file, arg) + .map_err(|(_, errno)| errno) + .with_context(|| format!("flock failed on {:?}", path.as_ref())) +} diff --git a/crates/rebel/Cargo.toml b/crates/rebel/Cargo.toml new file mode 100644 index 0000000..7164ca6 --- /dev/null +++ b/crates/rebel/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "rebel" +version = "0.1.0" +authors = ["Matthias Schiffer "] +license = "MIT" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +common = { path = "../rebel-common", package = "rebel-common" } +runner = { path = "../rebel-runner", package = "rebel-runner" } + +clap = { version = "4.0.0", features = ["derive"] } +deb-version = "0.1.1" +enum-kinds = "0.5.1" +handlebars = "5.1.2" +indoc = "2.0.4" +lazy_static = "1.4.0" +nix = { version = "0.28.0", features = ["poll", "signal"] } +scoped-tls-hkt = "0.1.2" +serde = { version = "1", features = ["derive", "rc"] } +serde_yaml = "0.9" +walkdir = "2" +peg = "0.8.2" diff --git a/crates/rebel/src/args.rs b/crates/rebel/src/args.rs new file mode 100644 index 0000000..805646a --- /dev/null +++ b/crates/rebel/src/args.rs @@ -0,0 +1,122 @@ +use std::{ + collections::{hash_map, HashMap}, + hash, + rc::Rc, +}; + +use enum_kinds::EnumKind; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, PartialEq, Eq)] +pub struct Platform { + #[serde(skip)] + pub short: String, + pub gnu_triplet: String, + pub karch: String, + pub prefix: String, +} + +#[derive(Debug, Serialize, PartialEq, Eq)] +pub struct PlatformRelation { + pub is_same: bool, + pub sysroot: String, + pub cross_compile: String, +} + +#[derive(Clone, Debug, 
diff --git a/crates/rebel/Cargo.toml b/crates/rebel/Cargo.toml
new file mode 100644
index 0000000..7164ca6
--- /dev/null
+++ b/crates/rebel/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "rebel"
+version = "0.1.0"
+authors = ["Matthias Schiffer "]
+license = "MIT"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+common = { path = "../rebel-common", package = "rebel-common" }
+runner = { path = "../rebel-runner", package = "rebel-runner" }
+
+clap = { version = "4.0.0", features = ["derive"] }
+deb-version = "0.1.1"
+enum-kinds = "0.5.1"
+handlebars = "5.1.2"
+indoc = "2.0.4"
+lazy_static = "1.4.0"
+nix = { version = "0.28.0", features = ["poll", "signal"] }
+scoped-tls-hkt = "0.1.2"
+serde = { version = "1", features = ["derive", "rc"] }
+serde_yaml = "0.9"
+walkdir = "2"
+peg = "0.8.2"
diff --git a/crates/rebel/src/args.rs b/crates/rebel/src/args.rs
new file mode 100644
index 0000000..805646a
--- /dev/null
+++ b/crates/rebel/src/args.rs
@@ -0,0 +1,122 @@
+use std::{
+    collections::{hash_map, HashMap},
+    hash,
+    rc::Rc,
+};
+
+use enum_kinds::EnumKind;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, PartialEq, Eq)]
+pub struct Platform {
+    #[serde(skip)]
+    pub short: String,
+    pub gnu_triplet: String,
+    pub karch: String,
+    pub prefix: String,
+}
+
+#[derive(Debug, Serialize, PartialEq, Eq)]
+pub struct PlatformRelation {
+    pub is_same: bool,
+    pub sysroot: String,
+    pub cross_compile: String,
+}
+
+#[derive(Clone, Debug, Serialize, PartialEq, Eq, EnumKind)]
+#[serde(untagged)]
+#[enum_kind(ArgType, derive(Deserialize), serde(rename_all = "snake_case"))]
+pub enum Arg {
+    String(Rc<String>),
+    Platform(Rc<Platform>),
+    PlatformRelation(Rc<PlatformRelation>),
+}
+
+impl From<&Arg> for Arg {
+    fn from(value: &Arg) -> Self {
+        value.clone()
+    }
+}
+
+impl From<String> for Arg {
+    fn from(value: String) -> Self {
+        Arg::String(Rc::new(value))
+    }
+}
+
+impl From<Platform> for Arg {
+    fn from(value: Platform) -> Self {
+        Arg::Platform(Rc::new(value))
+    }
+}
+
+impl From<PlatformRelation> for Arg {
+    fn from(value: PlatformRelation) -> Self {
+        Arg::PlatformRelation(Rc::new(value))
+    }
+}
+
+#[derive(Clone, Debug, Serialize, PartialEq, Eq, Default)]
+pub struct TaskArgs(HashMap<String, Arg>);
+
+impl TaskArgs {
+    pub fn contains_key(&self, key: &str) -> bool {
+        self.0.contains_key(key)
+    }
+
+    pub fn get(&self, key: &str) -> Option<&Arg> {
+        self.0.get(key)
+    }
+
+    pub fn set<T>(&mut self, key: &str, value: Option<T>)
+    where
+        T: Into<Arg>,
+    {
+        if let Some(v) = value {
+            self.0.insert(key.to_string(), v.into());
+        } else {
+            self.0.remove(key);
+        }
+    }
+
+    pub fn iter(&self) -> hash_map::Iter<String, Arg> {
+        self.into_iter()
+    }
+}
+
+impl FromIterator<(String, Arg)> for TaskArgs {
+    fn from_iter<T: IntoIterator<Item = (String, Arg)>>(iter: T) -> Self {
+        TaskArgs(HashMap::from_iter(iter))
+    }
+}
+
+impl<'a> IntoIterator for &'a TaskArgs {
+    type Item = (&'a String, &'a Arg);
+
+    type IntoIter = hash_map::Iter<'a, String, Arg>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.iter()
+    }
+}
+
+#[allow(clippy::derived_hash_with_manual_eq)]
+impl hash::Hash for TaskArgs {
+    fn hash<H: hash::Hasher>(&self, _state: &mut H) {
+        // Don't do anything: Properly hashing the task args is likely to cost
+        // much more performance than the hash collisions caused by TaskRefs
+        // that only differ by the args
+    }
+}
+
+pub fn arg<A: Into<Arg>>(key: &str, value: A) -> (String, Arg) {
+    (key.to_string(), value.into())
+}
+
+#[derive(Clone, Debug, Deserialize, Default, PartialEq, Eq)]
+pub struct ArgMapping(pub HashMap<String, String>);
+
+#[allow(clippy::derived_hash_with_manual_eq)]
+impl hash::Hash for ArgMapping {
+    fn hash<H: hash::Hasher>(&self, _state: &mut H) {}
+}
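Since `TaskArgs` deliberately no-ops its `Hash` impl (the in-code comment above explains the trade-off), equality stays exact while hashing is free; `TaskRef`s that differ only in their args land in the same bucket and are disambiguated by `Eq`. A short usage sketch of the map API, with values borrowed from the globals that context.rs sets up below:

let mut args = TaskArgs::from_iter([arg("workdir", "/build/work".to_string())]);
args.set("destdir", Some("/build/dest".to_string()));
assert!(matches!(args.get("destdir"), Some(Arg::String(_))));
args.set("destdir", None::<String>); // passing None removes the key again
assert!(args.get("destdir").is_none());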
diff --git a/crates/rebel/src/context.rs b/crates/rebel/src/context.rs
new file mode 100644
index 0000000..be98813
--- /dev/null
+++ b/crates/rebel/src/context.rs
@@ -0,0 +1,533 @@
+use std::{
+    borrow::Cow,
+    cmp::Ordering,
+    collections::{HashMap, HashSet},
+    fmt::Display,
+    hash::Hash,
+    ops::Index,
+    rc::Rc,
+    result,
+};
+
+use common::{
+    error::{self, Contextualizable},
+    string_hash::ArchiveHash,
+    types::TaskID,
+};
+
+use crate::{
+    args::*,
+    parse::{self, TaskFlags},
+    paths,
+    pin::{self, Pins},
+    task::*,
+};
+
+#[derive(Debug, Clone, Copy)]
+pub enum ErrorKind<'a> {
+    TaskNotFound,
+    InvalidArgument(&'a str),
+    InvalidArgRef(&'a str),
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct Error<'a> {
+    pub task: &'a TaskID,
+    pub kind: ErrorKind<'a>,
+}
+
+impl<'a> Display for Error<'a> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let Error { task, kind } = self;
+        match kind {
+            ErrorKind::TaskNotFound => write!(f, "Task '{}' not found", task),
+            ErrorKind::InvalidArgument(arg) => write!(
+                f,
+                "Invalid or missing argument '{}' for task '{}'",
+                arg, task
+            ),
+            ErrorKind::InvalidArgRef(arg) => write!(
+                f,
+                "Invalid reference for argument '{}' of task '{}'",
+                arg, task
+            ),
+        }
+    }
+}
+
+impl<'a> From<Error<'a>> for error::Error {
+    fn from(err: Error) -> Self {
+        error::Error::new(err)
+    }
+}
+
+pub type Result<'a, T> = result::Result<T, Error<'a>>;
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub struct TaskRef<'ctx> {
+    pub id: &'ctx TaskID,
+    pub args: Rc<TaskArgs>,
+}
+
+impl<'ctx> Display for TaskRef<'ctx> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        if !f.alternate() {
+            return self.id.fmt(f);
+        }
+
+        let version_arg = match self.args.get("version") {
+            Some(Arg::String(s)) => Some(s),
+            _ => None,
+        };
+        let host_arg = match self.args.get("host") {
+            Some(Arg::Platform(platform)) => Some(platform),
+            _ => None,
+        };
+        let target_arg = match self.args.get("target") {
+            Some(Arg::Platform(platform)) => Some(platform),
+            _ => None,
+        };
+
+        write!(f, "{}", self.id.recipe)?;
+        if let Some(version) = version_arg {
+            write!(f, "#{}", version)?;
+        }
+        write!(f, "::{}", self.id.task)?;
+
+        if host_arg.is_some() || target_arg.is_some() {
+            write!(f, "@")?;
+        }
+
+        if let Some(host) = host_arg {
+            write!(f, "{}", host.short)?;
+        }
+        if let Some(target) = target_arg {
+            write!(f, ":{}", target.short)?;
+        }
+        Ok(())
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub struct OutputRef<'ctx> {
+    pub task: TaskRef<'ctx>,
+    pub output: &'ctx str,
+}
+
+fn platform_relation(args: &TaskArgs, from: &str, to: &str) -> Option<PlatformRelation> {
+    let plat_from = match args.get(from)? {
+        Arg::Platform(plat) => plat,
+        _ => return None,
+    };
+    let plat_to = match args.get(to)? {
+        Arg::Platform(plat) => plat,
+        _ => return None,
+    };
+
+    let plat_rel = if plat_from == plat_to {
+        PlatformRelation {
+            is_same: true,
+            sysroot: "".to_string(),
+            cross_compile: "".to_string(),
+        }
+    } else {
+        PlatformRelation {
+            is_same: false,
+            sysroot: paths::TASK_SYSROOT.to_string(),
+            cross_compile: format!("{}/bin/{}-", plat_from.prefix, plat_to.gnu_triplet),
+        }
+    };
+    Some(plat_rel)
+}
+
+#[derive(Debug)]
+pub struct Context {
+    platforms: HashMap<String, Arg>,
+    globals: TaskArgs,
+    tasks: HashMap<TaskID, Vec<TaskDef>>,
+    rootfs: (ArchiveHash, String),
+}
+
+impl Context {
+    pub fn new(mut tasks: HashMap<TaskID, Vec<TaskDef>>, pins: Pins) -> error::Result<Self> {
+        let platforms: HashMap<_, _> = [
+            arg(
+                "build",
+                Platform {
+                    short: "build".to_string(),
+                    gnu_triplet: "x86_64-linux-gnu".to_string(),
+                    karch: "x86_64".to_string(),
+                    prefix: "/opt/toolchain".to_string(),
+                },
+            ),
+            arg(
+                "aarch64",
+                Platform {
+                    short: "aarch64".to_string(),
+                    gnu_triplet: "aarch64-linux-gnu".to_string(),
+                    karch: "arm64".to_string(),
+                    prefix: "/usr".to_string(),
+                },
+            ),
+        ]
+        .into_iter()
+        .collect();
+
+        let globals = TaskArgs::from_iter([
+            ("build".to_string(), platforms["build"].clone()),
+            arg("workdir", paths::TASK_WORKDIR.to_string()),
+            arg("dldir", paths::TASK_DLDIR.to_string()),
+            arg("destdir", paths::TASK_DESTDIR.to_string()),
+            arg("sysroot", paths::TASK_SYSROOT.to_string()),
+        ]);
+        let (rootfs, rootfs_provides) =
+            Context::handle_pins(pins).context("Failed to process pin list")?;
+
+        Context::add_rootfs_tasks(&mut tasks, rootfs_provides, &globals)
+            .context("Failed to determine rootfs-provided tasks from pin list")?;
+
+        Ok(Context {
+            platforms,
+            globals,
+            tasks,
+            rootfs,
+        })
+    }
+
+    fn handle_pins(pins: Pins) -> error::Result<((ArchiveHash, String), Vec<pin::Provides>)> {
+        let mut ret = None;
+
+        for (name, pin) in pins {
+            if pin.is_rootfs {
+                if ret.is_some() {
+                    return Err(error::Error::new("Multiple is-rootfs pins"));
+                }
+                let hash = pin.hash.context("is-rootfs pin without hash")?;
+
+                ret = Some(((hash, name), pin.provides));
+            }
+        }
+
+        ret.context("No is-rootfs pins")
+    }
+
+    fn add_rootfs_tasks(
+        tasks: &mut HashMap<TaskID, Vec<TaskDef>>,
+        provides: Vec<pin::Provides>,
+        globals: &TaskArgs,
+    ) -> error::Result<()> {
+        let build = globals.get("build").unwrap();
+
+        for pin::Provides {
+            recipe,
+            task,
+            output,
args, + } in provides + { + let mut task_def = TaskDef::default(); + + if let Some(host) = args.host { + if host != "build" { + return Err(error::Error::new(format!("Invalid host value '{}'", host))); + } + task_def.args.insert("host".to_string(), build.into()); + task_def.arg_match.set("host", Some(build)); + } + + if let Some(target) = args.target { + if target != "build" { + return Err(error::Error::new(format!( + "Invalid target value '{}'", + target + ))); + } + task_def.args.insert("target".to_string(), build.into()); + task_def.arg_match.set("target", Some(build)); + } + + for output_entry in output { + task_def + .output + .insert(output_entry.to_string(), Output::default()); + } + + task_def.priority = i32::MAX; + + tasks + .entry(TaskID { + recipe: recipe.to_string(), + task: task.to_string(), + }) + .or_default() + .push(task_def); + } + + Ok(()) + } + + pub fn get_rootfs(&self) -> &(ArchiveHash, String) { + &self.rootfs + } + + fn match_task(task: &TaskDef, args: &TaskArgs) -> bool { + task.arg_match + .iter() + .all(|(key, value)| args.get(key) == Some(value)) + } + + fn compare_tasks(task1: &TaskDef, task2: &TaskDef) -> Ordering { + task1 + .priority + .cmp(&task2.priority) + .then(deb_version::compare_versions( + task1.meta.version.as_deref().unwrap_or_default(), + task2.meta.version.as_deref().unwrap_or_default(), + )) + } + + fn select_task<'ctx>(tasks: &'ctx [TaskDef], args: &TaskArgs) -> Option<&'ctx TaskDef> { + tasks + .iter() + .filter(|task| Self::match_task(task, args)) + .max_by(|task1, task2| Self::compare_tasks(task1, task2)) + } + + fn get_with_args<'a>(&self, id: &'a TaskID, args: &TaskArgs) -> Result<'a, &TaskDef> { + self.tasks + .get(id) + .and_then(|tasks| Self::select_task(tasks, args)) + .ok_or(Error { + task: id, + kind: ErrorKind::TaskNotFound, + }) + } + + pub fn get<'a>(&self, task: &TaskRef<'a>) -> Result<'a, &TaskDef> { + self.get_with_args(task.id, task.args.as_ref()) + } + + fn task_ref<'ctx>(&'ctx self, id: &'ctx TaskID, args: &TaskArgs) -> Result { + let task_def = self.get_with_args(id, args)?; + + let mut arg_def: HashMap<_, _> = task_def.args.iter().map(|(k, &v)| (k, v)).collect(); + for (key, arg) in &self.globals { + // TODO: Handle conflicts between explicit args and globals + arg_def.insert(key, ArgType::from(arg)); + } + + let mut new_args = TaskArgs::default(); + + for (key, typ) in arg_def { + if let Some(arg) = args.get(key) { + if ArgType::from(arg) == typ { + new_args.set(key, Some(arg)); + continue; + } + } + return Err(Error { + task: id, + kind: ErrorKind::InvalidArgument(key), + }); + } + + let build_to_host = platform_relation(&new_args, "build", "host"); + let host_to_target = platform_relation(&new_args, "host", "target"); + let build_to_target = platform_relation(&new_args, "build", "target"); + + let cross_compile = build_to_host + .as_ref() + .map(|build_to_host| build_to_host.cross_compile.clone()); + + new_args.set("build_to_host", build_to_host); + new_args.set("host_to_target", host_to_target); + new_args.set("build_to_target", build_to_target); + + new_args.set("cross_compile", cross_compile); + + new_args.set("basename", Some(task_def.meta.basename.clone())); + new_args.set("recipename", Some(task_def.meta.recipename.clone())); + new_args.set("recipe", Some(task_def.meta.recipe.clone())); + new_args.set("name", Some(task_def.meta.name.clone())); + new_args.set("version", task_def.meta.version.clone()); + + Ok(TaskRef { + id, + args: Rc::new(new_args), + }) + } + + pub fn parse(&self, s: &str) -> 
error::Result<(TaskRef, TaskFlags)> { + let (parsed, flags) = parse::task_with_flags(s) + .ok() + .context("Invalid task syntax")?; + + let recipe = parsed.id.recipe.to_string(); + let task = parsed.id.task.to_string(); + + let id = TaskID { recipe, task }; + let (ctx_id, _) = self + .tasks + .get_key_value(&id) + .with_context(|| format!("Task {} not found", id))?; + + let mut args = self.globals.clone(); + + if let Some(host) = parsed.args.host { + let plat = self + .platforms + .get(host) + .with_context(|| format!("Platform '{}' not found", host))?; + args.set("host", Some(plat)); + args.set("target", Some(plat)); + } + if let Some(target) = parsed.args.target { + let plat = self + .platforms + .get(target) + .with_context(|| format!("Platform '{}' not found", target))?; + args.set("target", Some(plat)); + } + + let task_ref = self + .task_ref(ctx_id, &args) + .with_context(|| format!("Failed to instantiate task {}", id))?; + + Ok((task_ref, flags)) + } + + fn map_args<'ctx, 'args>( + task: &'ctx TaskID, + mapping: &'ctx ArgMapping, + args: &'args TaskArgs, + build_dep: bool, + ) -> Result<'ctx, Cow<'args, TaskArgs>> { + if mapping.0.is_empty() && !build_dep { + return Ok(Cow::Borrowed(args)); + } + + let mut ret = args.clone(); + + if build_dep { + ret.set("host", args.get("build")); + ret.set("target", args.get("host")); + } + + for (to, from) in &mapping.0 { + let value = args.get(from).ok_or(Error { + task, + kind: ErrorKind::InvalidArgRef(to), + })?; + ret.set(to, Some(value.clone())); + } + + Ok(Cow::Owned(ret)) + } + + fn parent_ref<'ctx>(&'ctx self, dep: &'ctx ParentDep, args: &TaskArgs) -> Result { + let mapped_args = Context::map_args(&dep.dep.id, &dep.dep.args, args, false)?; + self.task_ref(&dep.dep.id, mapped_args.as_ref()) + } + + pub fn output_ref<'ctx>( + &'ctx self, + dep: &'ctx OutputDep, + args: &TaskArgs, + build_dep: bool, + ) -> Result> { + let mapped_args = Context::map_args(&dep.dep.id, &dep.dep.args, args, build_dep)?; + Ok(OutputRef { + task: self.task_ref(&dep.dep.id, mapped_args.as_ref())?, + output: &dep.output, + }) + } + + pub fn get_parent_depend<'ctx>( + &'ctx self, + task_ref: &TaskRef<'ctx>, + ) -> Result> { + let task = self.get(task_ref)?; + let Some(parent) = &task.parent else { + return Ok(None); + }; + Some(self.parent_ref(parent, &task_ref.args)).transpose() + } + + fn ancestor_iter<'ctx>( + &'ctx self, + task_ref: &TaskRef<'ctx>, + ) -> impl Iterator> { + struct Iter<'ctx>(&'ctx Context, Option>>); + + impl<'ctx> Iterator for Iter<'ctx> { + type Item = Result<'ctx, TaskRef<'ctx>>; + + fn next(&mut self) -> Option { + let task_ref = match self.1.take()? 
{ + Ok(task_ref) => task_ref, + Err(err) => return Some(Err(err)), + }; + self.1 = self.0.get_parent_depend(&task_ref).transpose(); + Some(Ok(task_ref)) + } + } + + Iter(self, Some(Ok(task_ref.clone()))) + } + + pub fn get_build_depends<'ctx>( + &'ctx self, + task_ref: &TaskRef<'ctx>, + ) -> Result> { + let mut ret = HashSet::new(); + let mut allow_noinherit = true; + + for current in self.ancestor_iter(task_ref) { + let current_ref = current?; + let task = self.get(¤t_ref)?; + let entries = task + .build_depends + .iter() + .filter(|dep| allow_noinherit || !dep.noinherit) + .map(|dep| self.output_ref(dep, ¤t_ref.args, true)) + .collect::>>()?; + ret.extend(entries); + + allow_noinherit = false; + } + + Ok(ret) + } + + pub fn get_host_depends<'ctx>( + &'ctx self, + task_ref: &TaskRef<'ctx>, + ) -> Result> { + let mut ret = HashSet::new(); + let mut allow_noinherit = true; + + for current in self.ancestor_iter(task_ref) { + let current_ref = current?; + let task = self.get(¤t_ref)?; + let entries = task + .depends + .iter() + .filter(|dep| allow_noinherit || !dep.noinherit) + .map(|dep| self.output_ref(dep, ¤t_ref.args, false)) + .collect::>>()?; + ret.extend(entries); + + allow_noinherit = false; + } + + Ok(ret) + } +} + +impl Index<&TaskRef<'_>> for Context { + type Output = TaskDef; + + fn index(&self, index: &TaskRef) -> &TaskDef { + self.get(index).expect("Invalid TaskRef") + } +} diff --git a/crates/rebel/src/driver.rs b/crates/rebel/src/driver.rs new file mode 100644 index 0000000..b2655c6 --- /dev/null +++ b/crates/rebel/src/driver.rs @@ -0,0 +1,480 @@ +use std::{ + collections::{HashMap, HashSet}, + iter, + os::unix::{net::UnixStream, prelude::*}, +}; + +use indoc::indoc; +use nix::{ + poll, + sys::{ + signal, + signalfd::{SfdFlags, SignalFd}, + }, +}; + +use common::{error::*, string_hash::*, types::*}; +use runner::Runner; + +use crate::{ + context::{Context, OutputRef, TaskRef}, + paths, resolve, + task::*, + template, +}; + +#[derive(Debug)] +pub struct CompletionState<'ctx> { + ctx: &'ctx Context, + tasks_done: HashMap, TaskOutput>, +} + +impl<'ctx> CompletionState<'ctx> { + pub fn new(ctx: &'ctx Context) -> Self { + CompletionState { + ctx, + tasks_done: Default::default(), + } + } + + // Treats both "depends" and "parent" as dependencies + fn deps_satisfied(&self, task_ref: &TaskRef) -> bool { + resolve::get_dependent_tasks(self.ctx, task_ref) + .map_err(|_| Error::new(format!("invalid dependency for {}", task_ref))) + .unwrap() + .into_iter() + .all(|dep| self.tasks_done.contains_key(&dep)) + } + + fn fetch_deps(&self, task: &TaskRef<'ctx>) -> Result> { + let task_def = &self.ctx[task]; + task_def + .fetch + .iter() + .map(|Fetch { name, sha256 }| { + Ok(Dependency::Fetch { + name: template::ENGINE.eval(name, &task.args).with_context(|| { + format!("Failed to evaluate fetch filename for task {}", task) + })?, + target_dir: paths::TASK_DLDIR.to_string(), + sha256: *sha256, + }) + }) + .collect() + } + + fn dep_closure(&self, deps: I, path: &'ctx str) -> impl Iterator + '_ + where + I: IntoIterator>, + { + resolve::runtime_depends(self.ctx, deps) + .expect("invalid runtime depends") + .into_iter() + .filter_map(|dep| self.tasks_done[&dep.task].outputs.get(dep.output)) + .map(|&output| Dependency::Task { + output, + path: path.to_string(), + }) + } + + fn build_deps(&self, task: &TaskRef<'ctx>) -> Result + '_> { + Ok(self.dep_closure( + self.ctx + .get_build_depends(task) + .with_context(|| format!("invalid build depends for {}", task))?, + "", + )) + } + + fn host_deps(&self, 
task: &TaskRef<'ctx>) -> Result + '_> { + Ok(self.dep_closure( + self.ctx + .get_host_depends(task) + .with_context(|| format!("invalid depends for {}", task))?, + paths::TASK_SYSROOT, + )) + } + + fn task_deps(&self, task: &TaskRef<'ctx>) -> Result> { + let fetch_deps = self.fetch_deps(task)?.into_iter(); + let build_deps = self.build_deps(task)?; + let host_deps = self.host_deps(task)?; + + Ok(fetch_deps.chain(build_deps).chain(host_deps).collect()) + } + + fn task_ancestors(&self, task_ref: &TaskRef<'ctx>) -> Vec { + let Some(parent) = self + .ctx + .get_parent_depend(task_ref) + .expect("invalid parent depends") + else { + return vec![]; + }; + + let mut chain = self.task_ancestors(&parent); + if let Some(layer) = self.tasks_done[&parent].layer { + chain.push(layer); + } + chain + } + + fn print_summary(&self) { + println!(); + println!("Summary:"); + + let mut tasks: Box<[_]> = self.tasks_done.iter().collect(); + tasks.sort_by_cached_key(|(task, _)| format!("{:#}", task)); + for (task_ref, task) in tasks.iter() { + println!(); + println!("{:#}", task_ref); + if let Some(hash) = task.input_hash { + println!(" input: {}", hash); + } + if let Some(hash) = task.layer { + println!(" layer: {}", hash); + } + if !task.outputs.is_empty() { + println!(" outputs:"); + + let mut outputs: Box<[_]> = task.outputs.iter().collect(); + outputs.sort_by_key(|(output, _)| *output); + for (output, hash) in outputs.iter() { + println!(" {}: {}", output, hash); + } + } + } + } +} + +#[derive(Debug)] +enum SpawnResult { + Spawned(UnixStream), + Skipped(TaskOutput), +} + +#[derive(Debug, PartialEq, Eq, Hash)] +enum TaskWaitResult { + Failed, + Interrupted, +} + +#[derive(Debug)] +pub struct Driver<'ctx> { + rdeps: HashMap, Vec>>, + force_run: HashSet>, + tasks_blocked: HashSet>, + tasks_runnable: Vec>, + tasks_running: HashMap)>, + state: CompletionState<'ctx>, +} + +impl<'ctx> Driver<'ctx> { + pub fn new( + ctx: &'ctx Context, + taskset: HashSet>, + force_run: HashSet>, + ) -> Result { + let mut driver = Driver { + rdeps: Default::default(), + force_run, + tasks_blocked: Default::default(), + tasks_runnable: Default::default(), + tasks_running: Default::default(), + state: CompletionState::new(ctx), + }; + + for task in taskset { + let mut has_depends = false; + for dep in resolve::get_dependent_tasks(ctx, &task) + .map_err(|_| Error::new(format!("invalid dependency for {}", task)))? + { + let rdep = driver.rdeps.entry(dep.clone()).or_default(); + rdep.push(task.clone()); + has_depends = true; + } + + if has_depends { + driver.tasks_blocked.insert(task); + } else { + driver.tasks_runnable.push(task); + } + } + + Ok(driver) + } + + const PREAMBLE: &'static str = indoc! {" + export PATH={{build.prefix}}/sbin:{{build.prefix}}/bin:$PATH + cd {{workdir}} + + export SOURCE_DATE_EPOCH=1 + + export AR_FOR_BUILD=ar + export AS_FOR_BUILD=as + export DLLTOOL_FOR_BUILD=dlltool + export CC_FOR_BUILD=gcc + export CXX_FOR_BUILD=g++ + export GCC_FOR_BUILD=gcc + export GFORTRAN_FOR_BUILD=gfortran + export GOC_FOR_BUILD=goc + export LD_FOR_BUILD=ld + export LIPO_FOR_BUILD=lipo + export NM_FOR_BUILD=nm + export OBJCOPY_FOR_BUILD=objcopy + export OBJDUMP_FOR_BUILD=objdump + export RANLIB_FOR_BUILD=ranlib + export STRIP_FOR_BUILD=strip + export WINDRES_FOR_BUILD=windres + export WINDMC_FOR_BUILD=windmc + "}; + const PREAMBLE_HOST: &'static str = indoc! 
{" + export AR={{build_to_host.cross_compile}}ar + export AS={{build_to_host.cross_compile}}as + export DLLTOOL={{build_to_host.cross_compile}}dlltool + export CC={{build_to_host.cross_compile}}gcc + export CXX={{build_to_host.cross_compile}}g++ + export GCC={{build_to_host.cross_compile}}gcc + export GFORTRAN={{build_to_host.cross_compile}}gfortran + export GOC={{build_to_host.cross_compile}}goc + export LD={{build_to_host.cross_compile}}ld + export LIPO={{build_to_host.cross_compile}}lipo + export NM={{build_to_host.cross_compile}}nm + export OBJCOPY={{build_to_host.cross_compile}}objcopy + export OBJDUMP={{build_to_host.cross_compile}}objdump + export RANLIB={{build_to_host.cross_compile}}ranlib + export STRIP={{build_to_host.cross_compile}}strip + export WINDRES={{build_to_host.cross_compile}}windres + export WINDMC={{build_to_host.cross_compile}}windmc + "}; + const PREAMBLE_TARGET: &'static str = indoc! {" + export AR_FOR_TARGET={{build_to_target.cross_compile}}ar + export AS_FOR_TARGET={{build_to_target.cross_compile}}as + export DLLTOOL_FOR_TARGET={{build_to_target.cross_compile}}dlltool + export CC_FOR_TARGET={{build_to_target.cross_compile}}gcc + export CXX_FOR_TARGET={{build_to_target.cross_compile}}g++ + export GCC_FOR_TARGET={{build_to_target.cross_compile}}gcc + export GFORTRAN_FOR_TARGET={{build_to_target.cross_compile}}gfortran + export GOC_FOR_TARGET={{build_to_target.cross_compile}}goc + export LD_FOR_TARGET={{build_to_target.cross_compile}}ld + export LIPO_FOR_TARGET={{build_to_target.cross_compile}}lipo + export NM_FOR_TARGET={{build_to_target.cross_compile}}nm + export OBJCOPY_FOR_TARGET={{build_to_target.cross_compile}}objcopy + export OBJDUMP_FOR_TARGET={{build_to_target.cross_compile}}objdump + export RANLIB_FOR_TARGET={{build_to_target.cross_compile}}ranlib + export STRIP_FOR_TARGET={{build_to_target.cross_compile}}strip + export WINDRES_FOR_TARGET={{build_to_target.cross_compile}}windres + export WINDMC_FOR_TARGET={{build_to_target.cross_compile}}windmc + "}; + + fn task_preamble(task_ref: &TaskRef<'ctx>) -> Vec<&'static str> { + let mut ret = vec![Self::PREAMBLE]; + + if task_ref.args.contains_key("build_to_host") { + ret.push(Self::PREAMBLE_HOST); + } + if task_ref.args.contains_key("build_to_target") { + ret.push(Self::PREAMBLE_TARGET); + } + ret + } + + fn update_runnable(&mut self, task_ref: TaskRef<'ctx>, task_output: TaskOutput) { + let rdeps = self.rdeps.get(&task_ref); + + self.state.tasks_done.insert(task_ref, task_output); + + for rdep in rdeps.unwrap_or(&Vec::new()) { + if !self.tasks_blocked.contains(rdep) { + continue; + } + if self.state.deps_satisfied(rdep) { + self.tasks_blocked.remove(rdep); + self.tasks_runnable.push(rdep.clone()); + } + } + } + + fn spawn_task(&self, task_ref: &TaskRef<'ctx>, runner: &Runner) -> Result { + let task_def = &self.state.ctx[task_ref]; + if task_def.action.is_empty() { + println!("Skipping empty task {:#}", task_ref); + return Ok(SpawnResult::Skipped(TaskOutput::default())); + } + + let task_deps = self.state.task_deps(task_ref)?; + let task_output = task_def + .output + .iter() + .map(|(name, Output { path, .. 
})| { + let output_path = if let Some(path) = path { + format!("{}/{}", paths::TASK_DESTDIR, path) + } else { + paths::TASK_DESTDIR.to_string() + }; + (name.clone(), output_path) + }) + .collect(); + + let ancestors = self.state.task_ancestors(task_ref); + + let mut run = Self::task_preamble(task_ref); + run.push(&task_def.action.run); + + let command = template::ENGINE + .eval_sh(&run.concat(), &task_ref.args) + .with_context(|| { + format!("Failed to evaluate command template for task {}", task_ref) + })?; + + let rootfs = self.state.ctx.get_rootfs(); + let task = Task { + label: format!("{:#}", task_ref), + command, + workdir: paths::TASK_WORKDIR.to_string(), + rootfs: rootfs.0, + ancestors, + depends: task_deps, + outputs: task_output, + pins: HashMap::from([rootfs.clone()]), + force_run: self.force_run.contains(task_ref), + }; + + Ok(SpawnResult::Spawned(runner.spawn(&task))) + } + + fn run_task(&mut self, task_ref: TaskRef<'ctx>, runner: &Runner) -> Result<()> { + match self.spawn_task(&task_ref, runner)? { + SpawnResult::Spawned(socket) => { + assert!(self + .tasks_running + .insert(socket.as_raw_fd(), (socket, task_ref)) + .is_none()); + } + SpawnResult::Skipped(result) => { + self.update_runnable(task_ref, result); + } + } + Ok(()) + } + + fn run_tasks(&mut self, runner: &Runner) -> Result<()> { + while let Some(task_ref) = self.tasks_runnable.pop() { + self.run_task(task_ref, runner)?; + } + Ok(()) + } + + fn wait_for_task(&mut self, signal_fd: &mut SignalFd) -> Result> { + let mut pollfds: Vec<_> = self + .tasks_running + .values() + .map(|(socket, _)| socket.as_fd()) + .chain(iter::once(signal_fd.as_fd())) + .map(|fd| poll::PollFd::new(fd, poll::PollFlags::POLLIN)) + .collect(); + + while poll::poll(&mut pollfds, poll::PollTimeout::NONE).context("poll()")? 
== 0 {} + + let pollevents: Vec<_> = pollfds + .into_iter() + .map(|pollfd| { + ( + pollfd.as_fd().as_raw_fd(), + pollfd.revents().expect("Unknown events in poll() return"), + ) + }) + .collect(); + + for (fd, events) in pollevents { + if !events.contains(poll::PollFlags::POLLIN) { + if events.intersects(!poll::PollFlags::POLLIN) { + return Err(Error::new( + "Unexpected error status for socket file descriptor", + )); + } + continue; + } + + if fd == signal_fd.as_raw_fd() { + let _signal = signal_fd.read_signal().expect("read_signal()").unwrap(); + return Ok(Some(TaskWaitResult::Interrupted)); + } + + let (socket, task_ref) = self.tasks_running.remove(&fd).unwrap(); + + match Runner::result(&socket) { + Ok(task_output) => { + self.update_runnable(task_ref, task_output); + } + Err(error) => { + eprintln!("{}", error); + return Ok(Some(TaskWaitResult::Failed)); + } + } + } + + Ok(None) + } + + fn is_done(&self) -> bool { + self.tasks_blocked.is_empty() + && self.tasks_runnable.is_empty() + && self.tasks_running.is_empty() + } + + fn setup_signalfd() -> Result { + let mut signals = signal::SigSet::empty(); + signals.add(signal::Signal::SIGINT); + signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None) + .expect("pthread_sigmask()"); + SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC) + .context("Failed to create signal file descriptor") + } + + fn raise_sigint() { + let mut signals = signal::SigSet::empty(); + signals.add(signal::Signal::SIGINT); + signal::pthread_sigmask(signal::SigmaskHow::SIG_UNBLOCK, Some(&signals), None) + .expect("pthread_sigmask()"); + signal::raise(signal::Signal::SIGINT).expect("raise()"); + unreachable!(); + } + + pub fn run(&mut self, runner: &Runner, keep_going: bool) -> Result { + let mut success = true; + let mut interrupted = false; + + let mut signal_fd = Self::setup_signalfd()?; + + self.run_tasks(runner)?; + + while !self.tasks_running.is_empty() { + match self.wait_for_task(&mut signal_fd)? { + Some(TaskWaitResult::Failed) => { + success = false; + } + Some(TaskWaitResult::Interrupted) => { + if interrupted { + Self::raise_sigint(); + } + eprintln!("Interrupt received, not spawning new tasks. Interrupt again to stop immediately."); + interrupted = true; + } + None => {} + } + if !interrupted && (success || keep_going) { + self.run_tasks(runner)?; + } + } + + if interrupted || !success { + return Ok(false); + } + + assert!(self.is_done(), "No runnable tasks left"); + self.state.print_summary(); + + Ok(true) + } +} diff --git a/crates/rebel/src/main.rs b/crates/rebel/src/main.rs new file mode 100644 index 0000000..bd08f18 --- /dev/null +++ b/crates/rebel/src/main.rs @@ -0,0 +1,79 @@ +mod args; +mod context; +mod driver; +mod parse; +mod paths; +mod pin; +mod recipe; +mod resolve; +mod task; +mod template; + +use std::collections::HashSet; + +use clap::Parser; + +use runner::Runner; + +#[derive(Parser)] +#[clap(version, about)] +struct Opts { + /// Allow N jobs at once. 
+ /// Defaults to the number of available CPUs + #[clap(short, long)] + jobs: Option, + /// Keep going after some tasks have failed + #[clap(short, long)] + keep_going: bool, + /// The tasks to run + #[clap(name = "task", required = true)] + tasks: Vec, +} + +fn main() { + let opts: Opts = Opts::parse(); + + let runner = unsafe { Runner::new(&runner::Options { jobs: opts.jobs }) }.unwrap(); + + let ctx = context::Context::new( + recipe::read_recipes("examples/recipes").unwrap(), + pin::read_pins("examples/pins.yml").unwrap(), + ) + .unwrap(); + + let mut rsv = resolve::Resolver::new(&ctx); + let mut force_run = HashSet::new(); + + for task in opts.tasks { + let (task_ref, flags) = match ctx.parse(&task) { + Ok(task_ref) => task_ref, + Err(err) => { + eprintln!("{}", err); + std::process::exit(1); + } + }; + let errors = rsv.add_goal(&task_ref); + if !errors.is_empty() { + for error in errors { + eprintln!("{}", error); + } + std::process::exit(1); + } + if flags.force_run { + force_run.insert(task_ref); + } + } + let taskset = rsv.into_taskset(); + let mut driver = driver::Driver::new(&ctx, taskset, force_run).unwrap(); + match driver.run(&runner, opts.keep_going) { + Ok(success) => { + if !success { + std::process::exit(1); + } + } + Err(error) => { + eprintln!("{}", error); + std::process::exit(1); + } + } +} diff --git a/crates/rebel/src/parse.rs b/crates/rebel/src/parse.rs new file mode 100644 index 0000000..5857efb --- /dev/null +++ b/crates/rebel/src/parse.rs @@ -0,0 +1,72 @@ +#[derive(Debug, Clone, Copy)] +pub struct TaskID<'a> { + pub recipe: &'a str, + pub task: &'a str, +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct TaskArgs<'a> { + pub host: Option<&'a str>, + pub target: Option<&'a str>, +} + +#[derive(Debug, Clone, Copy)] +pub struct Task<'a> { + pub id: TaskID<'a>, + pub args: TaskArgs<'a>, +} + +#[derive(Debug, Clone, Copy)] +pub struct TaskFlags { + pub force_run: bool, +} + +peg::parser! { + grammar rules() for str { + rule t(tag: rule<()>, value: rule) -> T + = tag() v:value() { v } + + rule name_char() + = ['a'..='z' | 'A' ..='Z' | '0'..='9' | '_' | '-'] + + rule name() -> &'input str + = $(name_char()+) + + rule recipe_id() -> &'input str + = $(name() ("/" name())?) + + rule task_id() -> TaskID<'input> + = recipe:recipe_id() "::" task:name() { + TaskID { recipe, task } + } + + rule task_args() -> TaskArgs<'input> + = "@" host:name()? target:t(<":">, )? 
{ + TaskArgs { + host, + target, + } + } + / { Default::default() } + + pub rule task() -> Task<'input> + = id:task_id() args:task_args() { + Task { + id, + args, + } + } + + rule force_run() -> bool + = "+" { true } + / { false } + + rule task_flags() -> TaskFlags + = force_run:force_run() { TaskFlags { force_run } } + + pub rule task_with_flags() -> (Task<'input>, TaskFlags) + = task:task() flags:task_flags() { (task, flags) } + } +} + +pub use rules::*; diff --git a/crates/rebel/src/paths.rs b/crates/rebel/src/paths.rs new file mode 100644 index 0000000..274dda1 --- /dev/null +++ b/crates/rebel/src/paths.rs @@ -0,0 +1,4 @@ +pub const TASK_DESTDIR: &str = "/build/dest"; +pub const TASK_DLDIR: &str = "/build/downloads"; +pub const TASK_WORKDIR: &str = "/build/work"; +pub const TASK_SYSROOT: &str = "/opt/toolchain/sysroot"; diff --git a/crates/rebel/src/pin.rs b/crates/rebel/src/pin.rs new file mode 100644 index 0000000..26e445c --- /dev/null +++ b/crates/rebel/src/pin.rs @@ -0,0 +1,39 @@ +use std::{collections::HashMap, fs::File, path::Path}; + +use serde::{Deserialize, Serialize}; + +use common::{error::*, string_hash::*}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Args { + pub host: Option, + pub target: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Provides { + pub recipe: String, + pub task: String, + pub output: Vec, + pub args: Args, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct Pin { + pub hash: Option, + #[serde(default)] + pub provides: Vec, + #[serde(default)] + pub is_rootfs: bool, +} + +pub type Pins = HashMap; + +pub fn read_pins>(path: P) -> Result { + let f = File::open(path)?; + let pins: Pins = serde_yaml::from_reader(f) + .map_err(Error::new) + .context("YAML error")?; + Ok(pins) +} diff --git a/crates/rebel/src/recipe.rs b/crates/rebel/src/recipe.rs new file mode 100644 index 0000000..16d3751 --- /dev/null +++ b/crates/rebel/src/recipe.rs @@ -0,0 +1,188 @@ +use std::{collections::HashMap, ffi::OsStr, fs::File, path::Path, result}; + +use scoped_tls_hkt::scoped_thread_local; +use serde::{de::DeserializeOwned, Deserialize, Deserializer}; +use walkdir::WalkDir; + +use common::{error::*, types::*}; + +use crate::task::{TaskDef, TaskMeta}; + +scoped_thread_local!(static CURRENT_RECIPE: str); + +fn current_recipe() -> String { + CURRENT_RECIPE.with(|current| current.to_string()) +} + +pub fn deserialize_task_id<'de, D>(deserializer: D) -> result::Result +where + D: Deserializer<'de>, +{ + #[derive(Deserialize)] + struct RecipeTaskID { + recipe: Option, + task: String, + } + let RecipeTaskID { recipe, task } = RecipeTaskID::deserialize(deserializer)?; + Ok(TaskID { + recipe: recipe.unwrap_or_else(current_recipe), + task, + }) +} + +#[derive(Clone, Debug, Deserialize, Default)] +pub struct RecipeMeta { + pub name: Option, + pub version: Option, +} + +#[derive(Debug, Deserialize)] +struct Recipe { + #[serde(default)] + pub meta: RecipeMeta, + pub tasks: HashMap, +} + +#[derive(Debug, Deserialize)] +struct Subrecipe { + pub tasks: HashMap, +} + +fn read_yaml(path: &Path) -> Result { + let f = File::open(path).context("IO error")?; + + let value: T = serde_yaml::from_reader(f) + .map_err(Error::new) + .context("YAML error")?; + + Ok(value) +} + +const RECIPE_NAME: &str = "build"; +const RECIPE_PREFIX: &str = "build."; + +fn recipe_name(path: &Path) -> Option<&str> { + if path.extension() != Some("yml".as_ref()) { + return None; + } + + let stem = path.file_stem()?.to_str()?; + 
if stem == RECIPE_NAME { + return Some(""); + } + stem.strip_prefix(RECIPE_PREFIX) +} + +fn handle_recipe_tasks( + tasks: &mut HashMap>, + recipe_tasks: HashMap, + meta: &TaskMeta, +) { + for (label, mut task) in recipe_tasks { + let task_id = TaskID { + recipe: meta.recipe.clone(), + task: label, + }; + task.meta = meta.clone(); + tasks.entry(task_id).or_default().push(task); + } +} + +fn read_recipe_tasks( + path: &Path, + basename: &str, + tasks: &mut HashMap>, +) -> Result { + let recipe_def = CURRENT_RECIPE.set(basename, || read_yaml::(path))?; + + let name = recipe_def + .meta + .name + .as_deref() + .unwrap_or(basename) + .to_string(); + + let meta = TaskMeta { + basename: basename.to_string(), + recipename: "".to_string(), + recipe: basename.to_string(), + name, + version: recipe_def.meta.version.clone(), + }; + + handle_recipe_tasks(tasks, recipe_def.tasks, &meta); + + Ok(recipe_def.meta) +} + +fn read_subrecipe_tasks( + path: &Path, + basename: &str, + recipename: &str, + recipe_meta: &RecipeMeta, + tasks: &mut HashMap>, +) -> Result<()> { + let recipe = format!("{basename}/{recipename}"); + let recipe_def = CURRENT_RECIPE.set(&recipe, || read_yaml::(path))?; + + let name = recipe_meta.name.as_deref().unwrap_or(basename).to_string(); + + let meta = TaskMeta { + basename: basename.to_string(), + recipename: recipename.to_string(), + recipe: recipe.clone(), + name, + version: recipe_meta.version.clone(), + }; + + handle_recipe_tasks(tasks, recipe_def.tasks, &meta); + + Ok(()) +} + +pub fn read_recipes>(path: P) -> Result>> { + let mut tasks = HashMap::>::new(); + let mut recipe_metas = HashMap::::new(); + + for entry in WalkDir::new(path) + .sort_by(|a, b| { + // Files are sorted first by stem, then by extension, so that + // recipe.yml will always be read before recipe.NAME.yml + let stem_cmp = a.path().file_stem().cmp(&b.path().file_stem()); + let ext_cmp = a.path().extension().cmp(&b.path().extension()); + stem_cmp.then(ext_cmp) + }) + .into_iter() + .filter_map(|e| e.ok()) + { + let path = entry.path(); + if !path.is_file() { + continue; + } + + let Some(recipename) = recipe_name(path) else { + continue; + }; + let Some(basename) = path + .parent() + .and_then(Path::file_name) + .and_then(OsStr::to_str) + else { + continue; + }; + + if recipename.is_empty() { + recipe_metas.insert( + basename.to_string(), + read_recipe_tasks(path, basename, &mut tasks)?, + ); + } else { + let Some(recipe_meta) = recipe_metas.get(basename) else { + continue; + }; + read_subrecipe_tasks(path, basename, recipename, recipe_meta, &mut tasks)?; + } + } + + Ok(tasks) +} diff --git a/crates/rebel/src/resolve.rs b/crates/rebel/src/resolve.rs new file mode 100644 index 0000000..102c483 --- /dev/null +++ b/crates/rebel/src/resolve.rs @@ -0,0 +1,334 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt; +use std::rc::Rc; + +use common::types::TaskID; + +use crate::args::TaskArgs; +use crate::context::{self, Context, OutputRef, TaskRef}; + +#[derive(Debug, Default)] +pub struct DepChain<'ctx>(pub Vec>); + +impl<'ctx> fmt::Display for DepChain<'ctx> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut first = true; + for task in self.0.iter().rev() { + if !first { + write!(f, " -> ")?; + } + write!(f, "{}", task)?; + + first = false; + } + + Ok(()) + } +} + +impl<'ctx> From> for DepChain<'ctx> { + fn from(task: TaskRef<'ctx>) -> Self { + DepChain(vec![task]) + } +} + +impl<'ctx> From<&TaskRef<'ctx>> for DepChain<'ctx> { + fn from(task: &TaskRef<'ctx>) -> Self { + task.clone().into() 
+ } +} + +impl<'ctx> From<&'ctx TaskID> for DepChain<'ctx> { + fn from(id: &'ctx TaskID) -> Self { + TaskRef { + id, + args: Rc::new(TaskArgs::default()), + } + .into() + } +} + +const MAX_ERRORS: usize = 100; + +#[derive(Debug)] +pub enum ErrorKind<'ctx> { + Context(context::Error<'ctx>), + OutputNotFound(&'ctx str), + DependencyCycle, + TooManyErrors, +} + +#[derive(Debug)] +pub struct Error<'ctx> { + pub dep_chain: DepChain<'ctx>, + pub kind: ErrorKind<'ctx>, +} + +impl<'ctx> Error<'ctx> { + fn output_not_found(task: &TaskRef<'ctx>, output: &'ctx str) -> Self { + Error { + dep_chain: task.into(), + kind: ErrorKind::OutputNotFound(output), + } + } + + fn dependency_cycle(task: &TaskRef<'ctx>) -> Self { + Error { + dep_chain: task.into(), + kind: ErrorKind::DependencyCycle, + } + } + + fn too_many_errors() -> Self { + Error { + dep_chain: DepChain::default(), + kind: ErrorKind::TooManyErrors, + } + } + + fn extend(&mut self, task: &TaskRef<'ctx>) { + self.dep_chain.0.push(task.clone()); + } +} + +impl<'ctx> fmt::Display for Error<'ctx> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let Error { dep_chain, kind } = self; + match kind { + ErrorKind::Context(err) => { + write!(f, "{}: ", err)?; + } + ErrorKind::OutputNotFound(output) => { + write!(f, "Output '{}' not found: ", output)?; + } + ErrorKind::DependencyCycle => { + write!(f, "Dependency Cycle: ")?; + } + ErrorKind::TooManyErrors => { + write!(f, "Too many errors, stopping.")?; + } + } + dep_chain.fmt(f) + } +} + +impl<'ctx> From> for Error<'ctx> { + fn from(err: context::Error<'ctx>) -> Self { + Error { + dep_chain: err.task.into(), + kind: ErrorKind::Context(err), + } + } +} + +impl<'ctx> std::error::Error for Error<'ctx> {} + +#[derive(Debug, PartialEq)] +enum ResolveState { + Resolving, + Resolved, +} + +pub fn runtime_depends<'ctx, I>( + ctx: &'ctx Context, + deps: I, +) -> Result, Vec> +where + I: IntoIterator>, +{ + fn add_dep<'ctx>( + ret: &mut HashSet>, + ctx: &'ctx Context, + dep: OutputRef<'ctx>, + ) -> Vec> { + if ret.contains(&dep) { + return Vec::new(); + } + + let task = &dep.task; + let task_def = match ctx.get(task) { + Ok(task) => task, + Err(err) => return vec![err.into()], + }; + + let output = match task_def.output.get(dep.output) { + Some(output) => output, + None => { + return vec![Error::output_not_found(task, dep.output)]; + } + }; + + ret.insert(dep.clone()); + + let mut errors = Vec::new(); + for runtime_dep in &output.runtime_depends { + match ctx.output_ref(runtime_dep, &task.args, false) { + Ok(output_ref) => { + for mut error in add_dep(ret, ctx, output_ref) { + error.extend(task); + errors.push(error); + } + } + Err(err) => { + let mut err: Error = err.into(); + err.extend(task); + errors.push(err); + } + }; + } + errors + } + + let mut ret = HashSet::new(); + let mut errors = Vec::new(); + + for dep in deps { + errors.extend(add_dep(&mut ret, ctx, dep)); + } + + if !errors.is_empty() { + return Err(errors); + } + + Ok(ret) +} + +pub fn get_dependent_outputs<'ctx>( + ctx: &'ctx Context, + task_ref: &TaskRef<'ctx>, +) -> Result>, Vec>> { + let deps: HashSet<_> = ctx + .get_build_depends(task_ref) + .map_err(|err| vec![err.into()])? + .into_iter() + .chain( + ctx.get_host_depends(task_ref) + .map_err(|err| vec![err.into()])?, + ) + .collect(); + runtime_depends(ctx, deps) +} + +pub fn get_dependent_tasks<'ctx>( + ctx: &'ctx Context, + task_ref: &TaskRef<'ctx>, +) -> Result>, Vec>> { + Ok(ctx + .get_parent_depend(task_ref) + .map_err(|err| vec![err.into()])? 
+ .into_iter() + .chain( + get_dependent_outputs(ctx, task_ref)? + .into_iter() + .map(|dep| dep.task), + ) + .collect()) +} + +#[derive(Debug)] +pub struct Resolver<'ctx> { + ctx: &'ctx Context, + resolve_state: HashMap, ResolveState>, +} + +impl<'ctx> Resolver<'ctx> { + pub fn new(ctx: &'ctx Context) -> Self { + Resolver { + ctx, + resolve_state: HashMap::new(), + } + } + + fn tasks_resolved(&self) -> bool { + self.resolve_state + .values() + .all(|resolved| *resolved == ResolveState::Resolved) + } + + fn add_task(&mut self, task: &TaskRef<'ctx>, output: Option<&'ctx str>) -> Vec> { + match self.resolve_state.get(task) { + Some(ResolveState::Resolving) => return vec![Error::dependency_cycle(task)], + Some(ResolveState::Resolved) => return vec![], + None => (), + } + + let task_def = match self.ctx.get(task) { + Ok(task_def) => task_def, + Err(err) => return vec![err.into()], + }; + + if let Some(task_output) = output { + if !task_def.output.contains_key(task_output) { + return vec![Error::output_not_found(task, task_output)]; + } + } + + self.resolve_state + .insert(task.clone(), ResolveState::Resolving); + + let mut ret = Vec::new(); + let mut handle_errors = |errors: Vec>| -> Result<(), ()> { + for mut error in errors { + error.extend(task); + ret.push(error); + + if ret.len() > MAX_ERRORS { + ret.push(Error::too_many_errors()); + return Err(()); + } + } + Ok(()) + }; + + let _ = (|| -> Result<(), ()> { + match self.ctx.get_parent_depend(task) { + Ok(Some(parent)) => { + handle_errors(self.add_task(&parent, None))?; + } + Ok(None) => {} + Err(err) => { + handle_errors(vec![err.into()])?; + } + } + + match get_dependent_outputs(self.ctx, task) { + Ok(rdeps) => { + for rdep in rdeps { + handle_errors(self.add_task(&rdep.task, Some(rdep.output)))?; + } + } + Err(errors) => { + handle_errors(errors)?; + } + } + + Ok(()) + })(); + + if ret.is_empty() { + *self + .resolve_state + .get_mut(task) + .expect("Missing resolve_state") = ResolveState::Resolved; + } else { + self.resolve_state.remove(task); + } + + ret + } + + pub fn add_goal(&mut self, task: &TaskRef<'ctx>) -> Vec> { + let ret = self.add_task(task, None); + debug_assert!(self.tasks_resolved()); + ret + } + + pub fn into_taskset(self) -> HashSet> { + debug_assert!(self.tasks_resolved()); + + self.resolve_state + .into_iter() + .map(|entry| entry.0) + .collect() + } +} diff --git a/crates/rebel/src/task.rs b/crates/rebel/src/task.rs new file mode 100644 index 0000000..e84766e --- /dev/null +++ b/crates/rebel/src/task.rs @@ -0,0 +1,96 @@ +use std::collections::{HashMap, HashSet}; + +use serde::Deserialize; + +use common::{string_hash::StringHash, types::TaskID}; + +use crate::{ + args::{ArgMapping, ArgType, TaskArgs}, + recipe, +}; + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Hash)] +pub struct TaskDep { + #[serde(flatten, deserialize_with = "recipe::deserialize_task_id")] + pub id: TaskID, + #[serde(default)] + pub args: ArgMapping, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Hash)] +pub struct Fetch { + pub name: String, + pub sha256: StringHash, +} + +fn default_output_name() -> String { + "default".to_string() +} + +#[derive(Clone, Debug, Deserialize)] +pub struct ParentDep { + #[serde(flatten)] + pub dep: TaskDep, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Hash)] +pub struct OutputDep { + #[serde(flatten)] + pub dep: TaskDep, + #[serde(default)] + pub noinherit: bool, + #[serde(default = "default_output_name")] + pub output: String, +} + +#[derive(Clone, Debug, Deserialize, Default)] +pub 
struct Output { + pub path: Option, + #[serde(default)] + pub runtime_depends: HashSet, +} + +#[derive(Clone, Debug, Deserialize, Default)] +pub struct Action { + #[serde(default)] + pub run: String, +} + +impl Action { + pub fn is_empty(&self) -> bool { + self.run.is_empty() + } +} + +#[derive(Clone, Debug, Default)] +pub struct TaskMeta { + pub basename: String, + pub recipename: String, + pub recipe: String, + pub name: String, + pub version: Option, +} + +#[derive(Clone, Debug, Deserialize, Default)] +pub struct TaskDef { + #[serde(skip)] + pub meta: TaskMeta, + #[serde(default)] + pub args: HashMap, + #[serde(default)] + pub parent: Option, + #[serde(default)] + pub fetch: HashSet, + #[serde(default)] + pub build_depends: HashSet, + #[serde(default)] + pub depends: HashSet, + #[serde(default)] + pub output: HashMap, + #[serde(flatten)] + pub action: Action, + #[serde(default)] + pub priority: i32, + #[serde(skip)] + pub arg_match: TaskArgs, +} diff --git a/crates/rebel/src/template.rs b/crates/rebel/src/template.rs new file mode 100644 index 0000000..1a091ed --- /dev/null +++ b/crates/rebel/src/template.rs @@ -0,0 +1,42 @@ +use handlebars::Handlebars; +use lazy_static::lazy_static; + +use common::error::*; + +use crate::args::TaskArgs; + +fn escape_sh(s: &str) -> String { + format!("'{}'", s.replace('\'', "'\\''")) +} + +#[derive(Debug)] +pub struct TemplateEngine { + tpl: Handlebars<'static>, + tpl_sh: Handlebars<'static>, +} + +impl TemplateEngine { + pub fn new() -> Self { + let mut tpl = Handlebars::new(); + tpl.set_strict_mode(true); + tpl.register_escape_fn(handlebars::no_escape); + + let mut tpl_sh = Handlebars::new(); + tpl_sh.set_strict_mode(true); + tpl_sh.register_escape_fn(escape_sh); + + TemplateEngine { tpl, tpl_sh } + } + + pub fn eval(&self, input: &str, args: &TaskArgs) -> Result { + self.tpl.render_template(input, args).map_err(Error::new) + } + + pub fn eval_sh(&self, input: &str, args: &TaskArgs) -> Result { + self.tpl_sh.render_template(input, args).map_err(Error::new) + } +} + +lazy_static! 
{ + pub static ref ENGINE: TemplateEngine = TemplateEngine::new(); +} diff --git a/crates/runner/Cargo.toml b/crates/runner/Cargo.toml deleted file mode 100644 index 0108e4b..0000000 --- a/crates/runner/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "rebel-runner" -version = "0.1.0" -authors = ["Matthias Schiffer "] -license = "MIT" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -common = { path = "../common", package = "rebel-common" } - -bincode = "1.3.3" -blake3 = { version = "1.3.0", features = ["traits-preview"] } -capctl = "0.2.0" -digest = "0.10.1" -libc = "0.2.84" -nix = { version = "0.28.0", features = ["user", "fs", "process", "mount", "sched", "poll", "signal", "hostname", "resource"] } -olpc-cjson = "0.1.0" -serde = { version = "1", features = ["derive"] } -serde_json = "1.0.62" -tar = "0.4.32" -tee_readwrite = "0.2.0" -uds = "0.4.1" -walkdir = "2.3.2" diff --git a/crates/runner/src/init.rs b/crates/runner/src/init.rs deleted file mode 100644 index ede8fd8..0000000 --- a/crates/runner/src/init.rs +++ /dev/null @@ -1,68 +0,0 @@ -use nix::mount::{self, MsFlags}; - -use common::error::*; - -use crate::{paths, util::fs}; - -fn prepare_dev(path: &str) -> Result<()> { - fs::mkdir(path)?; - mount::mount::<_, _, str, str>(Some(path), path, None, MsFlags::MS_BIND, None) - .context("Failed to bind mount container /dev")?; - - for dir in ["pts", "shm"] { - fs::mkdir(paths::join(&[path, dir]))?; - } - - for (link, target) in [ - ("fd", "/proc/self/fd"), - ("stdin", "/proc/self/fd/0"), - ("stdout", "/proc/self/fd/1"), - ("stderr", "/proc/self/fd/2"), - ("ptmx", "pts/ptmx"), - ] { - let path = paths::join(&[path, link]); - std::os::unix::fs::symlink(target, &path) - .with_context(|| format!("Failed to create link {}", path))?; - } - - for dev in ["null", "zero", "full", "random", "urandom", "tty"] { - let source = paths::join(&["/dev", dev]); - let target = paths::join(&[path, dev]); - fs::create(&target)?; - mount::mount::(Some(&source), &target, None, MsFlags::MS_BIND, None) - .with_context(|| format!("Failed to bind mount {}", source))?; - } - - mount::mount::( - None, - path, - None, - MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, - None, - ) - .context("Failed to mount container /dev read-only")?; - - Ok(()) -} - -pub fn init_runner() -> Result<()> { - fs::mkdir(paths::LAYER_STATE_DIR)?; - fs::mkdir(paths::OUTPUT_STATE_DIR)?; - - fs::ensure_removed(paths::TMP_DIR)?; - fs::mkdir(paths::TMP_DIR)?; - mount::mount::<_, _, str, str>( - Some(paths::TMP_DIR), - paths::TMP_DIR, - None, - MsFlags::MS_BIND, - None, - ) - .context("Failed to bind mount build tmpdir")?; - mount::mount::(None, paths::TMP_DIR, None, MsFlags::MS_PRIVATE, None) - .context("Failed to set MS_PRIVATE for build tmpdir")?; - - prepare_dev(paths::DEV_DIR)?; - - Ok(()) -} diff --git a/crates/runner/src/jobserver.rs b/crates/runner/src/jobserver.rs deleted file mode 100644 index b0b88cd..0000000 --- a/crates/runner/src/jobserver.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::{ - os::fd::{AsFd, AsRawFd, OwnedFd}, - slice, -}; - -use nix::{errno::Errno, fcntl::OFlag, poll, unistd}; - -use common::error::*; - -use super::util::unix; - -#[derive(Debug)] -pub struct Jobserver { - tokens: usize, - r: OwnedFd, - w: OwnedFd, -} - -impl Jobserver { - pub fn new(tokens: usize) -> Result { - let (r, w) = unistd::pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK).context("pipe()")?; - - for _ in 0..tokens { - if 
unistd::write(w.as_fd(), b"+").is_err() { - break; - } - } - unix::set_blocking(&w, true)?; - - Ok(Jobserver { tokens, r, w }) - } - - pub fn wait(&mut self) -> u8 { - loop { - poll::poll( - &mut [poll::PollFd::new(self.r.as_fd(), poll::PollFlags::POLLIN)], - poll::PollTimeout::NONE, - ) - .expect("poll()"); - - let mut token = 0; - match unistd::read(self.r.as_raw_fd(), slice::from_mut(&mut token)) { - Ok(n) => { - assert!(n == 1); - return token; - } - Err(Errno::EAGAIN) => { - // Token was sniped by another task - continue; - } - error @ Err(_) => { - error.expect("read()"); - } - } - } - } - - pub fn post(&mut self, token: u8) { - let n = unistd::write(self.w.as_fd(), slice::from_ref(&token)).expect("write()"); - assert!(n == 1); - } - - pub fn set_cloexec(&mut self, cloexec: bool) -> Result<()> { - unix::set_cloexec(&self.r, cloexec)?; - unix::set_cloexec(&self.w, cloexec)?; - Ok(()) - } - - pub fn to_makeflags(&self) -> String { - format!( - " -j{} --jobserver-auth={},{}", - self.tokens, - self.r.as_raw_fd(), - self.w.as_raw_fd(), - ) - } -} - -// FIXME Log lost tokens on drop diff --git a/crates/runner/src/lib.rs b/crates/runner/src/lib.rs deleted file mode 100644 index ab90420..0000000 --- a/crates/runner/src/lib.rs +++ /dev/null @@ -1,217 +0,0 @@ -mod init; -mod jobserver; -mod ns; -mod paths; -mod tar; -mod task; -mod util; - -use std::{ - collections::HashSet, - fs::File, - net, - os::unix::{net::UnixStream, prelude::*}, - process, slice, -}; - -use capctl::prctl; -use nix::{ - errno::Errno, - fcntl::Flock, - poll, - sched::CloneFlags, - sys::{ - signal, - signalfd::{SfdFlags, SignalFd}, - stat, wait, - }, - unistd::{self, Gid, Pid, Uid}, -}; -use uds::UnixSeqpacketConn; - -use common::{error::*, types::*}; - -use jobserver::Jobserver; -use util::{checkable::Checkable, clone, steal::Steal, unix}; - -#[derive(Debug, Clone)] -pub struct Options { - pub jobs: Option, -} - -#[derive(Debug)] -struct RunnerContext { - socket: Steal, - jobserver: Jobserver, - tasks: HashSet, -} - -fn handle_sigchld(ctx: &mut RunnerContext) -> Result<()> { - loop { - let status = match wait::waitpid(Pid::from_raw(-1), Some(wait::WaitPidFlag::WNOHANG)) { - Ok(wait::WaitStatus::StillAlive) | Err(Errno::ECHILD) => return Ok(()), - res => res.expect("waitpid()"), - }; - let pid = status.pid().unwrap(); - if ctx.tasks.remove(&pid) { - status.check()?; - } - } -} - -fn handle_request(ctx: &mut RunnerContext, request_socket: UnixStream) { - let run = || { - ctx.socket.steal(); - - let task: Task = - bincode::deserialize_from(&request_socket).expect("Failed to decode task description"); - - prctl::set_name(&task.label).expect("prctl(PR_SET_NAME)"); - - let result = task::handle(task, &mut ctx.jobserver); - bincode::serialize_into(&request_socket, &result).expect("Failed to send task result"); - drop(request_socket); - }; - - let pid = unsafe { clone::spawn(None, run) }.expect("fork()"); - assert!(ctx.tasks.insert(pid)); -} - -fn handle_socket(ctx: &mut RunnerContext) -> bool { - let mut fd = 0; - - match ctx - .socket - .recv_fds(&mut [0], slice::from_mut(&mut fd)) - .expect("recv_fds()") - { - (1, _, n_fd) => { - assert!(n_fd == 1); - } - _ => return false, - } - - let request_socket = unsafe { UnixStream::from_raw_fd(fd) }; - handle_request(ctx, request_socket); - true -} - -fn borrow_socket_fd(socket: &UnixSeqpacketConn) -> BorrowedFd<'_> { - unsafe { BorrowedFd::borrow_raw(socket.as_raw_fd()) } -} - -fn runner( - uid: Uid, - gid: Gid, - socket: UnixSeqpacketConn, - _lockfile: Flock, - options: &Options, -) 
-> ! { - unistd::setsid().expect("setsid()"); - ns::mount_proc(); - ns::setup_userns(Uid::from_raw(0), Gid::from_raw(0), uid, gid); - - stat::umask(stat::Mode::from_bits_truncate(0o022)); - - init::init_runner().unwrap(); - - let jobs = options - .jobs - .unwrap_or_else(|| unix::nproc().expect("Failed to get number of available CPUs")); - let jobserver = Jobserver::new(jobs).expect("Failed to initialize jobserver pipe"); - let mut ctx = RunnerContext { - socket: socket.into(), - jobserver, - tasks: HashSet::new(), - }; - - let mut signals = signal::SigSet::empty(); - signals.add(signal::Signal::SIGCHLD); - signal::pthread_sigmask(signal::SigmaskHow::SIG_BLOCK, Some(&signals), None) - .expect("pthread_sigmask()"); - let mut signal_fd = SignalFd::with_flags(&signals, SfdFlags::SFD_CLOEXEC) - .expect("Failed to create signal file descriptor"); - - loop { - let socket_fd = borrow_socket_fd(&ctx.socket); - let mut pollfds = [ - poll::PollFd::new(signal_fd.as_fd(), poll::PollFlags::POLLIN), - poll::PollFd::new(socket_fd.as_fd(), poll::PollFlags::POLLIN), - ]; - poll::poll(&mut pollfds, poll::PollTimeout::NONE).expect("poll()"); - - let signal_events = pollfds[0] - .revents() - .expect("Unknown events in poll() return"); - let socket_events = pollfds[1] - .revents() - .expect("Unknown events in poll() return"); - - if signal_events.contains(poll::PollFlags::POLLIN) { - let _signal = signal_fd.read_signal().expect("read_signal()").unwrap(); - handle_sigchld(&mut ctx).expect("Task process exited abnormally"); - } else if signal_events.intersects(!poll::PollFlags::POLLIN) { - panic!("Unexpected error status for signal file descriptor"); - } - - if socket_events.contains(poll::PollFlags::POLLIN) { - if !handle_socket(&mut ctx) { - break; - } - } else if socket_events.intersects(!poll::PollFlags::POLLIN) { - panic!("Unexpected error status for socket file descriptor"); - } - } - - process::exit(0); -} - -pub struct Runner { - socket: UnixSeqpacketConn, -} - -impl Runner { - /// Creates a new container runner - /// - /// # Safety - /// - /// Do not call in multithreaded processes. - pub unsafe fn new(options: &Options) -> Result { - let lockfile = unix::lock(paths::LOCKFILE, true, false) - .context("Failed to get lock on build directory, is another instance running?")?; - - let uid = unistd::geteuid(); - let gid = unistd::getegid(); - - let (local, remote) = UnixSeqpacketConn::pair().expect("socketpair()"); - - match clone::clone( - CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS | CloneFlags::CLONE_NEWPID, - ) - .expect("clone()") - { - unistd::ForkResult::Parent { .. 
} => Ok(Runner { socket: local }),
-            unistd::ForkResult::Child => {
-                drop(local);
-                runner(uid, gid, remote, lockfile, options);
-            }
-        }
-    }
-
-    pub fn spawn(&self, task: &Task) -> UnixStream {
-        let (local, remote) = UnixStream::pair().expect("socketpair()");
-
-        self.socket
-            .send_fds(&[0], &[remote.as_raw_fd()])
-            .expect("send()");
-
-        bincode::serialize_into(&local, task).expect("Task submission failed");
-        local.shutdown(net::Shutdown::Write).expect("shutdown()");
-
-        local
-    }
-
-    pub fn result(socket: &UnixStream) -> Result<TaskOutput> {
-        bincode::deserialize_from(socket).expect("Failed to read task result")
-    }
-}
diff --git a/crates/runner/src/ns.rs b/crates/runner/src/ns.rs
deleted file mode 100644
index 4a8e3e7..0000000
--- a/crates/runner/src/ns.rs
+++ /dev/null
@@ -1,84 +0,0 @@
-use nix::{
-    mount::{self, MntFlags, MsFlags},
-    sched::CloneFlags,
-    unistd::{self, Gid, Pid, Uid},
-};
-
-use common::error::*;
-
-use super::util::clone;
-
-pub fn mount_proc() {
-    mount::mount::<_, _, _, str>(Some("proc"), "/proc", Some("proc"), MsFlags::empty(), None)
-        .expect("Failed to mount /proc");
-}
-
-pub fn setup_userns(inner_uid: Uid, inner_gid: Gid, outer_uid: Uid, outer_gid: Gid) {
-    std::fs::write("/proc/self/setgroups", "deny").expect("Failed to write /proc/self/setgroups");
-    std::fs::write(
-        "/proc/self/uid_map",
-        format!("{} {} 1", inner_uid, outer_uid),
-    )
-    .expect("Failed to write /proc/self/uid_map");
-    std::fs::write(
-        "/proc/self/gid_map",
-        format!("{} {} 1", inner_gid, outer_gid),
-    )
-    .expect("Failed to write /proc/self/gid_map");
-}
-
-pub unsafe fn spawn<F>(flags: CloneFlags, f: F) -> nix::Result<Pid>
-where
-    F: FnOnce(),
-{
-    assert!(flags.contains(CloneFlags::CLONE_NEWNS) || !flags.contains(CloneFlags::CLONE_NEWPID));
-
-    clone::spawn(Some(flags), || {
-        if flags.contains(CloneFlags::CLONE_NEWPID) {
-            mount_proc();
-        }
-        f()
-    })
-}
-
-pub fn pivot_root(path: &str) {
-    (|| -> Result<()> {
-        unistd::chdir(path).context("chdir()")?;
-        mount::mount::<_, _, str, str>(Some("/proc"), "proc", None, MsFlags::MS_BIND, None)
-            .context("Failed to bind mount /proc")?;
-        unistd::pivot_root(".", ".").context("pivot_root()")?;
-        mount::umount2(".", MntFlags::MNT_DETACH).context("umount2()")?;
-        unistd::chdir("/").context("chdir(\"/\")")?;
-        Ok(())
-    })()
-    .expect("Failed to pivot root");
-}
-
-pub fn container_mounts() -> Result<()> {
-    mount::mount(
-        Some("tmp"),
-        "/tmp",
-        Some("tmpfs"),
-        MsFlags::MS_NODEV | MsFlags::MS_NOSUID,
-        Some("mode=1777,size=1048576k"),
-    )
-    .context("Failed to mount /tmp")?;
-    mount::mount(
-        Some("devpts"),
-        "/dev/pts",
-        Some("devpts"),
-        MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC,
-        Some("newinstance,ptmxmode=0666,mode=0620"),
-    )
-    .context("Failed to mount /dev/pts")?;
-    mount::mount(
-        Some("shm"),
-        "/dev/shm",
-        Some("tmpfs"),
-        MsFlags::MS_NOSUID | MsFlags::MS_NOEXEC | MsFlags::MS_NODEV,
-        Some("mode=1777,size=65536k"),
-    )
-    .context("Failed to mount /dev/shm")?;
-
-    Ok(())
-}
diff --git a/crates/runner/src/paths.rs b/crates/runner/src/paths.rs
deleted file mode 100644
index 4b3a126..0000000
--- a/crates/runner/src/paths.rs
+++ /dev/null
@@ -1,118 +0,0 @@
-//! Build directory structure used throughout rebel
-//!
-//! # Current structure
-//!
-//! ```text
-//! build/
-//! ├── build.lock
-//! ├── downloads/
-//! │   └── ...
-//! ├── state/
-//! │   ├── output/
-//! │   │   ├── <input hash>.tar.tmp    # during packing
-//! │   │   ├── <archive hash>.tar      # files are renamed when packing is finished
-//! │   │   └── ...
-//! │   ├── layer/
-//! │   │   ├── <layer hash>/           # overlayfs layer dir of finished tasks
-//! │   │   └── ...
-//! │   └── task/
-//! │       ├── <input hash>/
-//! │       │   ├── layer/              # overlayfs layer dir (moved to layer/ after build)
-//! │       │   ├── work/               # overlayfs work dir (discarded after build)
-//! │       │   ├── task.json.tmp       # during write
-//! │       │   ├── task.json           # after write
-//! │       │   ├── task.log            # stdout/stderr output of the task
-//! │       │   └── task.lock           # task lockfile
-//! │       └── ...
-//! └── tmp/                            # temporary files (cleaned on start)
-//!     ├── dev/                        # container /dev
-//!     ├── depends/                    # unpacked dependencies
-//!     └── task/
-//!         └── <input hash>/
-//!             ├── build/              # mount point for /build directory
-//!             │   ├── downloads/      # downloaded sources
-//!             │   ├── task/           # internal runner files
-//!             │   └── work/           # build overlay mountpoint
-//!             └── rootfs/             # rootfs overlay mountpoint
-//! ```
-
-use common::string_hash::*;
-
-pub const DOWNLOADS_DIR: &str = "build/downloads";
-pub const PIN_DIR: &str = "build/pinned";
-
-pub const TMP_DIR: &str = "build/tmp";
-pub const DEV_DIR: &str = "build/tmp/dev";
-pub const DEPENDS_TMP_DIR: &str = "build/tmp/depends";
-pub const TASK_TMP_DIR: &str = "build/tmp/task";
-
-pub const TASK_TMP_ROOTFS_SUBDIR: &str = "rootfs";
-
-pub const LOCKFILE: &str = "build/build.lock";
-pub const OUTPUT_STATE_DIR: &str = "build/state/output";
-pub const TASK_STATE_DIR: &str = "build/state/task";
-pub const LAYER_STATE_DIR: &str = "build/state/layer";
-
-pub const TASK_STATE_LAYER_SUBDIR: &str = "layer";
-pub const TASK_STATE_WORK_SUBDIR: &str = "work";
-
-pub const TASK_BUILDDIR: &str = "build";
-pub const TASK_TASKDIR: &str = "build/task";
-
-pub const TASK_RUN: &str = "run";
-
-pub fn join(paths: &[&str]) -> String {
-    paths.join("/")
-}
-
-pub fn task_tmp_dir(hash: &InputHash) -> String {
-    join(&[TASK_TMP_DIR, &hash.to_string()])
-}
-
-pub fn task_state_dir(hash: &InputHash) -> String {
-    join(&[TASK_STATE_DIR, &hash.to_string()])
-}
-
-pub fn task_cache_tmp_filename(hash: &InputHash) -> String {
-    join(&[TASK_STATE_DIR, &hash.to_string(), "task.json.tmp"])
-}
-
-pub fn task_cache_filename(hash: &InputHash) -> String {
-    join(&[TASK_STATE_DIR, &hash.to_string(), "task.json"])
-}
-
-pub fn task_log_filename(hash: &InputHash) -> String {
-    join(&[TASK_STATE_DIR, &hash.to_string(), "task.log"])
-}
-
-pub fn task_lock_filename(hash: &InputHash) -> String {
-    join(&[TASK_STATE_DIR, &hash.to_string(), "task.lock"])
-}
-
-pub fn layer_dir(hash: &LayerHash) -> String {
-    join(&[LAYER_STATE_DIR, &hash.to_string()])
-}
-
-pub fn depend_tmp_dir(hash: &ArchiveHash) -> String {
-    join(&[DEPENDS_TMP_DIR, &hash.to_string()])
-}
-
-pub fn depend_dir(hash: &ArchiveHash) -> String {
-    join(&[DEPENDS_TMP_DIR, &hash.to_string()])
-}
-
-pub fn depend_lock_filename(hash: &ArchiveHash) -> String {
-    join(&[DEPENDS_TMP_DIR, &format!("{}.lock", hash)])
-}
-
-pub fn archive_tmp_filename(hash: &InputHash) -> String {
-    join(&[OUTPUT_STATE_DIR, &format!("{}.tar.tmp", hash)])
-}
-
-pub fn archive_filename(hash: &ArchiveHash) -> String {
-    join(&[OUTPUT_STATE_DIR, &format!("{}.tar", hash)])
-}
-
-pub fn pinned_archive_filename(name: &str) -> String {
-    join(&[PIN_DIR, &format!("{}.tar", name)])
-}
diff --git a/crates/runner/src/tar.rs b/crates/runner/src/tar.rs
deleted file mode 100644
index 1a66408..0000000
--- a/crates/runner/src/tar.rs
+++ /dev/null
@@ -1,105 +0,0 @@
-use std::{
-    io::{self, Read, Write},
-    os::unix::prelude::CommandExt,
-    path::Path,
-    process::{self, Command, Stdio},
-};
-
-use nix::{
-    mount::{self, MsFlags},
-    sched::CloneFlags,
-    sys::wait,
-};
-
-use common::{error::*, string_hash::ArchiveHash};
-
-use super::{
-    ns,
-    util::{checkable::Checkable, fs},
-};
-use crate::paths;
-
-pub fn pack<W: Write, P: AsRef<Path>>(
-    rootfs_hash: &ArchiveHash,
-    archive: &mut W,
-    source: P,
-) -> Result<()> {
-    let rootfs = paths::depend_dir(rootfs_hash);
-    let _rootfs_mount = fs::mount(&rootfs, &rootfs, None, MsFlags::MS_BIND, None)
-        .with_context(|| format!("Failed to bind mount rootfs to {:?}", rootfs))?;
-    mount::mount::<str, _, str, str>(
-        None,
-        &rootfs,
-        None,
-        MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY,
-        None,
-    )
-    .context("Failed to mount container rootfs read-only")?;
-
-    let (mut piper, pipew) = fs::pipe()?;
-
-    let exec_tar = || -> Result<()> {
-        // We are in our own mount namespace, so mounting into the shared rootfs is fine
-        let dev_target = paths::join(&[&rootfs, "dev"]);
-        mount::mount::<_, _, str, str>(
-            Some(paths::DEV_DIR),
-            dev_target.as_str(),
-            None,
-            MsFlags::MS_BIND | MsFlags::MS_REC,
-            None,
-        )?;
-        let mount_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]);
-        mount::mount::<_, _, str, str>(
-            Some(source.as_ref()),
-            mount_target.as_str(),
-            None,
-            MsFlags::MS_BIND,
-            None,
-        )?;
-
-        ns::pivot_root(&rootfs);
-
-        let err = Command::new("tar")
-            .args([
-                "-c",
-                "--sort=name",
-                "--numeric-owner",
-                "--owner=0",
-                "--group=0",
-                "--mtime=@0",
-                ".",
-            ])
-            .stdin(Stdio::null())
-            .stdout(pipew)
-            .current_dir(paths::TASK_BUILDDIR)
-            .env_clear()
-            .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin")
-            .exec();
-        eprintln!("{}", err);
-        process::exit(127);
-    };
-
-    let pid = unsafe { ns::spawn(CloneFlags::CLONE_NEWNS, || exec_tar().unwrap()) }
-        .context("Failed to run tar")?;
-
-    let result = io::copy(&mut piper, archive).context("Failed to write TAR archive");
-
-    wait::waitpid(pid, None)?
- .check() - .context("tar did not exit successfully")?; - - result?; - Ok(()) -} - -pub fn unpack>(archive: R, dest: P) -> Result<()> { - fs::mkdir(&dest)?; - - let mut ar = tar::Archive::new(archive); - ar.set_preserve_permissions(true); - ar.set_preserve_mtime(true); - ar.set_unpack_xattrs(true); - ar.set_overwrite(false); - - ar.unpack(dest).context("Failed to unpack TAR archive") -} diff --git a/crates/runner/src/task.rs b/crates/runner/src/task.rs deleted file mode 100644 index 19b74f4..0000000 --- a/crates/runner/src/task.rs +++ /dev/null @@ -1,638 +0,0 @@ -use std::{ - collections::{BTreeMap, HashMap, HashSet}, - io::{self, BufWriter}, - os::unix::prelude::CommandExt, - path::{Path, PathBuf}, - process::{self, Command, Stdio}, - time::Instant, -}; - -use capctl::prctl; -use nix::{ - mount::{self, MsFlags}, - sched::{unshare, CloneFlags}, - sys::{resource, time::TimeVal, wait}, - unistd::{self, Gid, Uid}, -}; -use serde::Serialize; -use tee_readwrite::{TeeReader, TeeWriter}; - -use common::{error::*, string_hash::*, types::*}; -use walkdir::WalkDir; - -use super::{ - jobserver::Jobserver, - ns, tar, - util::{checkable::Checkable, cjson, fs}, -}; -use crate::{ - paths, - util::{stack::Stack, unix}, -}; - -const BUILD_UID: Uid = Uid::from_raw(1000); -const BUILD_GID: Gid = Gid::from_raw(1000); - -type InputHasher = blake3::Hasher; -type DependencyHasher = blake3::Hasher; -type LayerHasher = blake3::Hasher; -type ArchiveHasher = blake3::Hasher; - -type DependMap = BTreeMap>; - -fn dependency_hash(dep: &Dependency) -> DependencyHash { - DependencyHash(StringHash( - cjson::digest::(dep).unwrap().into(), - )) -} - -fn input_hash(task: &Task) -> InputHash { - #[derive(Debug, Serialize)] - struct HashInput<'a> { - pub command: &'a str, - pub workdir: &'a str, - pub rootfs: &'a ArchiveHash, - pub ancestors: &'a [LayerHash], - pub depends: HashMap, - pub outputs: &'a HashMap, - } - let input = HashInput { - command: &task.command, - workdir: &task.workdir, - rootfs: &task.rootfs, - ancestors: &task.ancestors, - depends: task - .depends - .iter() - .map(|dep| (dependency_hash(dep), dep)) - .collect(), - outputs: &task.outputs, - }; - - InputHash(StringHash( - cjson::digest::(&input).unwrap().into(), - )) -} - -fn init_task(input_hash: &InputHash, task: &Task) -> Result { - // Remove metadata first to ensure task invalidation - fs::ensure_removed(paths::task_cache_filename(input_hash))?; - - let task_state_dir = paths::task_state_dir(input_hash); - - let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); - fs::ensure_removed(&task_layer_dir)?; - fs::mkdir(&task_layer_dir)?; - fs::fixup_permissions(&task_layer_dir)?; - - let task_tmp_dir = paths::task_tmp_dir(input_hash); - - let taskdir = paths::join(&[&task_tmp_dir, paths::TASK_TASKDIR]); - fs::mkdir(&taskdir)?; - let runfile = paths::join(&[&taskdir, paths::TASK_RUN]); - std::fs::write(&runfile, &task.command) - .with_context(|| format!("Failed to write {}", runfile))?; - - let mount_target = paths::join(&[&task_tmp_dir, &task.workdir]); - let mount = if task.ancestors.is_empty() { - fs::mount(task_layer_dir, &mount_target, None, MsFlags::MS_BIND, None) - .with_context(|| format!("Failed to bind mount to {:?}", mount_target))? 
- } else { - let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]); - fs::ensure_removed(&task_work_dir)?; - fs::mkdir(&task_work_dir)?; - fs::fixup_permissions(&task_work_dir)?; - - let lower = task - .ancestors - .iter() - .rev() - .map(paths::layer_dir) - .collect::>() - .join(":"); - let options = format!( - "xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}", - lower = lower, - upper = task_layer_dir, - work = task_work_dir - ); - fs::mount( - "overlay", - &mount_target, - Some("overlay"), - MsFlags::empty(), - Some(&options), - ) - .with_context(|| format!("Failed to mount work overlay to {:?}", mount_target))? - }; - - Ok(mount) -} - -fn get_contents, P2: AsRef>( - path: P1, - prefix: P2, -) -> Result<(HashSet, HashSet)> { - let mut dirs = HashSet::new(); - let mut files = HashSet::new(); - - let root: PathBuf = Path::new("/").join(prefix.as_ref()); - - for result in WalkDir::new(path.as_ref()).min_depth(1).into_iter() { - let entry = result - .with_context(|| format!("Failed to list contents of directory {:?}", path.as_ref()))?; - let is_dir = entry.file_type().is_dir(); - let entry_path = root.join(entry.into_path().strip_prefix(path.as_ref()).unwrap()); - if is_dir { - dirs.insert(entry_path); - } else { - files.insert(entry_path); - } - } - - dirs.insert(root); - - Ok((dirs, files)) -} - -fn check_conflicts( - dirs1: &HashSet, - files1: &HashSet, - dirs2: &HashSet, - files2: &HashSet, -) -> Result<()> { - let mut conflicts = Vec::new(); - - conflicts.extend(files1.intersection(files2)); - conflicts.extend(dirs1.intersection(files2)); - conflicts.extend(files1.intersection(dirs2)); - - if !conflicts.is_empty() { - let mut conflict_strings: Box<[_]> = conflicts - .into_iter() - .map(|path| path.to_string_lossy().to_string()) - .collect(); - conflict_strings.sort(); - return Err(Error::new(format!( - "Found the following file conflicts in dependencies:\n{}", - conflict_strings.join("\n") - ))); - } - - Ok(()) -} - -fn init_task_rootfs(input_hash: &InputHash, task: &Task) -> Result> { - let task_tmp_dir = paths::task_tmp_dir(input_hash); - let mount_target = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]); - let rootfs = paths::depend_dir(&task.rootfs); - - let depends = unpack_dependencies(input_hash, task).context("Failed to unpack dependencies")?; - - let mut mounts = Stack::new(); - - mounts.push( - fs::mount(rootfs, &mount_target, None, MsFlags::MS_BIND, None) - .with_context(|| format!("Failed to bind mount rootfs to {:?}", mount_target))?, - ); - mount::mount::( - None, - &mount_target, - None, - MsFlags::MS_REMOUNT | MsFlags::MS_BIND | MsFlags::MS_RDONLY, - None, - ) - .context("Failed to mount container rootfs read-only")?; - - let (mut dirs, mut files) = get_contents(&mount_target, "")?; - - for (path, dep_hashes) in depends { - assert!(!dep_hashes.is_empty()); - - if !path.is_empty() && !path.starts_with('/') { - return Err(Error::new(format!( - "Dependency path {:?} must be absolute", - path - ))); - } - - let dep_target = mount_target.clone() + &path; - let dep_paths: Box<[_]> = dep_hashes.iter().map(paths::depend_dir).collect(); - - for dep in dep_paths.iter() { - let (dep_dirs, dep_files) = get_contents(dep, &path)?; - check_conflicts(&dirs, &files, &dep_dirs, &dep_files)?; - dirs.extend(dep_dirs); - files.extend(dep_files); - } - - let options = format!( - "xino=off,index=off,metacopy=off,lowerdir={lower}:{base}", - lower = dep_paths.join(":"), - base = dep_target, - ); - - mounts.push( - 
fs::mount( - "overlay", - dep_target.as_str(), - Some("overlay"), - MsFlags::MS_RDONLY, - Some(&options), - ) - .with_context(|| format!("Failed to mount overlay to {:?}", dep_target))?, - ); - } - - Ok(mounts) -} - -fn cleanup_task(input_hash: &InputHash) -> Result<()> { - let task_tmp_dir = paths::task_tmp_dir(input_hash); - - fs::ensure_removed(task_tmp_dir).context("Failed to remove task tmp dir")?; - - let task_state_dir = paths::task_state_dir(input_hash); - let task_work_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_WORK_SUBDIR]); - fs::ensure_removed(task_work_dir).context("Failed to remove overlayfs workdir")?; - - Ok(()) -} - -fn get_archive_filename(task: &Task, hash: &ArchiveHash) -> String { - if let Some(pinned_name) = task.pins.get(hash) { - paths::pinned_archive_filename(pinned_name) - } else { - paths::archive_filename(hash) - } -} - -fn unpack_dependency(task: &Task, hash: &ArchiveHash) -> Result<()> { - let _lock = unix::lock(paths::depend_lock_filename(hash), true, true); - - let filename = get_archive_filename(task, hash); - - let dest = paths::depend_dir(hash); - if Path::new(&dest).is_dir() { - return Ok(()); - } - - (|| -> Result<()> { - let tmp_dest = paths::depend_tmp_dir(hash); - fs::ensure_removed(&tmp_dest)?; - - let file = fs::open(&filename)?; - let hasher = ArchiveHasher::new(); - let buffered_hasher = BufWriter::with_capacity(16 * 1024 * 1024, hasher); - let mut reader = TeeReader::new(file, buffered_hasher, false); - - tar::unpack(&mut reader, &tmp_dest)?; - - // Read file to end to get the correct hash - io::copy(&mut reader, &mut io::sink())?; - - let (_, buffered_hasher) = reader.into_inner(); - let hasher = buffered_hasher.into_inner()?; - - let actual_hash = ArchiveHash(StringHash(hasher.finalize().into())); - - if &actual_hash != hash { - return Err(Error::new(format!( - "Incorrect file hash for {:?} (expected: {}, actual: {})", - filename, hash, actual_hash, - ))); - } - - fs::rename(&tmp_dest, &dest)?; - - Ok(()) - })() - .with_context(|| format!("Failed to unpack {:?}", filename)) -} - -fn unpack_dependencies(input_hash: &InputHash, task: &Task) -> Result { - let task_tmp_dir = paths::task_tmp_dir(input_hash); - - unpack_dependency(task, &task.rootfs)?; - - let mut ret = DependMap::new(); - - for dep in &task.depends { - match dep { - Dependency::Fetch { - name, target_dir, .. 
- } => { - let path = paths::join(&[&task_tmp_dir, target_dir]); - fs::mkdir(&path)?; - fs::copy( - paths::join(&[paths::DOWNLOADS_DIR, name]), - paths::join(&[&path, name]), - )?; - } - Dependency::Task { output, path } => { - unpack_dependency(task, output)?; - ret.entry(path.clone()).or_default().push(*output); - } - } - } - - Ok(ret) -} - -fn collect_output(input_hash: &InputHash, task: &Task, path: &str) -> Result> { - let source = paths::join(&[&paths::task_tmp_dir(input_hash), path]); - if !Path::new(&source).is_dir() { - return Ok(None); - } - - let filename = paths::archive_tmp_filename(input_hash); - - let hash = (|| -> Result { - let file = fs::create(&filename)?; - let hasher = ArchiveHasher::new(); - let writer = TeeWriter::new(file, hasher); - let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, writer); - - super::tar::pack(&task.rootfs, &mut buffered_writer, &source)?; - - let writer = buffered_writer.into_inner()?; - let (file, hasher) = writer.into_inner(); - file.sync_all()?; - drop(file); - - Ok(ArchiveHash(StringHash(hasher.finalize().into()))) - })() - .with_context(|| format!("Failed to pack {:?} to {:?}", source, filename))?; - - fs::rename(filename, paths::archive_filename(&hash))?; - - Ok(Some(hash)) -} - -fn collect_outputs(input_hash: &InputHash, task: &Task) -> Result> { - let mut ret = HashMap::new(); - - for (name, path) in &task.outputs { - if let Some(hash) = collect_output(input_hash, task, path)? { - ret.insert(name.clone(), hash); - } - } - - Ok(ret) -} - -fn run_task(input_hash: &InputHash, task: &Task, jobserver: &mut Jobserver) -> Result<()> { - let _workdir_mount = init_task(input_hash, task).context("Failed to initialize task")?; - let _rootfs_mounts = - init_task_rootfs(input_hash, task).context("Failed to initialize task rootfs")?; - - let task_tmp_dir = paths::task_tmp_dir(input_hash); - let rootfs = paths::join(&[&task_tmp_dir, paths::TASK_TMP_ROOTFS_SUBDIR]); - - let builddir_source = paths::join(&[&task_tmp_dir, paths::TASK_BUILDDIR]); - let builddir_target = paths::join(&[&rootfs, paths::TASK_BUILDDIR]); - - let log_filename = paths::task_log_filename(input_hash); - - let mut exec_cmd = || -> Result<()> { - let log = fs::create(&log_filename)?; - - let log_stdout = log - .try_clone() - .context("Failed to duplicate log file descriptor")?; - let log_stderr = log - .try_clone() - .context("Failed to duplicate log file descriptor")?; - - mount::mount::<_, _, str, str>( - Some(paths::DEV_DIR), - paths::join(&[&rootfs, "dev"]).as_str(), - None, - MsFlags::MS_BIND | MsFlags::MS_REC, - None, - ) - .expect("Failed to bind mount /dev directory"); - mount::mount::<_, _, str, str>( - Some(builddir_source.as_str()), - builddir_target.as_str(), - None, - MsFlags::MS_BIND | MsFlags::MS_REC, - None, - ) - .expect("Failed to bind mount build directory"); - - ns::pivot_root(&rootfs); - mount::mount::( - None, - "/", - None, - MsFlags::MS_PRIVATE | MsFlags::MS_REC, - None, - ) - .context("Failed to set MS_PRIVATE for container root")?; - ns::container_mounts().context("Failed to set up container mounts")?; - - unistd::sethostname("rebel-builder").context("Failed to set hostname")?; - - prctl::set_no_new_privs().context("set_no_new_privs()")?; - - unshare(CloneFlags::CLONE_NEWUSER | CloneFlags::CLONE_NEWNS) - .context("Failed to create user namespace")?; - ns::setup_userns(BUILD_UID, BUILD_GID, Uid::from_raw(0), Gid::from_raw(0)); - - jobserver - .set_cloexec(false) - .context("Failed to unset O_CLOEXEC on jobserver pipe")?; - - let err = 
Command::new("sh") - .args(["-ex", &paths::join(&[paths::TASK_TASKDIR, paths::TASK_RUN])]) - .stdin(Stdio::null()) - .stdout(log_stdout) - .stderr(log_stderr) - .env_clear() - .env("PATH", "/usr/sbin:/usr/bin:/sbin:/bin") - .env("HOME", "/build") - .env("MAKEFLAGS", jobserver.to_makeflags()) - .exec(); - eprintln!("{}", err); - process::exit(127); - }; - - let pid = unsafe { - ns::spawn( - CloneFlags::CLONE_NEWNS - | CloneFlags::CLONE_NEWPID - | CloneFlags::CLONE_NEWIPC - | CloneFlags::CLONE_NEWNET - | CloneFlags::CLONE_NEWUTS, - || exec_cmd().unwrap(), - ) - } - .context("Failed to run task container")?; - - let status = wait::waitpid(pid, None)?; - - if let Err(err) = status.check() { - return Err(Error::new(format!( - "Task failed: {}\nOutput: {}", - err, log_filename - ))); - } - - Ok(()) -} - -fn hash_layer(input_hash: &InputHash, task: &Task) -> Result> { - let task_state_dir = paths::task_state_dir(input_hash); - let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); - - (|| -> Result> { - if fs::is_dir_empty(&task_layer_dir)? { - return Ok(None); - } - - let hasher = LayerHasher::new(); - let mut buffered_writer = BufWriter::with_capacity(16 * 1024 * 1024, hasher); - - tar::pack(&task.rootfs, &mut buffered_writer, &task_layer_dir)?; - - let hasher = buffered_writer.into_inner()?; - Ok(Some(LayerHash(StringHash(hasher.finalize().into())))) - })() - .with_context(|| format!("Failed to hash layer directory {:?}", task_layer_dir)) -} - -fn move_layer(input_hash: &InputHash, hash: &Option) -> Result<()> { - let task_state_dir = paths::task_state_dir(input_hash); - let task_layer_dir = paths::join(&[&task_state_dir, paths::TASK_STATE_LAYER_SUBDIR]); - - if let Some(hash) = hash { - let layer_dir = paths::layer_dir(hash); - - let err = match std::fs::rename(&task_layer_dir, &layer_dir) { - Ok(_) => return Ok(()), - Err(err) => err, - }; - - if !matches!( - err.raw_os_error(), - Some(libc::EEXIST) | Some(libc::ENOTEMPTY) - ) { - return Err(err).with_context(|| { - format!("Failed to rename {:?} to {:?}", task_layer_dir, layer_dir) - }); - } - } - - fs::ensure_removed(&task_layer_dir) -} - -fn run_and_hash_task( - input_hash: &InputHash, - task: &Task, - jobserver: &mut Jobserver, -) -> Result { - run_task(input_hash, task, jobserver)?; - - let outputs = collect_outputs(input_hash, task)?; - - let layer = hash_layer(input_hash, task)?; - move_layer(input_hash, &layer)?; - - Ok(TaskOutput { - input_hash: Some(*input_hash), - layer, - outputs, - }) -} - -fn load_cached(input_hash: &InputHash) -> Result { - let filename = paths::task_cache_filename(input_hash); - let file = fs::open(&filename)?; - - serde_json::from_reader(file) - .with_context(|| format!("Failed to read task cache data from {}", filename)) -} - -fn save_cached(input_hash: &InputHash, output: &TaskOutput) -> Result<()> { - fs::mkdir(paths::task_state_dir(input_hash))?; - - let tmp_filename = paths::task_cache_tmp_filename(input_hash); - let filename = paths::task_cache_filename(input_hash); - - cjson::to_file(&tmp_filename, output) - .with_context(|| format!("Failed to write task cache data to {}", tmp_filename))?; - - fs::rename(tmp_filename, filename)?; - - Ok(()) -} - -trait AsSecsF32 { - fn as_secs_f32(&self) -> f32; -} - -impl AsSecsF32 for TimeVal { - fn as_secs_f32(&self) -> f32 { - self.tv_sec() as f32 + 1e-6 * (self.tv_usec() as f32) - } -} - -fn get_usage(total: f32) -> String { - let usage = resource::getrusage(resource::UsageWho::RUSAGE_CHILDREN).expect("getrusage()"); - - let user = 
usage.user_time().as_secs_f32(); - let system = usage.system_time().as_secs_f32(); - let cpu = (100.0 * (user + system) / total).round(); - - format!("{user:.2}s user {system:.2}s system {cpu:.0}% cpu") -} - -pub fn handle(task: Task, jobserver: &mut Jobserver) -> Result { - let input_hash = input_hash(&task); - - let _lock = unix::lock(paths::task_lock_filename(&input_hash), true, true) - .context("Failed to get task lock")?; - - let cached_output = load_cached(&input_hash); - if !task.force_run { - if let Ok(task_output) = cached_output { - return Ok(task_output); - } - } - - let token = jobserver.wait(); - - let start_time = Instant::now(); - println!("Starting task {} ({})", task.label, input_hash); - - let task_ret = run_and_hash_task(&input_hash, &task, jobserver); - let cleanup_ret = cleanup_task(&input_hash); - - jobserver.post(token); - - let task_output = task_ret?; - cleanup_ret.context("Failed to clean up after task")?; - - save_cached(&input_hash, &task_output)?; - - let duration = Instant::now().duration_since(start_time).as_secs_f32(); - let usage = get_usage(duration); - println!( - "Finished task {} ({}) in {:.2}s ({})", - task.label, input_hash, duration, usage, - ); - - if let Ok(cached_output) = cached_output { - if cached_output.outputs != task_output.outputs { - println!( - "WARNING: Output hashes for task {} do not match cached result\n Cached output: {}\n New output: {}", - task.label, - cjson::to_string(&cached_output.outputs).unwrap(), - cjson::to_string(&task_output.outputs).unwrap(), - ); - } - } - - Ok(task_output) -} diff --git a/crates/runner/src/util/checkable.rs b/crates/runner/src/util/checkable.rs deleted file mode 100644 index 8528d29..0000000 --- a/crates/runner/src/util/checkable.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::{ - io::{Error, ErrorKind, Result}, - process::ExitStatus, -}; - -use nix::sys::wait; - -pub trait Checkable { - fn check(&self) -> Result<()>; -} - -impl Checkable for ExitStatus { - fn check(&self) -> Result<()> { - if self.success() { - Ok(()) - } else { - Err(Error::new( - ErrorKind::Other, - format!("Process exited with {}", self), - )) - } - } -} - -impl Checkable for wait::WaitStatus { - fn check(&self) -> Result<()> { - let message = match self { - wait::WaitStatus::Exited(_, 0) => return Ok(()), - wait::WaitStatus::Exited(_, code) => format!("Process exited with exit code: {}", code), - wait::WaitStatus::Signaled(_, signal, _) => { - format!("Process exited with signal: {}", signal) - } - _ => format!("Process in unexpected status: {:?}", self), - }; - Err(Error::new(ErrorKind::Other, message)) - } -} diff --git a/crates/runner/src/util/cjson.rs b/crates/runner/src/util/cjson.rs deleted file mode 100644 index e3840ce..0000000 --- a/crates/runner/src/util/cjson.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::{ - fs::File, - io::{self, Write}, - path::Path, -}; - -use digest::{self, Digest}; -use olpc_cjson::CanonicalFormatter; -use serde::Serialize; -use serde_json::error::Result; - -pub fn new_serializer(writer: W) -> serde_json::Serializer { - serde_json::Serializer::with_formatter(writer, CanonicalFormatter::new()) -} - -pub fn to_writer(writer: W, value: &T) -> Result<()> { - let mut ser = new_serializer(writer); - value.serialize(&mut ser) -} - -pub fn to_file, T: ?Sized + Serialize>(path: P, value: &T) -> io::Result<()> { - let file = File::create(path)?; - to_writer(&file, value)?; - file.sync_all() -} - -pub fn to_string(value: &T) -> Result { - let mut ret = Vec::new(); - to_writer(&mut ret, value)?; - 
Ok(String::from_utf8(ret).unwrap()) -} - -pub fn digest(value: &T) -> Result> { - let mut digest = ::new(); - to_writer(&mut digest, value)?; - Ok(digest.finalize()) -} diff --git a/crates/runner/src/util/clone.rs b/crates/runner/src/util/clone.rs deleted file mode 100644 index 51a31c3..0000000 --- a/crates/runner/src/util/clone.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::{mem, process}; - -use nix::{ - errno, sched, - unistd::{self, Pid}, -}; - -#[repr(C)] -#[derive(Debug, Default)] -struct CloneArgs { - flags: u64, - pidfd: u64, - child_tid: u64, - parent_tid: u64, - exit_signal: u64, - stack: u64, - stack_size: u64, - tls: u64, -} - -pub unsafe fn clone(flags: sched::CloneFlags) -> nix::Result { - let mut args = CloneArgs { - flags: flags.bits() as u64, - exit_signal: libc::SIGCHLD as u64, - ..CloneArgs::default() - }; - let size = mem::size_of_val(&args) as libc::size_t; - - let pid = libc::syscall(libc::SYS_clone3, &mut args, size); - - #[allow(clippy::comparison_chain)] - if pid < 0 { - Err(errno::Errno::last()) - } else if pid == 0 { - Ok(unistd::ForkResult::Child) - } else { - Ok(unistd::ForkResult::Parent { - child: Pid::from_raw(pid as libc::pid_t), - }) - } -} - -pub unsafe fn spawn(flags: Option, f: F) -> nix::Result -where - F: FnOnce(), -{ - let res = if let Some(flags) = flags { - clone(flags) - } else { - unistd::fork() - }; - match res? { - unistd::ForkResult::Parent { child } => Ok(child), - unistd::ForkResult::Child => { - f(); - process::exit(0) - } - } -} diff --git a/crates/runner/src/util/fs.rs b/crates/runner/src/util/fs.rs deleted file mode 100644 index 5efd159..0000000 --- a/crates/runner/src/util/fs.rs +++ /dev/null @@ -1,127 +0,0 @@ -use std::{ - fs::{self, File}, - io, - os::unix::prelude::*, - path::{Path, PathBuf}, -}; - -use nix::{ - fcntl::OFlag, - mount::{self, MsFlags}, - unistd, -}; - -use common::error::*; - -pub fn open>(path: P) -> Result { - fs::File::open(path.as_ref()) - .with_context(|| format!("Failed to open file {:?} for reading", path.as_ref())) -} - -pub fn create>(path: P) -> Result { - fs::File::create(path.as_ref()) - .with_context(|| format!("Failed to open file {:?} for writing", path.as_ref())) -} - -pub fn rename, P2: AsRef>(from: P1, to: P2) -> Result<()> { - fs::rename(from.as_ref(), to.as_ref()) - .with_context(|| format!("Failed to rename {:?} to {:?}", from.as_ref(), to.as_ref())) -} - -// Unlike fs::copy, this doesn't preserve file flags -pub fn copy, P2: AsRef>(from: P1, to: P2) -> Result<()> { - (|| -> Result<()> { - let mut src = open(from.as_ref())?; - let mut dest = create(to.as_ref())?; - io::copy(&mut src, &mut dest)?; - dest.sync_all()?; - Ok(()) - })() - .with_context(|| format!("Failed to copy {:?} to {:?}", from.as_ref(), to.as_ref())) -} - -pub fn mkdir>(path: P) -> Result<()> { - let mut builder = fs::DirBuilder::new(); - builder.recursive(true); - builder - .create(path.as_ref()) - .with_context(|| format!("Failed to create directory {:?}", path.as_ref())) -} - -pub fn ensure_removed>(path: P) -> Result<()> { - let result = if path.as_ref().is_dir() { - fs::remove_dir_all(path.as_ref()) - } else { - fs::remove_file(path.as_ref()) - }; - result - .or_else(|err| match err.kind() { - io::ErrorKind::NotFound => Ok(()), - _ => Err(err), - }) - .with_context(|| format!("Failed to delete {:?}", path.as_ref())) -} - -pub fn is_dir_empty>(path: P) -> Result { - Ok(fs::read_dir(path)?.next().is_none()) -} - -/// Fixes up weirdness of set-group-ID or bsdgroups -pub fn fixup_permissions>(path: P) -> Result<()> { - let path = 
path.as_ref();
-    let gid = unistd::getegid();
-
-    let metadata = path
-        .metadata()
-        .with_context(|| format!("Failed to get metadata of {:?}", path))?;
-
-    if metadata.gid() != gid.as_raw() {
-        unistd::chown(path, None, Some(gid))
-            .with_context(|| format!("Failed to set group of {:?}", path))?;
-    }
-
-    let mut perms = metadata.permissions();
-    let mode = perms.mode();
-    if (mode & 0o777) != mode {
-        perms.set_mode(mode & 0o777);
-        std::fs::set_permissions(path, perms)
-            .with_context(|| format!("Failed to set mode of {:?}", path))?;
-    }
-
-    Ok(())
-}
-
-#[must_use]
-pub struct Mount(PathBuf);
-
-impl Drop for Mount {
-    fn drop(&mut self) {
-        mount::umount(&self.0)
-            .with_context(|| format!("Failed to unmount {:?}", self.0))
-            .unwrap();
-    }
-}
-
-pub fn mount<P1: AsRef<Path>, P2: AsRef<Path>>(
-    source: P1,
-    target: P2,
-    fstype: Option<&str>,
-    flags: MsFlags,
-    data: Option<&str>,
-) -> Result<Mount> {
-    mkdir(target.as_ref()).with_context(|| format!("Failed to create {:?}", target.as_ref()))?;
-
-    let canon_target = target
-        .as_ref()
-        .canonicalize()
-        .with_context(|| format!("Failed to get absolute path for {:?}", target.as_ref()))?;
-    mount::mount(Some(source.as_ref()), &canon_target, fstype, flags, data)
-        .with_context(|| format!("Failed to mount {:?}", canon_target))?;
-    Ok(Mount(canon_target))
-}
-
-pub fn pipe() -> Result<(File, File)> {
-    unistd::pipe2(OFlag::O_CLOEXEC)
-        .context("pipe2()")
-        .map(|(piper, pipew)| (File::from(piper), File::from(pipew)))
-}
diff --git a/crates/runner/src/util/mod.rs b/crates/runner/src/util/mod.rs
deleted file mode 100644
index 0fbe3b5..0000000
--- a/crates/runner/src/util/mod.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-pub mod checkable;
-pub mod cjson;
-pub mod clone;
-pub mod fs;
-pub mod stack;
-pub mod steal;
-pub mod unix;
diff --git a/crates/runner/src/util/stack.rs b/crates/runner/src/util/stack.rs
deleted file mode 100644
index 15d5daf..0000000
--- a/crates/runner/src/util/stack.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-use std::mem;
-
-/// Simple inefficient data structure with guaranteed drop order
-#[derive(Debug)]
-pub enum Stack<T> {
-    Cons(Box<(T, Stack<T>)>),
-    Empty,
-}
-
-impl<T> Default for Stack<T> {
-    fn default() -> Self {
-        Self::Empty
-    }
-}
-
-impl<T> Stack<T> {
-    pub fn new() -> Self {
-        Self::Empty
-    }
-
-    pub fn push(&mut self, value: T) {
-        let tmp = mem::take(self);
-        *self = Stack::Cons(Box::new((value, tmp)));
-    }
-}
diff --git a/crates/runner/src/util/steal.rs b/crates/runner/src/util/steal.rs
deleted file mode 100644
index 91b2cdf..0000000
--- a/crates/runner/src/util/steal.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use std::ops::{Deref, DerefMut};
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
-pub struct Steal<T>(pub Option<T>);
-
-impl<T> Steal<T> {
-    pub fn new(value: T) -> Steal<T> {
-        Steal(Some(value))
-    }
-
-    pub fn steal(&mut self) -> T {
-        self.0
-            .take()
-            .expect("Attempted to steal already stolen value")
-    }
-}
-
-impl<T> From<T> for Steal<T> {
-    fn from(value: T) -> Self {
-        Steal::new(value)
-    }
-}
-
-impl<T> Deref for Steal<T> {
-    type Target = T;
-
-    fn deref(&self) -> &Self::Target {
-        self.0
-            .as_ref()
-            .expect("Attempted to dereference stolen value")
-    }
-}
-
-impl<T> DerefMut for Steal<T> {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        self.0
-            .as_mut()
-            .expect("Attempted to dereference stolen value")
-    }
-}
diff --git a/crates/runner/src/util/unix.rs b/crates/runner/src/util/unix.rs
deleted file mode 100644
index 08884ec..0000000
--- a/crates/runner/src/util/unix.rs
+++ /dev/null
@@ -1,86 +0,0 @@
-use std::{fs::File, os::unix::prelude::*, path::Path};
-
-use nix::{
-    fcntl::{self, FcntlArg, FdFlag, Flock, OFlag},
-    sched,
-    unistd::Pid,
-};
-
-use common::error::*;
-
-use super::fs;
-
-pub fn set_blocking<Fd: AsFd>(fd: &Fd, blocking: bool) -> Result<()> {
-    let raw_fd = fd.as_fd().as_raw_fd();
-
-    let flags =
-        OFlag::from_bits_retain(fcntl::fcntl(raw_fd, FcntlArg::F_GETFL).context("fcntl(F_GETFL)")?);
-
-    let new_flags = if blocking {
-        flags & !OFlag::O_NONBLOCK
-    } else {
-        flags | OFlag::O_NONBLOCK
-    };
-
-    if new_flags != flags {
-        fcntl::fcntl(raw_fd, FcntlArg::F_SETFL(new_flags)).context("fcntl(F_SETFL)")?;
-    }
-
-    Ok(())
-}
-
-pub fn set_cloexec<Fd: AsFd>(fd: &Fd, cloexec: bool) -> Result<()> {
-    let raw_fd = fd.as_fd().as_raw_fd();
-
-    let flags = FdFlag::from_bits_retain(
-        fcntl::fcntl(raw_fd, FcntlArg::F_GETFD).context("fcntl(F_GETFD)")?,
-    );
-
-    let new_flags = if cloexec {
-        flags | FdFlag::FD_CLOEXEC
-    } else {
-        flags & !FdFlag::FD_CLOEXEC
-    };
-
-    if new_flags != flags {
-        fcntl::fcntl(raw_fd, FcntlArg::F_SETFD(new_flags)).context("fcntl(F_SETFD)")?;
-    }
-
-    Ok(())
-}
-
-pub fn nproc() -> Result<usize> {
-    const MAXCPU: usize = sched::CpuSet::count();
-
-    let affinity = sched::sched_getaffinity(Pid::from_raw(0)).context("sched_getaffinity()")?;
-
-    let mut count = 0;
-
-    for cpu in 0..MAXCPU {
-        if affinity.is_set(cpu).unwrap() {
-            count += 1;
-        }
-    }
-
-    Ok(count)
-}
-
-pub fn lock<P: AsRef<Path>>(path: P, exclusive: bool, blocking: bool) -> Result<Flock<File>> {
-    use fcntl::FlockArg::*;
-
-    if let Some(parent) = path.as_ref().parent() {
-        fs::mkdir(parent)?;
-    }
-
-    let arg = match (exclusive, blocking) {
-        (true, true) => LockExclusive,
-        (true, false) => LockExclusiveNonblock,
-        (false, true) => LockShared,
-        (false, false) => LockSharedNonblock,
-    };
-
-    let file = fs::create(path.as_ref())?;
-    fcntl::Flock::lock(file, arg)
-        .map_err(|(_, errno)| errno)
-        .with_context(|| format!("flock failed on {:?}", path.as_ref()))
-}
-- 
cgit v1.2.3
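
The `Runner` handle moved to crates/rebel-runner/src/lib.rs forks the runner process once and then exchanges tasks and results over a socketpair. A minimal driver-side sketch of that protocol, assuming the post-rename crate names `rebel_runner` and `rebel_common` and an `Options`/`Task` constructed elsewhere; the `build_one` wrapper is hypothetical and not part of this patch:

```rust
use rebel_common::{error::Result, types::Task};
use rebel_runner::{Options, Runner};

// Hypothetical wrapper around the Runner API shown in lib.rs above.
fn build_one(options: &Options, task: &Task) -> Result<()> {
    // Safety: Runner::new() clones the process, so per the safety comment
    // in lib.rs it must run before any threads are spawned.
    let runner = unsafe { Runner::new(options)? };

    // spawn() submits the task over the seqpacket socket and returns a
    // stream socket on which the runner delivers the result.
    let socket = runner.spawn(task);

    // result() blocks until the task has finished or failed.
    let output = Runner::result(&socket)?;
    let _ = output; // e.g. inspect output.outputs here
    Ok(())
}
```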
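`init_task()` in task.rs assembles its overlayfs mount options by stacking ancestor layers newest-first in `lowerdir`, with the task's own layer as `upperdir`. A standalone sketch of that string assembly, using illustrative stand-in paths instead of the hash-derived directories:

```rust
// Mirrors the option string built in init_task(); paths are stand-ins.
fn overlay_options(ancestors: &[String], upper: &str, work: &str) -> String {
    let lower = ancestors
        .iter()
        .rev() // the newest ancestor layer must come first in lowerdir
        .map(String::as_str)
        .collect::<Vec<_>>()
        .join(":");
    format!("xino=off,index=off,metacopy=off,lowerdir={lower},upperdir={upper},workdir={work}")
}

fn main() {
    let ancestors = vec!["layer/aaa".to_string(), "layer/bbb".to_string()];
    // lowerdir becomes "layer/bbb:layer/aaa": bbb shadows aaa.
    println!("{}", overlay_options(&ancestors, "task/xyz/layer", "task/xyz/work"));
}
```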
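The `Stack` in util/stack.rs exists purely for its drop order: each `Cons` drops its value before the remainder of the stack, so mounts pushed later are unmounted first. A self-contained demonstration; the `Stack` definition is copied from the patch, and `Guard` is a stand-in for `fs::Mount`:

```rust
use std::mem;

// Copied from util/stack.rs above.
#[derive(Debug)]
pub enum Stack<T> {
    Cons(Box<(T, Stack<T>)>),
    Empty,
}

impl<T> Default for Stack<T> {
    fn default() -> Self {
        Self::Empty
    }
}

impl<T> Stack<T> {
    pub fn new() -> Self {
        Self::Empty
    }

    pub fn push(&mut self, value: T) {
        let tmp = mem::take(self);
        *self = Stack::Cons(Box::new((value, tmp)));
    }
}

// Stand-in for fs::Mount: announces its own drop.
struct Guard(&'static str);

impl Drop for Guard {
    fn drop(&mut self) {
        println!("dropped {}", self.0);
    }
}

fn main() {
    let mut mounts = Stack::new();
    mounts.push(Guard("outer mount"));
    mounts.push(Guard("inner mount"));
    // Prints "dropped inner mount" before "dropped outer mount": each Cons
    // drops its value before the tail, so drops happen newest-first.
}
```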
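util/checkable.rs converts process exit statuses into `io::Result`, which is what allows tar.rs to write `waitpid(...)?.check().context(...)`. A standalone sketch of the same pattern for `std::process::ExitStatus`; the impl is reproduced here so the example compiles on its own:

```rust
use std::{
    io,
    process::{Command, ExitStatus},
};

trait Checkable {
    fn check(&self) -> io::Result<()>;
}

// Same impl as util/checkable.rs above, reproduced for a standalone build.
impl Checkable for ExitStatus {
    fn check(&self) -> io::Result<()> {
        if self.success() {
            Ok(())
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                format!("Process exited with {}", self),
            ))
        }
    }
}

fn main() -> io::Result<()> {
    // `false` always exits non-zero, so check() returns an error here.
    Command::new("false").status()?.check()
}
```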
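task.rs derives `InputHash` and `DependencyHash` by hashing a canonical JSON encoding, so that semantically identical inputs always map to the same hash regardless of field order or formatting. A standalone sketch of that idea using `olpc-cjson` and BLAKE3's native API rather than the `digest`-trait plumbing of util/cjson.rs:

```rust
use olpc_cjson::CanonicalFormatter;
use serde::Serialize;

// Serialize with the canonical formatter so equal values give equal bytes,
// then hash those bytes.
fn canonical_hash<T: Serialize>(value: &T) -> serde_json::Result<blake3::Hash> {
    let mut buf = Vec::new();
    let mut ser = serde_json::Serializer::with_formatter(&mut buf, CanonicalFormatter::new());
    value.serialize(&mut ser)?;
    Ok(blake3::hash(&buf))
}

fn main() {
    #[derive(Serialize)]
    struct Input<'a> {
        command: &'a str,
    }
    let hash = canonical_hash(&Input { command: "make" }).unwrap();
    println!("{}", hash.to_hex());
}
```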
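Both the build-directory lock in lib.rs and the per-task and per-dependency locks in task.rs go through `unix::lock()`, which wraps `flock(2)` via nix's `Flock` and releases the lock when the returned guard is dropped. A reduced standalone sketch with simplified error handling and without the parent-directory creation:

```rust
use std::{fs::File, io};

use nix::fcntl::{Flock, FlockArg};

// Take a blocking exclusive lock on `path`; released when the guard drops.
fn lock_exclusive(path: &str) -> io::Result<Flock<File>> {
    let file = File::create(path)?;
    Flock::lock(file, FlockArg::LockExclusive)
        .map_err(|(_, errno)| io::Error::from_raw_os_error(errno as i32))
}

fn main() -> io::Result<()> {
    let _lock = lock_exclusive("/tmp/example.lock")?;
    // Critical section: only one process at a time gets past the lock call.
    Ok(())
}
```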