Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 272 additions & 42 deletions Cargo.lock

Large diffs are not rendered by default.

1,340 changes: 1,137 additions & 203 deletions Cargo.nix

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["rust/crd", "rust/operator-binary"]
members = ["rust/operator-binary"]
resolver = "2"

[workspace.package]
Expand All @@ -10,6 +10,10 @@ edition = "2021"
repository = "https://github.com/stackabletech/nifi-operator"

[workspace.dependencies]
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.5.0" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }

anyhow = "1.0"
built = { version = "0.7", features = ["chrono", "git2"] }
clap = "4.5"
Expand All @@ -18,15 +22,13 @@ fnv = "1.0"
futures = { version = "0.3", features = ["compat"] }
indoc = "2.0"
pin-project = "1.1"
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
rand = "0.8"
rstest = "0.24"
semver = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
snafu = "0.8"
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
strum = { version = "0.26", features = ["derive"] }
tokio = { version = "1.40", features = ["full"] }
tracing = "0.1"
Expand Down
3 changes: 3 additions & 0 deletions crate-hashes.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion deploy/helm/nifi-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
name: v1alpha1
schema:
openAPIV3Schema:
description: Auto-generated derived type for NifiSpec via `CustomResource`
description: Auto-generated derived type for NifiClusterSpec via `CustomResource`
properties:
spec:
description: A NiFi cluster stacklet. This resource is managed by the Stackable operator for Apache NiFi. Find more information on how to use it and the resources that the operator generates in the [operator documentation](https://docs.stackable.tech/home/nightly/nifi/).
Expand Down
22 changes: 0 additions & 22 deletions rust/crd/Cargo.toml

This file was deleted.

6 changes: 3 additions & 3 deletions rust/operator-binary/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ repository.workspace = true
publish = false

[dependencies]
stackable-nifi-crd = { path = "../crd" }
stackable-versioned.workspace = true
stackable-operator.workspace = true
product-config.workspace = true

anyhow.workspace = true
clap.workspace = true
Expand All @@ -23,8 +25,6 @@ semver.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
stackable-operator.workspace = true
product-config.workspace = true
strum.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
6 changes: 4 additions & 2 deletions rust/operator-binary/src/config/jvm.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_nifi_crd::{NifiConfig, NifiConfigFragment};
use stackable_operator::{
memory::{BinaryMultiple, MemoryQuantity},
role_utils::{self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role},
};

use super::{JVM_SECURITY_PROPERTIES_FILE, NIFI_CONFIG_DIRECTORY};
use crate::{
config::{JVM_SECURITY_PROPERTIES_FILE, NIFI_CONFIG_DIRECTORY},
crd::{NifiConfig, NifiConfigFragment},
};

// Part of memory resources allocated for Java heap
const JAVA_HEAP_FACTOR: f32 = 0.8;
Expand Down
18 changes: 9 additions & 9 deletions rust/operator-binary/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ use std::{
use jvm::build_merged_jvm_config;
use product_config::{types::PropertyNameKind, ProductConfigManager};
use snafu::{ResultExt, Snafu};
use stackable_nifi_crd::{
NifiCluster, NifiConfig, NifiConfigFragment, NifiRole, NifiSpec, NifiStorageConfig, HTTPS_PORT,
PROTOCOL_PORT,
};
use stackable_operator::{
commons::resources::Resources,
memory::MemoryQuantity,
Expand All @@ -22,6 +18,10 @@ use stackable_operator::{
use strum::{Display, EnumIter};

use crate::{
crd::{
v1alpha1, NifiConfig, NifiConfigFragment, NifiRole, NifiStorageConfig, HTTPS_PORT,
PROTOCOL_PORT,
},
operations::graceful_shutdown::graceful_shutdown_config_properties,
security::{
authentication::{
Expand Down Expand Up @@ -135,7 +135,7 @@ pub fn build_bootstrap_conf(

/// Create the NiFi nifi.properties
pub fn build_nifi_properties(
spec: &NifiSpec,
spec: &v1alpha1::NifiClusterSpec,
resource_config: &Resources<NifiStorageConfig>,
proxy_hosts: &str,
auth_config: &NifiAuthenticationConfig,
Expand Down Expand Up @@ -616,7 +616,7 @@ pub fn build_state_management_xml() -> String {
/// * `product_config` - The product config to validate and complement the user config.
///
pub fn validated_product_config(
resource: &NifiCluster,
resource: &v1alpha1::NifiCluster,
version: &str,
role: &Role<NifiConfigFragment, GenericRoleConfig, JavaCommonConfig>,
product_config: &ProductConfigManager,
Expand Down Expand Up @@ -667,10 +667,9 @@ fn storage_quantity_to_nifi(quantity: MemoryQuantity) -> String {
#[cfg(test)]
mod tests {
use indoc::indoc;
use stackable_nifi_crd::NifiCluster;

use super::*;
use crate::config::build_bootstrap_conf;
use crate::{config::build_bootstrap_conf, crd::v1alpha1};

#[test]
fn test_build_bootstrap_conf_defaults() {
Expand Down Expand Up @@ -792,7 +791,8 @@ mod tests {
}

fn construct_bootstrap_conf(nifi_cluster: &str) -> String {
let nifi: NifiCluster = serde_yaml::from_str(nifi_cluster).expect("illegal test input");
let nifi: v1alpha1::NifiCluster =
serde_yaml::from_str(nifi_cluster).expect("illegal test input");

let nifi_role = NifiRole::Node;
let role = nifi.spec.nodes.as_ref().unwrap();
Expand Down
58 changes: 30 additions & 28 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Ensures that `Pod`s are configured and running for each [`NifiCluster`]
//! Ensures that `Pod`s are configured and running for each [`v1alpha1::NifiCluster`].

use std::{
borrow::Cow,
collections::{BTreeMap, HashMap, HashSet},
Expand All @@ -13,12 +14,6 @@ use product_config::{
ProductConfigManager,
};
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_nifi_crd::{
authentication::AuthenticationClassResolved, Container, CurrentlySupportedListenerClasses,
NifiCluster, NifiConfig, NifiConfigFragment, NifiRole, NifiStatus, APP_NAME, BALANCE_PORT,
BALANCE_PORT_NAME, HTTPS_PORT, HTTPS_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, PROTOCOL_PORT,
PROTOCOL_PORT_NAME, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR,
};
use stackable_operator::{
builder::{
self,
Expand Down Expand Up @@ -84,6 +79,13 @@ use crate::{
validated_product_config, NifiRepository, JVM_SECURITY_PROPERTIES_FILE,
NIFI_BOOTSTRAP_CONF, NIFI_CONFIG_DIRECTORY, NIFI_PROPERTIES, NIFI_STATE_MANAGEMENT_XML,
},
crd::{
authentication::AuthenticationClassResolved, v1alpha1, Container,
CurrentlySupportedListenerClasses, NifiConfig, NifiConfigFragment, NifiRole, NifiStatus,
APP_NAME, BALANCE_PORT, BALANCE_PORT_NAME, HTTPS_PORT, HTTPS_PORT_NAME, METRICS_PORT,
METRICS_PORT_NAME, PROTOCOL_PORT, PROTOCOL_PORT_NAME, STACKABLE_LOG_CONFIG_DIR,
STACKABLE_LOG_DIR,
},
operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs},
product_logging::{extend_role_group_config_map, resolve_vector_aggregator_address},
reporting_task::{self, build_maybe_reporting_task, build_reporting_task_service_name},
Expand Down Expand Up @@ -159,13 +161,13 @@ pub enum Error {
#[snafu(display("failed to apply Service for {}", rolegroup))]
ApplyRoleGroupService {
source: stackable_operator::cluster_resources::Error,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("failed to build ConfigMap for {}", rolegroup))]
BuildRoleGroupConfig {
source: stackable_operator::builder::configmap::Error,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("object has no nodes defined"))]
Expand All @@ -174,13 +176,13 @@ pub enum Error {
#[snafu(display("failed to apply ConfigMap for {}", rolegroup))]
ApplyRoleGroupConfig {
source: stackable_operator::cluster_resources::Error,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("failed to apply StatefulSet for {}", rolegroup))]
ApplyRoleGroupStatefulSet {
source: stackable_operator::cluster_resources::Error,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("failed to apply create ReportingTask service"))]
Expand Down Expand Up @@ -210,7 +212,7 @@ pub enum Error {
#[snafu(display("Failed to find any nodes in cluster {obj_ref}",))]
MissingNodes {
source: stackable_operator::client::Error,
obj_ref: ObjectRef<NifiCluster>,
obj_ref: ObjectRef<v1alpha1::NifiCluster>,
},

#[snafu(display("Failed to find service {obj_ref}"))]
Expand All @@ -235,7 +237,7 @@ pub enum Error {
BuildProductConfig {
#[snafu(source(from(config::Error, Box::new)))]
source: Box<config::Error>,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("illegal container name: [{container_name}]"))]
Expand All @@ -247,11 +249,11 @@ pub enum Error {
#[snafu(display("failed to validate resources for {rolegroup}"))]
ResourceValidation {
source: fragment::ValidationError,
rolegroup: RoleGroupRef<NifiCluster>,
rolegroup: RoleGroupRef<v1alpha1::NifiCluster>,
},

#[snafu(display("failed to resolve and merge config for role and role group"))]
FailedToResolveConfig { source: stackable_nifi_crd::Error },
FailedToResolveConfig { source: crate::crd::Error },

#[snafu(display("failed to resolve the Vector aggregator address"))]
ResolveVectorAggregatorAddress {
Expand Down Expand Up @@ -295,7 +297,7 @@ pub enum Error {

#[snafu(display("Failed to resolve NiFi Authentication Configuration"))]
FailedResolveNifiAuthenticationConfig {
source: stackable_nifi_crd::authentication::Error,
source: crate::crd::authentication::Error,
},

#[snafu(display("failed to create PodDisruptionBudget"))]
Expand Down Expand Up @@ -365,7 +367,7 @@ pub enum VersionChangeState {
}

pub async fn reconcile_nifi(
nifi: Arc<DeserializeGuard<NifiCluster>>,
nifi: Arc<DeserializeGuard<v1alpha1::NifiCluster>>,
ctx: Arc<Ctx>,
) -> Result<Action> {
tracing::info!("Starting reconcile");
Expand Down Expand Up @@ -686,7 +688,7 @@ pub async fn reconcile_nifi(
/// The node-role service is the primary endpoint that should be used by clients that do not
/// perform internal load balancing including targets outside of the cluster.
pub fn build_node_role_service(
nifi: &NifiCluster,
nifi: &v1alpha1::NifiCluster,
resolved_product_image: &ResolvedProductImage,
) -> Result<Service> {
let role_name = NifiRole::Node.to_string();
Expand Down Expand Up @@ -732,11 +734,11 @@ pub fn build_node_role_service(
/// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator
#[allow(clippy::too_many_arguments)]
async fn build_node_rolegroup_config_map(
nifi: &NifiCluster,
nifi: &v1alpha1::NifiCluster,
resolved_product_image: &ResolvedProductImage,
nifi_auth_config: &NifiAuthenticationConfig,
role: &Role<NifiConfigFragment, GenericRoleConfig, JavaCommonConfig>,
rolegroup: &RoleGroupRef<NifiCluster>,
rolegroup: &RoleGroupRef<v1alpha1::NifiCluster>,
rolegroup_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
merged_config: &NifiConfig,
vector_aggregator_address: Option<&str>,
Expand Down Expand Up @@ -846,9 +848,9 @@ async fn build_node_rolegroup_config_map(
///
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
fn build_node_rolegroup_service(
nifi: &NifiCluster,
nifi: &v1alpha1::NifiCluster,
resolved_product_image: &ResolvedProductImage,
rolegroup: &RoleGroupRef<NifiCluster>,
rolegroup: &RoleGroupRef<v1alpha1::NifiCluster>,
) -> Result<Service> {
Ok(Service {
metadata: ObjectMetaBuilder::new()
Expand Down Expand Up @@ -903,10 +905,10 @@ const USERDATA_MOUNTPOINT: &str = "/stackable/userdata";
/// corresponding [`Service`] (from [`build_node_rolegroup_service`]).
#[allow(clippy::too_many_arguments)]
async fn build_node_rolegroup_statefulset(
nifi: &NifiCluster,
nifi: &v1alpha1::NifiCluster,
resolved_product_image: &ResolvedProductImage,
cluster_info: &KubernetesClusterInfo,
rolegroup_ref: &RoleGroupRef<NifiCluster>,
rolegroup_ref: &RoleGroupRef<v1alpha1::NifiCluster>,
role: &Role<NifiConfigFragment, GenericRoleConfig, JavaCommonConfig>,
rolegroup_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
merged_config: &NifiConfig,
Expand Down Expand Up @@ -1476,7 +1478,7 @@ fn zookeeper_env_var(name: &str, configmap_name: &str) -> EnvVar {

async fn get_proxy_hosts(
client: &Client,
nifi: &NifiCluster,
nifi: &v1alpha1::NifiCluster,
nifi_service: &Service,
) -> Result<String> {
let host_header_check = nifi.spec.cluster_config.host_header_check.clone();
Expand Down Expand Up @@ -1541,7 +1543,7 @@ async fn get_proxy_hosts(
}

pub fn error_policy(
_obj: Arc<DeserializeGuard<NifiCluster>>,
_obj: Arc<DeserializeGuard<v1alpha1::NifiCluster>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> Action {
Expand All @@ -1554,11 +1556,11 @@ pub fn error_policy(
}

pub fn build_recommended_labels<'a>(
owner: &'a NifiCluster,
owner: &'a v1alpha1::NifiCluster,
app_version: &'a str,
role: &'a str,
role_group: &'a str,
) -> ObjectLabels<'a, NifiCluster> {
) -> ObjectLabels<'a, v1alpha1::NifiCluster> {
ObjectLabels {
owner,
app_name: APP_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use stackable_operator::{
k8s_openapi::api::core::v1::PodAntiAffinity,
};

use crate::{NifiRole, APP_NAME};
use crate::crd::{NifiRole, APP_NAME};

pub fn get_affinity(cluster_name: &str, role: &NifiRole) -> StackableAffinityFragment {
StackableAffinityFragment {
Expand Down Expand Up @@ -32,7 +32,7 @@ mod tests {
};

use super::*;
use crate::NifiCluster;
use crate::crd::v1alpha1;

#[test]
fn test_affinity_defaults() {
Expand All @@ -57,7 +57,7 @@ mod tests {
replicas: 1
"#;
let deserializer = serde_yaml::Deserializer::from_str(input);
let nifi: NifiCluster =
let nifi: v1alpha1::NifiCluster =
serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
let merged_config = nifi.merged_config(&NifiRole::Node, "default").unwrap();

Expand Down
Loading