diff --git a/docs/docs/resources/architecture/components-hierarchy.md b/docs/docs/resources/architecture/components-hierarchy.md index 9a86b9320..f22126c42 100644 --- a/docs/docs/resources/architecture/components-hierarchy.md +++ b/docs/docs/resources/architecture/components-hierarchy.md @@ -5,6 +5,7 @@ flowchart BT StreamsBootstrap --> HelmApp StreamsApp --> StreamsBootstrap ProducerApp --> StreamsBootstrap + ConsumerApp --> StreamsBootstrap KafkaConnector --> PipelineComponent KafkaSourceConnector --> KafkaConnector KafkaSinkConnector --> KafkaConnector @@ -14,6 +15,7 @@ flowchart BT click StreamsBootstrap "./../streams-bootstrap" click StreamsApp "./../streams-app" click ProducerApp "./../producer-app" + click ConsumerApp "./../consumer-app" click KafkaConnector "./../kafka-connector" click KafkaSourceConnector "./../kafka-source-connector" click KafkaSinkConnector "./../kafka-sink-connector" diff --git a/docs/docs/resources/pipeline-components/consumer-app.yaml b/docs/docs/resources/pipeline-components/consumer-app.yaml new file mode 100644 index 000000000..5d62d8020 --- /dev/null +++ b/docs/docs/resources/pipeline-components/consumer-app.yaml @@ -0,0 +1,95 @@ +# Holds configuration to use as values for the streams bootstrap consumer-app Helm +# chart. +# More documentation on ConsumerApp: +# https://github.com/bakdata/streams-bootstrap +- type: consumer-app + name: consumer-app # required + # Pipeline prefix that will prefix every component name. If you wish to not + # have any prefix you can specify an empty string. 
+ prefix: ${pipeline.name}- + from: # Must not be null + topics: # read from topic + ${pipeline.name}-input-topic: + type: input # Implied when role is NOT specified + ${pipeline.name}-extra-topic: + role: topic-role # Implies `type` to be extra + ${pipeline.name}-input-pattern-topic: + type: pattern # Implied to be an input pattern if `role` is undefined + ${pipeline.name}-extra-pattern-topic: + type: pattern # Implied to be an extra pattern if `role` is defined + role: some-role + components: # read from specific component + account-producer: + type: input # Implied when role is NOT specified + other-producer: + role: some-role # Implies `type` to be extra + component-as-input-pattern: + type: pattern # Implied to be an input pattern if `role` is undefined + component-as-extra-pattern: + type: pattern # Implied to be an extra pattern if `role` is defined + role: some-role + # to: # While the consumer-app does inherit from kafka-app, it does not need a + # `to` section, hence it does not support it. + namespace: namespace # required + # Allowed configs: + # https://github.com/bakdata/streams-bootstrap/tree/master/charts/consumer-app + values: # required + kafka: # required, consumer-app-specific + bootstrapServers: ${config.kafka_brokers} + schemaRegistryUrl: ${config.schema_registry.url} + groupId: consumer-group-id + inputTopics: + - topic1 + - topic2 + inputPattern: input-pattern + labeledInputTopics: + input_role1: + - input_topic1 + - input_topic2 + input_role2: + - input_topic3 + - input_topic4 + labeledInputPatterns: + pattern_role1: input_pattern1 + config: + my.consumer.config: my.value + nameOverride: override-with-this-name # kafka-app-specific + fullnameOverride: override-with-this-name # kafka-app-specific + autoscaling: # consumer-app-specific + enabled: false # Whether to enable auto-scaling using KEDA. + lagThreshold: 0 # Average target value to trigger scaling actions. + # This is the interval to check each trigger on. 
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
+ pollingInterval: 30
+ # The period to wait after the last trigger reported active before scaling
+ # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
+ cooldownPeriod: 300
+ # The offset reset policy for the consumer if the consumer group is
+ # not yet subscribed to a partition.
+ offsetResetPolicy: earliest
+ # This setting is passed to the HPA definition that KEDA will create for a
+ # given resource and holds the maximum number of replicas of the target resource.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
+ maxReplicas: 1
+ # Minimum number of replicas KEDA will scale the resource down to.
+ # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
+ minReplicas: 0
+ # If this property is set, KEDA will scale the resource down to this
+ # number of replicas.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
+ idleReplicas: 0
+ topics: # List of topics used by the consumer app.
+ - topic1
+ - topic2
+ additionalTriggers: [] # List of additional KEDA triggers.
+ # Helm repository configuration (optional)
+ # If not set the helm repo add will not be called.
Useful when using local Helm charts + repo_config: + repository_name: bakdata-streams-bootstrap # required + url: https://bakdata.github.io/streams-bootstrap/ # required + repo_auth_flags: + username: user + password: pass + ca_file: /home/user/path/to/ca-file + insecure_skip_tls_verify: false + version: "2.12.0" # Helm chart version diff --git a/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml b/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml index e31f0946f..0e3be65f8 100644 --- a/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml @@ -1,3 +1,6 @@ +consumer-app.yaml: +- to-consumer-app.yaml +- values-consumer-app.yaml helm-app.yaml: - values-helm-app.yaml - repo_config-helm-app.yaml diff --git a/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml b/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml index 20157432f..5f922be36 100644 --- a/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml @@ -1,4 +1,15 @@ kpops_components_fields: + consumer-app: + - name + - enabled + - prefix + - from_ + - to + - namespace + - values + - repo_config + - diff_config + - version helm-app: - name - enabled @@ -128,6 +139,16 @@ kpops_components_fields: - diff_config - version kpops_components_inheritance_ref: + consumer-app: + bases: + - streams-bootstrap + parents: + - streams-bootstrap + - kafka-app + - helm-app + - kubernetes-app + - pipeline-component + - base-defaults-component helm-app: bases: - kubernetes-app diff --git a/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml 
b/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml index 4e5c7b800..c517d87e3 100644 --- a/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml @@ -1,3 +1,11 @@ +consumer-app.yaml: +- prefix.yaml +- from_.yaml +- to-consumer-app.yaml +- namespace.yaml +- values-consumer-app.yaml +- repo_config-helm-app.yaml +- version-kafka-app.yaml helm-app.yaml: - prefix.yaml - from_.yaml diff --git a/docs/docs/resources/pipeline-components/headers/consumer-app.yaml b/docs/docs/resources/pipeline-components/headers/consumer-app.yaml new file mode 100644 index 000000000..9d0df0953 --- /dev/null +++ b/docs/docs/resources/pipeline-components/headers/consumer-app.yaml @@ -0,0 +1,6 @@ +# Holds configuration to use as values for the streams bootstrap consumer-app Helm +# chart. +# More documentation on ConsumerApp: +# https://github.com/bakdata/streams-bootstrap +- type: consumer-app + name: consumer-app # required diff --git a/docs/docs/resources/pipeline-components/pipeline.yaml b/docs/docs/resources/pipeline-components/pipeline.yaml index e73fa8de3..b6ad9a726 100644 --- a/docs/docs/resources/pipeline-components/pipeline.yaml +++ b/docs/docs/resources/pipeline-components/pipeline.yaml @@ -1,3 +1,98 @@ +# Holds configuration to use as values for the streams bootstrap consumer-app Helm +# chart. +# More documentation on ConsumerApp: +# https://github.com/bakdata/streams-bootstrap +- type: consumer-app + name: consumer-app # required + # Pipeline prefix that will prefix every component name. If you wish to not + # have any prefix you can specify an empty string. 
+ prefix: ${pipeline.name}- + from: # Must not be null + topics: # read from topic + ${pipeline.name}-input-topic: + type: input # Implied when role is NOT specified + ${pipeline.name}-extra-topic: + role: topic-role # Implies `type` to be extra + ${pipeline.name}-input-pattern-topic: + type: pattern # Implied to be an input pattern if `role` is undefined + ${pipeline.name}-extra-pattern-topic: + type: pattern # Implied to be an extra pattern if `role` is defined + role: some-role + components: # read from specific component + account-producer: + type: input # Implied when role is NOT specified + other-producer: + role: some-role # Implies `type` to be extra + component-as-input-pattern: + type: pattern # Implied to be an input pattern if `role` is undefined + component-as-extra-pattern: + type: pattern # Implied to be an extra pattern if `role` is defined + role: some-role + # to: # While the consumer-app does inherit from kafka-app, it does not need a + # `to` section, hence it does not support it. + namespace: namespace # required + # Allowed configs: + # https://github.com/bakdata/streams-bootstrap/tree/master/charts/consumer-app + values: # required + kafka: # required, consumer-app-specific + bootstrapServers: ${config.kafka_brokers} + schemaRegistryUrl: ${config.schema_registry.url} + groupId: consumer-group-id + inputTopics: + - topic1 + - topic2 + inputPattern: input-pattern + labeledInputTopics: + input_role1: + - input_topic1 + - input_topic2 + input_role2: + - input_topic3 + - input_topic4 + labeledInputPatterns: + pattern_role1: input_pattern1 + config: + my.consumer.config: my.value + nameOverride: override-with-this-name # kafka-app-specific + fullnameOverride: override-with-this-name # kafka-app-specific + autoscaling: # consumer-app-specific + enabled: false # Whether to enable auto-scaling using KEDA. + lagThreshold: 0 # Average target value to trigger scaling actions. + # This is the interval to check each trigger on. 
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
+ pollingInterval: 30
+ # The period to wait after the last trigger reported active before scaling
+ # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
+ cooldownPeriod: 300
+ # The offset reset policy for the consumer if the consumer group is
+ # not yet subscribed to a partition.
+ offsetResetPolicy: earliest
+ # This setting is passed to the HPA definition that KEDA will create for a
+ # given resource and holds the maximum number of replicas of the target resource.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
+ maxReplicas: 1
+ # Minimum number of replicas KEDA will scale the resource down to.
+ # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
+ minReplicas: 0
+ # If this property is set, KEDA will scale the resource down to this
+ # number of replicas.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
+ idleReplicas: 0
+ topics: # List of topics used by the consumer app.
+ - topic1
+ - topic2
+ additionalTriggers: [] # List of additional KEDA triggers.
+ # Helm repository configuration (optional)
+ # If not set the helm repo add will not be called.
Useful when using local Helm charts + repo_config: + repository_name: bakdata-streams-bootstrap # required + url: https://bakdata.github.io/streams-bootstrap/ # required + repo_auth_flags: + username: user + password: pass + ca_file: /home/user/path/to/ca-file + insecure_skip_tls_verify: false + version: "2.12.0" # Helm chart version # Kubernetes app managed through Helm with an associated Helm chart - type: helm-app name: helm-app # required diff --git a/docs/docs/resources/pipeline-components/sections/to-consumer-app.yaml b/docs/docs/resources/pipeline-components/sections/to-consumer-app.yaml new file mode 100644 index 000000000..029af0d48 --- /dev/null +++ b/docs/docs/resources/pipeline-components/sections/to-consumer-app.yaml @@ -0,0 +1,2 @@ + # to: # While the consumer-app does inherit from kafka-app, it does not need a + # `to` section, hence it does not support it. diff --git a/docs/docs/resources/pipeline-components/sections/values-consumer-app.yaml b/docs/docs/resources/pipeline-components/sections/values-consumer-app.yaml new file mode 100644 index 000000000..03904ade8 --- /dev/null +++ b/docs/docs/resources/pipeline-components/sections/values-consumer-app.yaml @@ -0,0 +1,51 @@ + # Allowed configs: + # https://github.com/bakdata/streams-bootstrap/tree/master/charts/consumer-app + values: # required + kafka: # required, consumer-app-specific + bootstrapServers: ${config.kafka_brokers} + schemaRegistryUrl: ${config.schema_registry.url} + groupId: consumer-group-id + inputTopics: + - topic1 + - topic2 + inputPattern: input-pattern + labeledInputTopics: + input_role1: + - input_topic1 + - input_topic2 + input_role2: + - input_topic3 + - input_topic4 + labeledInputPatterns: + pattern_role1: input_pattern1 + config: + my.consumer.config: my.value + nameOverride: override-with-this-name # kafka-app-specific + fullnameOverride: override-with-this-name # kafka-app-specific + autoscaling: # consumer-app-specific + enabled: false # Whether to enable auto-scaling 
using KEDA.
+ lagThreshold: 0 # Average target value to trigger scaling actions.
+ # This is the interval to check each trigger on.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
+ pollingInterval: 30
+ # The period to wait after the last trigger reported active before scaling
+ # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
+ cooldownPeriod: 300
+ # The offset reset policy for the consumer if the consumer group is
+ # not yet subscribed to a partition.
+ offsetResetPolicy: earliest
+ # This setting is passed to the HPA definition that KEDA will create for a
+ # given resource and holds the maximum number of replicas of the target resource.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
+ maxReplicas: 1
+ # Minimum number of replicas KEDA will scale the resource down to.
+ # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
+ minReplicas: 0
+ # If this property is set, KEDA will scale the resource down to this
+ # number of replicas.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
+ idleReplicas: 0
+ topics: # List of topics used by the consumer app.
+ - topic1
+ - topic2
+ additionalTriggers: [] # List of additional KEDA triggers.
diff --git a/docs/docs/resources/pipeline-defaults/defaults-consumer-app.yaml b/docs/docs/resources/pipeline-defaults/defaults-consumer-app.yaml
new file mode 100644
index 000000000..19182f6f0
--- /dev/null
+++ b/docs/docs/resources/pipeline-defaults/defaults-consumer-app.yaml
@@ -0,0 +1,58 @@
+# Consumer app component that configures a streams-bootstrap consumer app.
+#
+# Child of: KafkaApp
+# More documentation on ConsumerApp: https://github.com/bakdata/streams-bootstrap
+consumer-app:
+ # to: # While the consumer-app does inherit from kafka-app, it does not need a
+ # `to` section, hence it does not support it.
+ # Allowed configs:
+ # https://github.com/bakdata/streams-bootstrap/tree/master/charts/consumer-app
+ values: # required
+ kafka: # required, consumer-app-specific
+ bootstrapServers: ${config.kafka_brokers}
+ schemaRegistryUrl: ${config.schema_registry.url}
+ groupId: consumer-group-id
+ inputTopics:
+ - topic1
+ - topic2
+ inputPattern: input-pattern
+ labeledInputTopics:
+ input_role1:
+ - input_topic1
+ - input_topic2
+ input_role2:
+ - input_topic3
+ - input_topic4
+ labeledInputPatterns:
+ pattern_role1: input_pattern1
+ config:
+ my.consumer.config: my.value
+ nameOverride: override-with-this-name # kafka-app-specific
+ fullnameOverride: override-with-this-name # kafka-app-specific
+ autoscaling: # consumer-app-specific
+ enabled: false # Whether to enable auto-scaling using KEDA.
+ lagThreshold: 0 # Average target value to trigger scaling actions.
+ # This is the interval to check each trigger on.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
+ pollingInterval: 30
+ # The period to wait after the last trigger reported active before scaling
+ # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
+ cooldownPeriod: 300
+ # The offset reset policy for the consumer if the consumer group is
+ # not yet subscribed to a partition.
+ offsetResetPolicy: earliest
+ # This setting is passed to the HPA definition that KEDA will create for a
+ # given resource and holds the maximum number of replicas of the target resource.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
+ maxReplicas: 1
+ # Minimum number of replicas KEDA will scale the resource down to.
+ # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
+ minReplicas: 0
+ # If this property is set, KEDA will scale the resource down to this
+ # number of replicas.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount + idleReplicas: 0 + topics: # List of topics used by the consumer app. + - topic1 + - topic2 + additionalTriggers: [] # List of additional KEDA triggers. diff --git a/docs/docs/resources/pipeline-defaults/defaults-producer-app.yaml b/docs/docs/resources/pipeline-defaults/defaults-producer-app.yaml index 2d9258b37..6dbe0e19a 100644 --- a/docs/docs/resources/pipeline-defaults/defaults-producer-app.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults-producer-app.yaml @@ -1,5 +1,4 @@ -# Holds configuration to use as values for the streams bootstrap producer-app Helm -# chart. +# Producer app component that configures a streams-bootstrap producer app. # # Child of: KafkaApp # More documentation on ProducerApp: https://github.com/bakdata/streams-bootstrap diff --git a/docs/docs/resources/pipeline-defaults/defaults-streams-app.yaml b/docs/docs/resources/pipeline-defaults/defaults-streams-app.yaml index 02d047dec..35b652a8f 100644 --- a/docs/docs/resources/pipeline-defaults/defaults-streams-app.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults-streams-app.yaml @@ -1,4 +1,4 @@ -# StreamsApp component that configures a streams bootstrap app. +# Streams app component that configures a streams bootstrap app. # # Child of: KafkaApp # More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap diff --git a/docs/docs/resources/pipeline-defaults/defaults.yaml b/docs/docs/resources/pipeline-defaults/defaults.yaml index f3ddc2724..e1e16c56c 100644 --- a/docs/docs/resources/pipeline-defaults/defaults.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults.yaml @@ -1,3 +1,61 @@ +# Consumer app component that configures a streams-bootstrap consumer app. 
+#
+# Child of: KafkaApp
+# More documentation on ConsumerApp: https://github.com/bakdata/streams-bootstrap
+consumer-app:
+ # to: # While the consumer-app does inherit from kafka-app, it does not need a
+ # `to` section, hence it does not support it.
+ # Allowed configs:
+ # https://github.com/bakdata/streams-bootstrap/tree/master/charts/consumer-app
+ values: # required
+ kafka: # required, consumer-app-specific
+ bootstrapServers: ${config.kafka_brokers}
+ schemaRegistryUrl: ${config.schema_registry.url}
+ groupId: consumer-group-id
+ inputTopics:
+ - topic1
+ - topic2
+ inputPattern: input-pattern
+ labeledInputTopics:
+ input_role1:
+ - input_topic1
+ - input_topic2
+ input_role2:
+ - input_topic3
+ - input_topic4
+ labeledInputPatterns:
+ pattern_role1: input_pattern1
+ config:
+ my.consumer.config: my.value
+ nameOverride: override-with-this-name # kafka-app-specific
+ fullnameOverride: override-with-this-name # kafka-app-specific
+ autoscaling: # consumer-app-specific
+ enabled: false # Whether to enable auto-scaling using KEDA.
+ lagThreshold: 0 # Average target value to trigger scaling actions.
+ # This is the interval to check each trigger on.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
+ pollingInterval: 30
+ # The period to wait after the last trigger reported active before scaling
+ # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
+ cooldownPeriod: 300
+ # The offset reset policy for the consumer if the consumer group is
+ # not yet subscribed to a partition.
+ offsetResetPolicy: earliest
+ # This setting is passed to the HPA definition that KEDA will create for a
+ # given resource and holds the maximum number of replicas of the target resource.
+ # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
+ maxReplicas: 1
+ # Minimum number of replicas KEDA will scale the resource down to.
+ # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount + minReplicas: 0 + # If this property is set, KEDA will scale the resource down to this + # number of replicas. + # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount + idleReplicas: 0 + topics: # List of topics used by the consumer app. + - topic1 + - topic2 + additionalTriggers: [] # List of additional KEDA triggers. # Kubernetes app managed through Helm with an associated Helm chart # # Parent of: KafkaApp @@ -184,8 +242,7 @@ kubernetes-app: image: exampleImage # Example debug: false # Example commandLine: {} # Example -# Holds configuration to use as values for the streams bootstrap producer-app Helm -# chart. +# Producer app component that configures a streams-bootstrap producer app. # # Child of: KafkaApp # More documentation on ProducerApp: https://github.com/bakdata/streams-bootstrap @@ -204,7 +261,7 @@ producer-app: output_role2: output_topic2 nameOverride: override-with-this-name # kafka-app-specific fullnameOverride: override-with-this-name # kafka-app-specific -# StreamsApp component that configures a streams bootstrap app. +# Streams app component that configures a streams bootstrap app. # # Child of: KafkaApp # More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap diff --git a/docs/docs/resources/pipeline-defaults/headers/defaults-consumer-app.yaml b/docs/docs/resources/pipeline-defaults/headers/defaults-consumer-app.yaml new file mode 100644 index 000000000..606cc6b60 --- /dev/null +++ b/docs/docs/resources/pipeline-defaults/headers/defaults-consumer-app.yaml @@ -0,0 +1,5 @@ +# Consumer app component that configures a streams-bootstrap consumer app. 
+# +# Child of: KafkaApp +# More documentation on ConsumerApp: https://github.com/bakdata/streams-bootstrap +consumer-app: diff --git a/docs/docs/resources/pipeline-defaults/headers/defaults-producer-app.yaml b/docs/docs/resources/pipeline-defaults/headers/defaults-producer-app.yaml index f6480d5b9..99b48c0c8 100644 --- a/docs/docs/resources/pipeline-defaults/headers/defaults-producer-app.yaml +++ b/docs/docs/resources/pipeline-defaults/headers/defaults-producer-app.yaml @@ -1,5 +1,4 @@ -# Holds configuration to use as values for the streams bootstrap producer-app Helm -# chart. +# Producer app component that configures a streams-bootstrap producer app. # # Child of: KafkaApp # More documentation on ProducerApp: https://github.com/bakdata/streams-bootstrap diff --git a/docs/docs/resources/pipeline-defaults/headers/defaults-streams-app.yaml b/docs/docs/resources/pipeline-defaults/headers/defaults-streams-app.yaml index e7692e826..ab9205c8f 100644 --- a/docs/docs/resources/pipeline-defaults/headers/defaults-streams-app.yaml +++ b/docs/docs/resources/pipeline-defaults/headers/defaults-streams-app.yaml @@ -1,4 +1,4 @@ -# StreamsApp component that configures a streams bootstrap app. +# Streams app component that configures a streams bootstrap app. 
# # Child of: KafkaApp # More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap diff --git a/docs/docs/schema/defaults.json b/docs/docs/schema/defaults.json index c1dc4dbea..e18ff92d6 100644 --- a/docs/docs/schema/defaults.json +++ b/docs/docs/schema/defaults.json @@ -51,6 +51,666 @@ "title": "ConnectorNewState", "type": "string" }, + "ConsumerApp": { + "additionalProperties": true, + "description": "StreamsApp component that configures a streams-bootstrap app.", + "properties": { + "diff_config": { + "$ref": "#/$defs/HelmDiffConfig", + "default": { + "ignore": null + }, + "description": "Helm diff config" + }, + "enabled": { + "default": true, + "description": "Whether the component is enabled and should be included in the pipeline", + "title": "Enabled", + "type": "boolean" + }, + "from": { + "anyOf": [ + { + "$ref": "#/$defs/FromSection" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Topic(s) and/or components from which the component will read input", + "title": "From" + }, + "name": { + "description": "Component name", + "title": "Name", + "type": "string" + }, + "namespace": { + "description": "Kubernetes namespace in which the component shall be deployed", + "title": "Namespace", + "type": "string" + }, + "prefix": { + "default": "${pipeline.name}-", + "description": "Pipeline prefix that will prefix every component name. 
If you wish to not have any prefix you can specify an empty string.", + "title": "Prefix", + "type": "string" + }, + "repo_config": { + "$ref": "#/$defs/HelmRepoConfig", + "default": { + "repo_auth_flags": { + "ca_file": null, + "cert_file": null, + "insecure_skip_tls_verify": false, + "password": null, + "username": null + }, + "repository_name": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/" + }, + "description": "Configuration of the Helm chart repo to be used for deploying the component" + }, + "to": { + "default": null, + "title": "To", + "type": "null" + }, + "type": { + "const": "consumer-app", + "title": "Type", + "type": "string" + }, + "values": { + "$ref": "#/$defs/ConsumerAppValues", + "description": "streams-bootstrap Helm values" + }, + "version": { + "default": "3.6.1", + "description": "Helm chart version", + "pattern": "^(\\d+)\\.(\\d+)\\.(\\d+)(-[a-zA-Z]+(\\.[a-zA-Z]+)?)?$", + "title": "Version", + "type": "string" + } + }, + "required": [ + "name", + "namespace", + "values", + "type" + ], + "title": "ConsumerApp", + "type": "object" + }, + "ConsumerAppValues": { + "additionalProperties": true, + "description": "consumer-app configurations.\nThe attributes correspond to keys and values that are used as values for the streams bootstrap helm chart.", + "properties": { + "affinity": { + "anyOf": [ + { + "$ref": "#/$defs/Affinity" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Map to configure pod affinities https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity." 
+ }, + "autoscaling": { + "anyOf": [ + { + "$ref": "#/$defs/kpops__components__streams_bootstrap__common__model__StreamsAppAutoScaling" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Kubernetes event-driven autoscaling config" + }, + "commandLine": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Map of command line arguments passed to the streams app.", + "title": "Commandline" + }, + "configurationEnvPrefix": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Prefix for environment variables to use that should be parsed as command line arguments.", + "title": "Configurationenvprefix" + }, + "env": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Custom environment variables.", + "title": "Env" + }, + "files": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Map of files to mount for the app. File will be mounted as $value.mountPath/$key. $value.content denotes file content (recommended to be used with --set-file).", + "title": "Files" + }, + "fullnameOverride": { + "anyOf": [ + { + "maxLength": 63, + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Helm chart fullname override, assigned automatically", + "title": "FullnameOverride" + }, + "image": { + "description": "Docker image of the Kafka producer app.", + "title": "Image", + "type": "string" + }, + "imagePullPolicy": { + "anyOf": [ + { + "$ref": "#/$defs/ImagePullPolicy" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Docker image pull policy." 
+ }, + "imagePullSecrets": { + "anyOf": [ + { + "items": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": [], + "description": "Secrets to be used for private registries.", + "title": "Imagepullsecrets" + }, + "imageTag": { + "anyOf": [ + { + "pattern": "^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$", + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Docker image tag of the streams-bootstrap app.", + "title": "Imagetag" + }, + "javaOptions": { + "anyOf": [ + { + "$ref": "#/$defs/JavaOptions" + }, + { + "type": "null" + } + ], + "default": null + }, + "jmx": { + "anyOf": [ + { + "$ref": "#/$defs/JMXConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration for JMX Exporter." + }, + "kafka": { + "$ref": "#/$defs/ConsumerConfig", + "default": { + "bootstrapServers": "${config.kafka_brokers}", + "config": null, + "groupId": null, + "inputPattern": null, + "inputTopics": null, + "labeledInputPatterns": null, + "labeledInputTopics": null, + "labeledOutputTopics": null, + "outputTopic": null, + "schemaRegistryUrl": null + }, + "description": "consumer-app kafka section" + }, + "livenessProbe": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Livenessprobe" + }, + "nameOverride": { + "anyOf": [ + { + "maxLength": 63, + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Helm chart name override, assigned automatically", + "title": "NameOverride" + }, + "persistence": { + "anyOf": [ + { + "$ref": "#/$defs/PersistenceConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration for persistent volume to store the state of the consumer app." 
+ }, + "podAnnotations": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Map of custom annotations to attach to the pod spec.", + "title": "Podannotations" + }, + "podLabels": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Map of custom labels to attach to the pod spec.", + "title": "Podlabels" + }, + "ports": { + "anyOf": [ + { + "items": { + "$ref": "#/$defs/PortConfig" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": [], + "title": "Ports" + }, + "prometheus": { + "anyOf": [ + { + "$ref": "#/$defs/PrometheusExporterConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration for Prometheus JMX Exporter." + }, + "readinessProbe": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Readinessprobe" + }, + "resources": { + "anyOf": [ + { + "$ref": "#/$defs/Resources" + }, + { + "type": "null" + } + ], + "default": null, + "description": "See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/" + }, + "secretFilesRefs": { + "anyOf": [ + { + "items": { + "type": "object" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": [], + "description": "Mount existing secrets as volumes", + "title": "Secretfilesrefs" + }, + "secretRefs": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Inject existing secrets as environment variables. Map key is used as environment variable name. 
Value consists of secret name and key.", + "title": "Secretrefs" + }, + "secrets": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Custom secret environment variables. Prefix with configurationEnvPrefix in order to pass secrets to command line or prefix with KAFKA_ to pass secrets to Kafka Streams configuration.", + "title": "Secrets" + }, + "service": { + "anyOf": [ + { + "$ref": "#/$defs/ServiceConfig" + }, + { + "type": "null" + } + ], + "default": null + }, + "statefulSet": { + "default": false, + "description": "Whether to use a StatefulSet instead of a Deployment to deploy the consumer app.", + "title": "Statefulset", + "type": "boolean" + }, + "terminationGracePeriodSeconds": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Delay for graceful application shutdown in seconds: https://pracucci.com/graceful-shutdown-of-kubernetes-pods.html", + "title": "Terminationgraceperiodseconds" + }, + "tolerations": { + "anyOf": [ + { + "items": { + "$ref": "#/$defs/Toleration" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": [], + "description": "Array containing taint references. 
When defined, pods can run on nodes, which would otherwise deny scheduling.", + "title": "Tolerations" + } + }, + "required": [ + "image" + ], + "title": "ConsumerAppValues", + "type": "object" + }, + "ConsumerConfig": { + "additionalProperties": true, + "description": "consumer app kafka section.", + "properties": { + "bootstrapServers": { + "default": "${config.kafka_brokers}", + "description": "Brokers", + "title": "Bootstrapservers", + "type": "string" + }, + "config": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Configuration", + "title": "Config" + }, + "groupId": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Unique consumer group ID for Kafka Streams. Required for auto-scaling.", + "title": "Unique consumer group ID" + }, + "inputPattern": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Input pattern", + "title": "Inputpattern" + }, + "inputTopics": { + "anyOf": [ + { + "items": { + "type": "string" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": [], + "description": "Input topics", + "title": "Inputtopics" + }, + "labeledInputPatterns": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Extra input patterns", + "title": "Labeledinputpatterns" + }, + "labeledInputTopics": { + "anyOf": [ + { + "additionalProperties": { + "items": { + "type": "string" + }, + "type": "array" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Extra input topics", + "title": "Labeledinputtopics" + }, + "labeledOutputTopics": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Extra output topics", + "title": 
"Labeledoutputtopics" + }, + "outputTopic": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Output topic" + }, + "schemaRegistryUrl": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "URL of the Schema Registry", + "title": "Schema Registry URL" + } + }, + "title": "ConsumerConfig", + "type": "object" + }, "Effects": { "enum": [ "NoExecute", @@ -3152,7 +3812,7 @@ "autoscaling": { "anyOf": [ { - "$ref": "#/$defs/kpops__components__streams_bootstrap__streams__model__StreamsAppAutoScaling" + "$ref": "#/$defs/kpops__components__streams_bootstrap__common__model__StreamsAppAutoScaling" }, { "type": "null" @@ -4374,7 +5034,7 @@ "title": "WeightedPodAffinityTerm", "type": "object" }, - "kpops__components__streams_bootstrap__streams__model__StreamsAppAutoScaling": { + "kpops__components__streams_bootstrap__common__model__StreamsAppAutoScaling": { "additionalProperties": true, "description": "Kubernetes Event-driven Autoscaling config.", "properties": { @@ -4915,6 +5575,9 @@ } }, "properties": { + "consumer-app": { + "$ref": "#/$defs/ConsumerApp" + }, "helm-app": { "$ref": "#/$defs/HelmApp" }, @@ -4963,6 +5626,7 @@ "kafka-source-connector", "kubernetes-app", "pipeline-component", + "consumer-app", "producer-app", "streams-app", "streams-bootstrap", diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 77fd51ce2..779d3131c 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -51,6 +51,666 @@ "title": "ConnectorNewState", "type": "string" }, + "ConsumerApp": { + "additionalProperties": true, + "description": "StreamsApp component that configures a streams-bootstrap app.", + "properties": { + "diff_config": { + "$ref": "#/$defs/HelmDiffConfig", + "default": { + "ignore": null + }, + "description": "Helm diff config" + }, + "enabled": { + "default": true, + "description": "Whether the component is 
enabled and should be included in the pipeline", + "title": "Enabled", + "type": "boolean" + }, + "from": { + "anyOf": [ + { + "$ref": "#/$defs/FromSection" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Topic(s) and/or components from which the component will read input", + "title": "From" + }, + "name": { + "description": "Component name", + "title": "Name", + "type": "string" + }, + "namespace": { + "description": "Kubernetes namespace in which the component shall be deployed", + "title": "Namespace", + "type": "string" + }, + "prefix": { + "default": "${pipeline.name}-", + "description": "Pipeline prefix that will prefix every component name. If you wish to not have any prefix you can specify an empty string.", + "title": "Prefix", + "type": "string" + }, + "repo_config": { + "$ref": "#/$defs/HelmRepoConfig", + "default": { + "repo_auth_flags": { + "ca_file": null, + "cert_file": null, + "insecure_skip_tls_verify": false, + "password": null, + "username": null + }, + "repository_name": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/" + }, + "description": "Configuration of the Helm chart repo to be used for deploying the component" + }, + "to": { + "default": null, + "title": "To", + "type": "null" + }, + "type": { + "const": "consumer-app", + "title": "Type", + "type": "string" + }, + "values": { + "$ref": "#/$defs/ConsumerAppValues", + "description": "streams-bootstrap Helm values" + }, + "version": { + "default": "3.6.1", + "description": "Helm chart version", + "pattern": "^(\\d+)\\.(\\d+)\\.(\\d+)(-[a-zA-Z]+(\\.[a-zA-Z]+)?)?$", + "title": "Version", + "type": "string" + } + }, + "required": [ + "name", + "namespace", + "values", + "type" + ], + "title": "ConsumerApp", + "type": "object" + }, + "ConsumerAppValues": { + "additionalProperties": true, + "description": "consumer-app configurations.\nThe attributes correspond to keys and values that are used as values for the streams bootstrap helm 
chart.", + "properties": { + "affinity": { + "anyOf": [ + { + "$ref": "#/$defs/Affinity" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Map to configure pod affinities https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity." + }, + "autoscaling": { + "anyOf": [ + { + "$ref": "#/$defs/kpops__components__streams_bootstrap__common__model__StreamsAppAutoScaling" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Kubernetes event-driven autoscaling config" + }, + "commandLine": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Map of command line arguments passed to the streams app.", + "title": "Commandline" + }, + "configurationEnvPrefix": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Prefix for environment variables to use that should be parsed as command line arguments.", + "title": "Configurationenvprefix" + }, + "env": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Custom environment variables.", + "title": "Env" + }, + "files": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Map of files to mount for the app. File will be mounted as $value.mountPath/$key. 
$value.content denotes file content (recommended to be used with --set-file).", + "title": "Files" + }, + "fullnameOverride": { + "anyOf": [ + { + "maxLength": 63, + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Helm chart fullname override, assigned automatically", + "title": "FullnameOverride" + }, + "image": { + "description": "Docker image of the Kafka producer app.", + "title": "Image", + "type": "string" + }, + "imagePullPolicy": { + "anyOf": [ + { + "$ref": "#/$defs/ImagePullPolicy" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Docker image pull policy." + }, + "imagePullSecrets": { + "anyOf": [ + { + "items": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": [], + "description": "Secrets to be used for private registries.", + "title": "Imagepullsecrets" + }, + "imageTag": { + "anyOf": [ + { + "pattern": "^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$", + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Docker image tag of the streams-bootstrap app.", + "title": "Imagetag" + }, + "javaOptions": { + "anyOf": [ + { + "$ref": "#/$defs/JavaOptions" + }, + { + "type": "null" + } + ], + "default": null + }, + "jmx": { + "anyOf": [ + { + "$ref": "#/$defs/JMXConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration for JMX Exporter." 
+ }, + "kafka": { + "$ref": "#/$defs/ConsumerConfig", + "default": { + "bootstrapServers": "${config.kafka_brokers}", + "config": null, + "groupId": null, + "inputPattern": null, + "inputTopics": null, + "labeledInputPatterns": null, + "labeledInputTopics": null, + "labeledOutputTopics": null, + "outputTopic": null, + "schemaRegistryUrl": null + }, + "description": "consumer-app kafka section" + }, + "livenessProbe": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Livenessprobe" + }, + "nameOverride": { + "anyOf": [ + { + "maxLength": 63, + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Helm chart name override, assigned automatically", + "title": "NameOverride" + }, + "persistence": { + "anyOf": [ + { + "$ref": "#/$defs/PersistenceConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration for persistent volume to store the state of the consumer app." + }, + "podAnnotations": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Map of custom annotations to attach to the pod spec.", + "title": "Podannotations" + }, + "podLabels": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Map of custom labels to attach to the pod spec.", + "title": "Podlabels" + }, + "ports": { + "anyOf": [ + { + "items": { + "$ref": "#/$defs/PortConfig" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": [], + "title": "Ports" + }, + "prometheus": { + "anyOf": [ + { + "$ref": "#/$defs/PrometheusExporterConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration for Prometheus JMX Exporter." 
+ }, + "readinessProbe": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Readinessprobe" + }, + "resources": { + "anyOf": [ + { + "$ref": "#/$defs/Resources" + }, + { + "type": "null" + } + ], + "default": null, + "description": "See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/" + }, + "secretFilesRefs": { + "anyOf": [ + { + "items": { + "type": "object" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": [], + "description": "Mount existing secrets as volumes", + "title": "Secretfilesrefs" + }, + "secretRefs": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret name and key.", + "title": "Secretrefs" + }, + "secrets": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Custom secret environment variables. 
Prefix with configurationEnvPrefix in order to pass secrets to command line or prefix with KAFKA_ to pass secrets to Kafka Streams configuration.", + "title": "Secrets" + }, + "service": { + "anyOf": [ + { + "$ref": "#/$defs/ServiceConfig" + }, + { + "type": "null" + } + ], + "default": null + }, + "statefulSet": { + "default": false, + "description": "Whether to use a StatefulSet instead of a Deployment to deploy the consumer app.", + "title": "Statefulset", + "type": "boolean" + }, + "terminationGracePeriodSeconds": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Delay for graceful application shutdown in seconds: https://pracucci.com/graceful-shutdown-of-kubernetes-pods.html", + "title": "Terminationgraceperiodseconds" + }, + "tolerations": { + "anyOf": [ + { + "items": { + "$ref": "#/$defs/Toleration" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": [], + "description": "Array containing taint references. When defined, pods can run on nodes, which would otherwise deny scheduling.", + "title": "Tolerations" + } + }, + "required": [ + "image" + ], + "title": "ConsumerAppValues", + "type": "object" + }, + "ConsumerConfig": { + "additionalProperties": true, + "description": "consumer app kafka section.", + "properties": { + "bootstrapServers": { + "default": "${config.kafka_brokers}", + "description": "Brokers", + "title": "Bootstrapservers", + "type": "string" + }, + "config": { + "anyOf": [ + { + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Configuration", + "title": "Config" + }, + "groupId": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Unique consumer group ID for Kafka Streams. 
Required for auto-scaling.", + "title": "Unique consumer group ID" + }, + "inputPattern": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Input pattern", + "title": "Inputpattern" + }, + "inputTopics": { + "anyOf": [ + { + "items": { + "type": "string" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": [], + "description": "Input topics", + "title": "Inputtopics" + }, + "labeledInputPatterns": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Extra input patterns", + "title": "Labeledinputpatterns" + }, + "labeledInputTopics": { + "anyOf": [ + { + "additionalProperties": { + "items": { + "type": "string" + }, + "type": "array" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Extra input topics", + "title": "Labeledinputtopics" + }, + "labeledOutputTopics": { + "anyOf": [ + { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + { + "type": "null" + } + ], + "default": {}, + "description": "Extra output topics", + "title": "Labeledoutputtopics" + }, + "outputTopic": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Output topic" + }, + "schemaRegistryUrl": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "URL of the Schema Registry", + "title": "Schema Registry URL" + } + }, + "title": "ConsumerConfig", + "type": "object" + }, "Effects": { "enum": [ "NoExecute", @@ -2783,7 +3443,7 @@ "autoscaling": { "anyOf": [ { - "$ref": "#/$defs/kpops__components__streams_bootstrap__streams__model__StreamsAppAutoScaling" + "$ref": "#/$defs/kpops__components__streams_bootstrap__common__model__StreamsAppAutoScaling" }, { "type": "null" @@ -3390,7 +4050,7 @@ "title": "WeightedPodAffinityTerm", "type": 
"object" }, - "kpops__components__streams_bootstrap__streams__model__StreamsAppAutoScaling": { + "kpops__components__streams_bootstrap__common__model__StreamsAppAutoScaling": { "additionalProperties": true, "description": "Kubernetes Event-driven Autoscaling config.", "properties": { @@ -3933,6 +4593,7 @@ "items": { "discriminator": { "mapping": { + "consumer-app": "#/$defs/ConsumerApp", "helm-app": "#/$defs/HelmApp", "kafka-sink-connector": "#/$defs/KafkaSinkConnector", "kafka-source-connector": "#/$defs/KafkaSourceConnector", @@ -3953,6 +4614,9 @@ { "$ref": "#/$defs/KafkaSourceConnector" }, + { + "$ref": "#/$defs/ConsumerApp" + }, { "$ref": "#/$defs/ProducerApp" }, diff --git a/docs/docs/user/core-concepts/components/consumer-app.md b/docs/docs/user/core-concepts/components/consumer-app.md new file mode 100644 index 000000000..ad43254a5 --- /dev/null +++ b/docs/docs/user/core-concepts/components/consumer-app.md @@ -0,0 +1,43 @@ +# ConsumerApp + +Subclass of [_StreamsBootstrap_](streams-bootstrap.md). + +### Usage + +Configures a +[streams-bootstrap](https://github.com/bakdata/streams-bootstrap){target=_blank} +[Kafka consumer app](https://github.com/bakdata/streams-bootstrap#kafka-consumer){target=_blank} + +### Configuration + + + +??? example "`pipeline.yaml`" + + ```yaml + --8<-- + ./docs/resources/pipeline-components/consumer-app.yaml + --8<-- + ``` + + + +### Operations + +#### deploy + +Identical to [StreamsBootstrap's `deploy`](streams-bootstrap.md#deploy). The consumer app has no `to` section, so no topics are created and no schemas are submitted. + +#### destroy + +Uninstall Helm release. 
+ +#### reset + +- Delete the consumer group offsets + +#### clean + +Similar to [`reset`](#reset) with an additional step: + +- Delete persistent volume claims if `statefulSet` is enabled and `persistence` is enabled diff --git a/docs/docs/user/core-concepts/components/helm-app.md b/docs/docs/user/core-concepts/components/helm-app.md index 4a7af609b..c878fa04a 100644 --- a/docs/docs/user/core-concepts/components/helm-app.md +++ b/docs/docs/user/core-concepts/components/helm-app.md @@ -22,11 +22,11 @@ Can be used to deploy any app in Kubernetes using Helm, for example, a REST serv #### deploy -Deploy using Helm. +Deploy using Helm. #### destroy -Uninstall Helm release. +Uninstall Helm release. #### reset diff --git a/docs/docs/user/core-concepts/components/producer-app.md b/docs/docs/user/core-concepts/components/producer-app.md index f9a0f5088..d50da06ee 100644 --- a/docs/docs/user/core-concepts/components/producer-app.md +++ b/docs/docs/user/core-concepts/components/producer-app.md @@ -24,7 +24,7 @@ Configures a [streams-bootstrap](https://github.com/bakdata/streams-bootstrap){t #### deploy -In addition to [KubernetesApp's `deploy`](kubernetes-app.md#deploy): +In addition to [StreamsBootstrap's `deploy`](streams-bootstrap.md#deploy): - Create topics if provided (optional) - Submit Avro schemas to the registry if provided (optional) diff --git a/docs/docs/user/core-concepts/components/streams-app.md b/docs/docs/user/core-concepts/components/streams-app.md index a84e71b31..fe8b6f8c7 100644 --- a/docs/docs/user/core-concepts/components/streams-app.md +++ b/docs/docs/user/core-concepts/components/streams-app.md @@ -26,7 +26,7 @@ Configures a #### deploy -In addition to [KubernetesApp's `deploy`](kubernetes-app.md#deploy): +In addition to [StreamsBootstrap's `deploy`](streams-bootstrap.md#deploy): - Create topics if provided (optional) - Submit Avro schemas to the registry if provided (optional) diff 
--git a/docs/mkdocs.yml b/docs/mkdocs.yml index 698a5e9dd..c4e49ae63 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -115,6 +115,7 @@ nav: - StreamsBootstrap: user/core-concepts/components/streams-bootstrap.md - StreamsApp: user/core-concepts/components/streams-app.md - ProducerApp: user/core-concepts/components/producer-app.md + - ConsumerApp: user/core-concepts/components/consumer-app.md - KafkaConnector: user/core-concepts/components/kafka-connector.md - KafkaSinkConnector: user/core-concepts/components/kafka-sink-connector.md - KafkaSourceConnector: user/core-concepts/components/kafka-source-connector.md diff --git a/kpops/components/common/app_type.py b/kpops/components/common/app_type.py index 982ad07fa..54445b258 100644 --- a/kpops/components/common/app_type.py +++ b/kpops/components/common/app_type.py @@ -4,5 +4,7 @@ class AppType(Enum): STREAMS_APP = "streams-app" PRODUCER_APP = "producer-app" + CONSUMER_APP = "consumer-app" CLEANUP_STREAMS_APP = "streams-app-cleanup-job" CLEANUP_PRODUCER_APP = "producer-app-cleanup-job" + CLEANUP_CONSUMER_APP = "consumer-app-cleanup-job" diff --git a/kpops/components/streams_bootstrap/__init__.py b/kpops/components/streams_bootstrap/__init__.py index 32f40e540..49e1da54c 100644 --- a/kpops/components/streams_bootstrap/__init__.py +++ b/kpops/components/streams_bootstrap/__init__.py @@ -1,6 +1,7 @@ from kpops.components.streams_bootstrap.base import StreamsBootstrap +from .consumer.consumer_app import ConsumerApp from .producer.producer_app import ProducerApp from .streams.streams_app import StreamsApp -__all__ = ("ProducerApp", "StreamsApp", "StreamsBootstrap") +__all__ = ("ConsumerApp", "ProducerApp", "StreamsApp", "StreamsBootstrap") diff --git a/kpops/components/streams_bootstrap/common/__init__.py b/kpops/components/streams_bootstrap/common/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kpops/components/streams_bootstrap/common/model.py 
b/kpops/components/streams_bootstrap/common/model.py new file mode 100644 index 000000000..111c8e581 --- /dev/null +++ b/kpops/components/streams_bootstrap/common/model.py @@ -0,0 +1,189 @@ +from __future__ import annotations + +from enum import StrEnum +from typing import ClassVar, Self + +import pydantic +from pydantic import ConfigDict, Field + +from kpops.components.common.kubernetes_model import ImagePullPolicy, Resources +from kpops.components.common.topic import KafkaTopic +from kpops.utils.pydantic import ( + CamelCaseConfigModel, + DescConfigModel, + SerializeAsOptional, + SerializeAsOptionalModel, +) + + +def serialize_topics(topics: list[KafkaTopic]) -> list[str]: + return [topic.name for topic in topics] + + +def serialize_labeled_input_topics( + labeled_input_topics: dict[str, list[KafkaTopic]], +) -> dict[str, list[str]]: + return { + label: serialize_topics(topics) + for label, topics in labeled_input_topics.items() + } + + +class JmxRuleType(StrEnum): + GAUGE = "GAUGE" + COUNTER = "COUNTER" + UNTYPED = "UNTYPED" + + +class JMXRule(SerializeAsOptionalModel, CamelCaseConfigModel, DescConfigModel): + """JMX rule. + + :param pattern: Regex pattern to match against each bean attribute. The pattern is not anchored. Capture groups can be used in other options. Defaults to matching everything. + :param name: The metric name to set. Capture groups from the pattern can be used. If not specified, the default format will be used. If it evaluates to empty, processing of this attribute stops with no output. An Additional suffix may be added to this name (e.g _total for type COUNTER) + :param value: Value for the metric. Static values and capture groups from the pattern can be used. If not specified the scraped mBean value will be used. + :param value_factor: Optional number that value (or the scraped mBean value if value is not specified) is multiplied by, mainly used to convert mBean values from milliseconds to seconds. + :param help: Help text for the metric. 
Capture groups from pattern can be used. name must be set to use this. Defaults to the mBean attribute description, domain, and name of the attribute. + :param attr_name_snake_case: Converts the attribute name to snake case. This is seen in the names matched by the pattern and the default format. For example, anAttrName to an_attr_name. + :param cache: Whether to cache bean name expressions to rule computation (match and mismatch). Not recommended for rules matching on bean value, as only the value from the first scrape will be cached and re-used. This can increase performance when collecting a lot of mbeans. + :param type: The type of the metric. name must be set to use this. + :param labels: A map of label name to label value pairs. Capture groups from pattern can be used in each. name must be set to use this. Empty names and values are ignored. If not specified and the default format is not being used, no labels are set. + """ + + pattern: str | None = None + name: str | None = None + value: str | bool | int | float | None = None + value_factor: float | None = None + help: str | None = None + attr_name_snake_case: bool | None = None + cache: bool | None = None + type: JmxRuleType | None = None + labels: SerializeAsOptional[dict[str, str]] = {} + + +class PrometheusExporterConfig(CamelCaseConfigModel, DescConfigModel): + """Prometheus JMX exporter configuration. + + :param jmx: The prometheus JMX exporter configuration. + + """ + + class PrometheusJMXExporterConfig( + SerializeAsOptionalModel, CamelCaseConfigModel, DescConfigModel + ): + """Prometheus JMX exporter configuration. + + :param enabled: Whether to install Prometheus JMX Exporter as a sidecar container and expose JMX metrics to Prometheus. + :param image: Docker Image for Prometheus JMX Exporter container. + :param image_tag: Docker Image Tag for Prometheus JMX Exporter container. + :param image_pull_policy: Docker Image Pull Policy for Prometheus JMX Exporter container. 
+ :param port: JMX Exporter Port which exposes metrics in Prometheus format for scraping. + :param resources: JMX Exporter resources configuration. + :param metric_rules: List of JMX metric rules. + """ + + enabled: bool | None = None + image: str | None = None + image_tag: str | None = None + image_pull_policy: ImagePullPolicy | None = None + port: int | None = None + resources: Resources | None = None + metric_rules: SerializeAsOptional[list[JMXRule]] = [] + + jmx: PrometheusJMXExporterConfig | None = None + + +class JMXConfig(CamelCaseConfigModel, DescConfigModel): + """JMX configuration options. + + :param enabled: Whether or not to open JMX port for remote access (e.g., for debugging) + :param host: The host to use for JMX remote access. + :param port: The JMX port which JMX style metrics are exposed. + """ + + enabled: bool | None = None + host: str | None = None + port: int | None = None + + +class StreamsAppAutoScaling( + SerializeAsOptionalModel, CamelCaseConfigModel, DescConfigModel +): + """Kubernetes Event-driven Autoscaling config. + + :param enabled: Whether to enable auto-scaling using KEDA., defaults to False + :param lag_threshold: Average target value to trigger scaling actions. + Mandatory to set when auto-scaling is enabled. + :param polling_interval: This is the interval to check each trigger on. + https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval, + defaults to 30 + :param cooldown_period: The period to wait after the last trigger reported + active before scaling the resource back to 0. + https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod, + defaults to 300 + :param offset_reset_policy: The offset reset policy for the consumer if the + consumer group is not yet subscribed to a partition., + defaults to "earliest" + :param min_replicas: Minimum number of replicas KEDA will scale the resource down to. 
+ "https://keda.sh/docs/2.9/concepts/scaling-deployments/#minreplicacount", + defaults to 0 + :param max_replicas: This setting is passed to the HPA definition that KEDA + will create for a given resource and holds the maximum number of replicas + of the target resouce. + https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount, + defaults to 1 + :param idle_replicas: If this property is set, KEDA will scale the resource + down to this number of replicas. + https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount, + defaults to None + :param internal_topics: List of auto-generated Kafka Streams topics used by the streams app, defaults to [] + :param topics: List of topics used by the streams app, defaults to [] + :param additional_triggers: List of additional KEDA triggers, + see https://keda.sh/docs/latest/scalers/, + defaults to [] + """ + + enabled: bool = False + lag_threshold: int | None = None + polling_interval: int | None = None + cooldown_period: int | None = None + offset_reset_policy: str | None = None + min_replicas: int | None = Field( + default=None, + title="Min replica count", + ) + max_replicas: int | None = Field( + default=None, + title="Max replica count", + ) + idle_replicas: int | None = Field( + default=None, + title="Idle replica count", + ) + internal_topics: SerializeAsOptional[list[str]] = [] + topics: SerializeAsOptional[list[str]] = [] + additional_triggers: SerializeAsOptional[list[str]] = [] + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") + + +class PersistenceConfig(CamelCaseConfigModel, DescConfigModel): + """streams-bootstrap persistence configurations. + + :param enabled: Whether to use a persistent volume to store the state of the streams app. + :param size: The size of the PersistentVolume to allocate to each streams pod in the StatefulSet. + :param storage_class: Storage class to use for the persistent volume. 
+ """ + + enabled: bool = False + size: str | None = None + storage_class: str | None = None + + @pydantic.model_validator(mode="after") + def validate_mandatory_fields_are_set(self) -> Self: + if self.enabled and self.size is None: + msg = ( + "If app.persistence.enabled is set to true, " + "the field app.persistence.size needs to be set." + ) + raise ValueError(msg) + return self diff --git a/kpops/components/streams_bootstrap/consumer/__init__.py b/kpops/components/streams_bootstrap/consumer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kpops/components/streams_bootstrap/consumer/consumer_app.py b/kpops/components/streams_bootstrap/consumer/consumer_app.py new file mode 100644 index 000000000..81e8874da --- /dev/null +++ b/kpops/components/streams_bootstrap/consumer/consumer_app.py @@ -0,0 +1,180 @@ +import logging +from functools import cached_property + +from pydantic import Field, ValidationError +from typing_extensions import override + +from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler +from kpops.components.base_components.helm_app import HelmApp +from kpops.components.common.app_type import AppType +from kpops.components.common.topic import KafkaTopic +from kpops.components.streams_bootstrap.base import ( + StreamsBootstrap, + StreamsBootstrapCleaner, +) +from kpops.components.streams_bootstrap.consumer.model import ConsumerAppValues +from kpops.config import get_config +from kpops.const.file_type import DEFAULTS_YAML, PIPELINE_YAML +from kpops.core.operation import OperationMode +from kpops.manifests.argo import ArgoHook, enrich_annotations +from kpops.manifests.kubernetes import KubernetesManifest + +log = logging.getLogger("ConsumerApp") + + +class ConsumerAppCleaner(StreamsBootstrapCleaner, StreamsBootstrap): + values: ConsumerAppValues # pyright: ignore[reportIncompatibleVariableOverride] + + @property + @override + def helm_chart(self) -> str: + return ( + 
            f"{self.repo_config.repository_name}/{AppType.CLEANUP_CONSUMER_APP.value}"
+        )
+
+    @override
+    async def reset(self, dry_run: bool) -> None:
+        await super().clean(dry_run)
+
+    @override
+    async def clean(self, dry_run: bool) -> None:
+        await super().clean(dry_run)
+
+        if (
+            self.values.stateful_set
+            and self.values.persistence
+            and self.values.persistence.enabled
+        ):
+            await self.clean_pvcs(dry_run)
+
+    @override
+    def manifest_deploy(self) -> tuple[KubernetesManifest, ...]:
+        values = self.to_helm_values()
+        if get_config().operation_mode is OperationMode.ARGO:
+            post_delete = ArgoHook.POST_DELETE
+            values = enrich_annotations(values, post_delete.key, post_delete.value)
+        return self._helm.template(
+            self.helm_release_name,
+            self.helm_chart,
+            self.namespace,
+            values,
+            self.template_flags,
+        )
+
+    @override
+    def manifest_reset(self) -> tuple[KubernetesManifest, ...]:
+        values = self.to_helm_values()
+
+        return self._helm.template(
+            self.helm_release_name,
+            self.helm_chart,
+            self.namespace,
+            values,
+            self.template_flags,
+        )
+
+    async def clean_pvcs(self, dry_run: bool) -> None:
+        app_full_name = super(HelmApp, self).full_name
+        pvc_handler = PVCHandler(app_full_name, self.namespace)
+        await pvc_handler.delete_pvcs(dry_run)
+
+
+class ConsumerApp(StreamsBootstrap):
+    """ConsumerApp component that configures a streams-bootstrap consumer app.
+ + :param values: streams-bootstrap Helm values + """ + + values: ConsumerAppValues # pyright: ignore[reportIncompatibleVariableOverride] + to: None = Field( # pyright: ignore[reportIncompatibleVariableOverride] + default=None, + alias="to", + title="To", + ) + + @cached_property + def _cleaner(self) -> ConsumerAppCleaner: + return ConsumerAppCleaner.from_parent(self) + + @property + @override + def input_topics(self) -> list[KafkaTopic]: + return self.values.kafka.input_topics + + @property + @override + def extra_input_topics(self) -> dict[str, list[KafkaTopic]]: + return self.values.kafka.labeled_input_topics + + @override + def add_input_topics(self, topics: list[KafkaTopic]) -> None: + self.values.kafka.add_input_topics(topics) + + @override + def add_extra_input_topics(self, label: str, topics: list[KafkaTopic]) -> None: + self.values.kafka.add_labeled_input_topics(label, topics) + + @override + def set_input_pattern(self, name: str) -> None: + self.values.kafka.input_pattern = name + + @override + def add_extra_input_pattern(self, label: str, topic: str) -> None: + self.values.kafka.labeled_input_patterns[label] = topic + + @property + @override + def helm_chart(self) -> str: + return f"{self.repo_config.repository_name}/{AppType.CONSUMER_APP.value}" + + @override + async def destroy(self, dry_run: bool) -> None: + cluster_values = await self._helm.get_values( + self.namespace, self.helm_release_name + ) + if cluster_values: + log.debug("Fetched Helm chart values from cluster") + name_override = self._cleaner.helm_name_override + try: + self._cleaner.values = self.values.model_validate(cluster_values) + self._cleaner.values.name_override = name_override + self._cleaner.values.fullname_override = name_override + except ValidationError as validation_error: + warning_msg = f"The values in the cluster are invalid with the current model. 
Falling back to the enriched values of {PIPELINE_YAML} and {DEFAULTS_YAML}" + log.warning(warning_msg) + debug_msg = f"Cluster values: {cluster_values}" + log.debug(debug_msg) + debug_msg = f"Validation error: {validation_error}" + log.debug(debug_msg) + + await super().destroy(dry_run) + + @override + async def reset(self, dry_run: bool) -> None: + """Destroy and reset.""" + await super().reset(dry_run) + await self._cleaner.reset(dry_run) + + @override + async def clean(self, dry_run: bool) -> None: + """Destroy and clean.""" + await super().clean(dry_run) + await self._cleaner.clean(dry_run) + + @override + def manifest_deploy(self) -> tuple[KubernetesManifest, ...]: + manifests = super().manifest_deploy() + if get_config().operation_mode is OperationMode.ARGO: + manifests = manifests + self._cleaner.manifest_deploy() + + return manifests + + @override + def manifest_reset(self) -> tuple[KubernetesManifest, ...]: + return self._cleaner.manifest_reset() + + @override + def manifest_clean(self) -> tuple[KubernetesManifest, ...]: + if get_config().operation_mode is OperationMode.MANIFEST: + return self._cleaner.manifest_deploy() + return () diff --git a/kpops/components/streams_bootstrap/consumer/model.py b/kpops/components/streams_bootstrap/consumer/model.py new file mode 100644 index 000000000..ce87074e7 --- /dev/null +++ b/kpops/components/streams_bootstrap/consumer/model.py @@ -0,0 +1,114 @@ +from typing import Annotated, Any, ClassVar + +import pydantic +from pydantic import ConfigDict, Field + +from kpops.components.common.topic import KafkaTopic, KafkaTopicStr +from kpops.components.streams_bootstrap.common.model import ( + JMXConfig, + PersistenceConfig, + PrometheusExporterConfig, + StreamsAppAutoScaling, + serialize_labeled_input_topics, + serialize_topics, +) +from kpops.components.streams_bootstrap.model import KafkaConfig, StreamsBootstrapValues +from kpops.utils.pydantic import ( + SerializeAsOptional, +) + + +class ConsumerConfig(KafkaConfig): + 
"""consumer app kafka section. + + :param group_id: Unique consumer group ID for Kafka Streams. Required for auto-scaling. + :param input_topics: Input topics, defaults to [] + :param input_pattern: Input pattern, defaults to None + :param labeled_input_topics: Extra input topics, defaults to {} + :param labeled_input_patterns: Extra input patterns, defaults to {} + :param config: Configuration, defaults to {} + """ + + group_id: str | None = Field(default=None, title="Unique consumer group ID") + input_topics: SerializeAsOptional[ + Annotated[ + list[KafkaTopicStr], + pydantic.PlainSerializer(serialize_topics), + ] + ] = [] + input_pattern: str | None = None + labeled_input_topics: SerializeAsOptional[ + Annotated[ + dict[str, list[KafkaTopicStr]], + pydantic.PlainSerializer(serialize_labeled_input_topics), + ] + ] = {} + labeled_input_patterns: SerializeAsOptional[dict[str, str]] = {} + config: SerializeAsOptional[dict[str, Any]] = {} + + @pydantic.field_validator("input_topics", mode="before") + @classmethod + def deserialize_input_topics( + cls, input_topics: list[str] | Any + ) -> list[KafkaTopic] | Any: + if isinstance(input_topics, list): + return [KafkaTopic(name=topic_name) for topic_name in input_topics] + return input_topics + + @pydantic.field_validator("labeled_input_topics", mode="before") + @classmethod + def deserialize_labeled_input_topics( + cls, labeled_input_topics: dict[str, list[str]] | Any + ) -> dict[str, list[KafkaTopic]] | Any: + if isinstance(labeled_input_topics, dict): + return { + label: [KafkaTopic(name=topic_name) for topic_name in topics] + for label, topics in labeled_input_topics.items() + } + return labeled_input_topics + + def add_input_topics(self, topics: list[KafkaTopic]) -> None: + """Add given topics to the list of input topics. + + Ensures no duplicate topics in the list. 
+ + :param topics: Input topics + """ + self.input_topics = KafkaTopic.deduplicate(self.input_topics + topics) + + def add_labeled_input_topics(self, label: str, topics: list[KafkaTopic]) -> None: + """Add given labeled topics that share a label to the list of extra input topics. + + Ensures no duplicate topics in the list. + + :param topics: Extra input topics + :param label: Topic label + """ + self.labeled_input_topics[label] = KafkaTopic.deduplicate( + self.labeled_input_topics.get(label, []) + topics + ) + + +class ConsumerAppValues(StreamsBootstrapValues): + """consumer-app configurations. + + The attributes correspond to keys and values that are used as values for the streams bootstrap helm chart. + + :param kafka: consumer-app kafka section + :param autoscaling: Kubernetes event-driven autoscaling config, defaults to None + :param stateful_set: Whether to use a StatefulSet instead of a Deployment to deploy the consumer app. + :param persistence: Configuration for persistent volume to store the state of the consumer app. + :param prometheus: Configuration for Prometheus JMX Exporter. + :param jmx: Configuration for JMX Exporter. 
+ :param termination_grace_period_seconds: Delay for graceful application shutdown in seconds: https://pracucci.com/graceful-shutdown-of-kubernetes-pods.html + """ + + kafka: ConsumerConfig = ConsumerConfig() # pyright: ignore[reportIncompatibleVariableOverride] + autoscaling: StreamsAppAutoScaling | None = None + stateful_set: bool = False + persistence: PersistenceConfig | None = None + prometheus: PrometheusExporterConfig | None = None + jmx: JMXConfig | None = None + termination_grace_period_seconds: int | None = None + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index fd5fe1cd1..bc0146669 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -1,41 +1,28 @@ from __future__ import annotations -from enum import StrEnum -from typing import Annotated, Any, ClassVar, Self +from typing import Annotated, Any, ClassVar import pydantic from pydantic import ConfigDict, Field -from kpops.components.common.kubernetes_model import ( - ImagePullPolicy, - Resources, -) from kpops.components.common.topic import KafkaTopic, KafkaTopicStr +from kpops.components.streams_bootstrap.common.model import ( + JMXConfig, + PersistenceConfig, + PrometheusExporterConfig, + StreamsAppAutoScaling, + serialize_labeled_input_topics, + serialize_topics, +) from kpops.components.streams_bootstrap.model import ( KafkaConfig, StreamsBootstrapValues, ) from kpops.utils.pydantic import ( - CamelCaseConfigModel, - DescConfigModel, SerializeAsOptional, - SerializeAsOptionalModel, ) -def serialize_topics(topics: list[KafkaTopic]) -> list[str]: - return [topic.name for topic in topics] - - -def serialize_labeled_input_topics( - labeled_input_topics: dict[str, list[KafkaTopic]], -) -> dict[str, list[str]]: - return { - label: serialize_topics(topics) - for label, topics in 
labeled_input_topics.items() - } - - class StreamsConfig(KafkaConfig): """streams-bootstrap kafka section. @@ -111,166 +98,6 @@ def add_labeled_input_topics(self, label: str, topics: list[KafkaTopic]) -> None ) -class StreamsAppAutoScaling( - SerializeAsOptionalModel, CamelCaseConfigModel, DescConfigModel -): - """Kubernetes Event-driven Autoscaling config. - - :param enabled: Whether to enable auto-scaling using KEDA., defaults to False - :param lag_threshold: Average target value to trigger scaling actions. - Mandatory to set when auto-scaling is enabled. - :param polling_interval: This is the interval to check each trigger on. - https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval, - defaults to 30 - :param cooldown_period: The period to wait after the last trigger reported - active before scaling the resource back to 0. - https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod, - defaults to 300 - :param offset_reset_policy: The offset reset policy for the consumer if the - consumer group is not yet subscribed to a partition., - defaults to "earliest" - :param min_replicas: Minimum number of replicas KEDA will scale the resource down to. - "https://keda.sh/docs/2.9/concepts/scaling-deployments/#minreplicacount", - defaults to 0 - :param max_replicas: This setting is passed to the HPA definition that KEDA - will create for a given resource and holds the maximum number of replicas - of the target resouce. - https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount, - defaults to 1 - :param idle_replicas: If this property is set, KEDA will scale the resource - down to this number of replicas. 
- https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount, - defaults to None - :param internal_topics: List of auto-generated Kafka Streams topics used by the streams app, defaults to [] - :param topics: List of topics used by the streams app, defaults to [] - :param additional_triggers: List of additional KEDA triggers, - see https://keda.sh/docs/latest/scalers/, - defaults to [] - """ - - enabled: bool = False - lag_threshold: int | None = None - polling_interval: int | None = None - cooldown_period: int | None = None - offset_reset_policy: str | None = None - min_replicas: int | None = Field( - default=None, - title="Min replica count", - ) - max_replicas: int | None = Field( - default=None, - title="Max replica count", - ) - idle_replicas: int | None = Field( - default=None, - title="Idle replica count", - ) - internal_topics: SerializeAsOptional[list[str]] = [] - topics: SerializeAsOptional[list[str]] = [] - additional_triggers: SerializeAsOptional[list[str]] = [] - - model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") - - -class PersistenceConfig(CamelCaseConfigModel, DescConfigModel): - """streams-bootstrap persistence configurations. - - :param enabled: Whether to use a persistent volume to store the state of the streams app. - :param size: The size of the PersistentVolume to allocate to each streams pod in the StatefulSet. - :param storage_class: Storage class to use for the persistent volume. - """ - - enabled: bool = False - size: str | None = None - storage_class: str | None = None - - @pydantic.model_validator(mode="after") - def validate_mandatory_fields_are_set(self) -> Self: - if self.enabled and self.size is None: - msg = ( - "If app.persistence.enabled is set to true, " - "the field app.persistence.size needs to be set." 
- ) - raise ValueError(msg) - return self - - -class JmxRuleType(StrEnum): - GAUGE = "GAUGE" - COUNTER = "COUNTER" - UNTYPED = "UNTYPED" - - -class JMXRule(SerializeAsOptionalModel, CamelCaseConfigModel, DescConfigModel): - """JMX rule. - - :param pattern: Regex pattern to match against each bean attribute. The pattern is not anchored. Capture groups can be used in other options. Defaults to matching everything. - :param name: The metric name to set. Capture groups from the pattern can be used. If not specified, the default format will be used. If it evaluates to empty, processing of this attribute stops with no output. An Additional suffix may be added to this name (e.g _total for type COUNTER) - :param value: Value for the metric. Static values and capture groups from the pattern can be used. If not specified the scraped mBean value will be used. - :param value_factor: Optional number that value (or the scraped mBean value if value is not specified) is multiplied by, mainly used to convert mBean values from milliseconds to seconds. - :param help: Help text for the metric. Capture groups from pattern can be used. name must be set to use this. Defaults to the mBean attribute description, domain, and name of the attribute. - :param attr_name_snake_case: Converts the attribute name to snake case. This is seen in the names matched by the pattern and the default format. For example, anAttrName to an_attr_name. - :param cache: Whether to cache bean name expressions to rule computation (match and mismatch). Not recommended for rules matching on bean value, as only the value from the first scrape will be cached and re-used. This can increase performance when collecting a lot of mbeans. - :param type: The type of the metric. name must be set to use this. - :param labels: A map of label name to label value pairs. Capture groups from pattern can be used in each. name must be set to use this. Empty names and values are ignored. 
If not specified and the default format is not being used, no labels are set. - """ - - pattern: str | None = None - name: str | None = None - value: str | bool | int | float | None = None - value_factor: float | None = None - help: str | None = None - attr_name_snake_case: bool | None = None - cache: bool | None = None - type: JmxRuleType | None = None - labels: SerializeAsOptional[dict[str, str]] = {} - - -class PrometheusExporterConfig(CamelCaseConfigModel, DescConfigModel): - """Prometheus JMX exporter configuration. - - :param jmx: The prometheus JMX exporter configuration. - - """ - - class PrometheusJMXExporterConfig( - SerializeAsOptionalModel, CamelCaseConfigModel, DescConfigModel - ): - """Prometheus JMX exporter configuration. - - :param enabled: Whether to install Prometheus JMX Exporter as a sidecar container and expose JMX metrics to Prometheus. - :param image: Docker Image for Prometheus JMX Exporter container. - :param image_tag: Docker Image Tag for Prometheus JMX Exporter container. - :param image_pull_policy: Docker Image Pull Policy for Prometheus JMX Exporter container. - :param port: JMX Exporter Port which exposes metrics in Prometheus format for scraping. - :param resources: JMX Exporter resources configuration. - :param metric_rules: List of JMX metric rules. - """ - - enabled: bool | None = None - image: str | None = None - image_tag: str | None = None - image_pull_policy: ImagePullPolicy | None = None - port: int | None = None - resources: Resources | None = None - metric_rules: SerializeAsOptional[list[JMXRule]] = [] - - jmx: PrometheusJMXExporterConfig | None = None - - -class JMXConfig(CamelCaseConfigModel, DescConfigModel): - """JMX configuration options. - - :param enabled: Whether or not to open JMX port for remote access (e.g., for debugging) - :param host: The host to use for JMX remote access. - :param port: The JMX port which JMX style metrics are exposed. 
- """ - - enabled: bool | None = None - host: str | None = None - port: int | None = None - - class StreamsAppValues(StreamsBootstrapValues): """streams-bootstrap app configurations. diff --git a/tests/api/test_registry.py b/tests/api/test_registry.py index f314694bd..2d5859ac7 100644 --- a/tests/api/test_registry.py +++ b/tests/api/test_registry.py @@ -17,6 +17,7 @@ from kpops.components.base_components.kubernetes_app import KubernetesApp from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.components.streams_bootstrap import ( + ConsumerApp, ProducerApp, StreamsApp, StreamsBootstrap, @@ -107,6 +108,7 @@ def test_registry(): "pipeline-component": PipelineComponent, "producer-app-v2": ProducerAppV2, "producer-app": ProducerApp, + "consumer-app": ConsumerApp, "streams-app-v2": StreamsAppV2, "streams-app": StreamsApp, "streams-bootstrap-v2": StreamsBootstrapV2, diff --git a/tests/components/streams_bootstrap/test_consumer_app.py b/tests/components/streams_bootstrap/test_consumer_app.py new file mode 100644 index 000000000..9ed864282 --- /dev/null +++ b/tests/components/streams_bootstrap/test_consumer_app.py @@ -0,0 +1,873 @@ +import logging +import re +from collections.abc import AsyncIterator +from pathlib import Path +from unittest.mock import ANY, MagicMock + +import pytest +from lightkube.models.core_v1 import ( + PersistentVolumeClaim, + PersistentVolumeClaimSpec, + PersistentVolumeClaimStatus, +) +from lightkube.models.meta_v1 import ObjectMeta +from pydantic import ValidationError +from pytest_mock import MockerFixture + +from kpops.component_handlers.helm_wrapper.helm import Helm +from kpops.component_handlers.helm_wrapper.model import ( + HelmUpgradeInstallFlags, +) +from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name +from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler +from kpops.components.base_components.models import TopicName +from 
kpops.components.base_components.models.to_section import ( + ToSection, +) +from kpops.components.common.topic import ( + KafkaTopic, + OutputTopicTypes, + TopicConfig, +) +from kpops.components.streams_bootstrap import ConsumerApp +from kpops.components.streams_bootstrap.consumer.consumer_app import ( + ConsumerAppCleaner, +) +from kpops.components.streams_bootstrap.streams.model import ( + PersistenceConfig, + StreamsAppAutoScaling, +) + +RESOURCES_PATH = Path(__file__).parent / "resources" + +NAMESPACE = "test-namespace" +PREFIX = "${pipeline.name}-" +CONSUMER_APP_NAME = "test-consumer-app-with-long-name-0123456789abcdefghijklmnop" +CONSUMER_APP_FULL_NAME = PREFIX + CONSUMER_APP_NAME +CONSUMER_APP_HELM_NAME_OVERRIDE = ( + PREFIX + "test-consumer-app-with-long-name-0123456-6f3a6" +) +CONSUMER_APP_RELEASE_NAME = create_helm_release_name(CONSUMER_APP_FULL_NAME) +CONSUMER_APP_CLEAN_FULL_NAME = CONSUMER_APP_FULL_NAME + "-clean" +CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE = ( + PREFIX + "test-consumer-app-with-long-name-0-7034d-clean" +) +CONSUMER_APP_CLEAN_RELEASE_NAME = create_helm_release_name( + CONSUMER_APP_CLEAN_FULL_NAME, "-clean" +) + +log = logging.getLogger("TestConsumerApp") + + +@pytest.mark.usefixtures("mock_env") +class TestConsumerApp: + def test_release_name(self): + assert CONSUMER_APP_CLEAN_RELEASE_NAME.endswith("-clean") + + @pytest.fixture() + def consumer_app(self) -> ConsumerApp: + return ConsumerApp.model_validate( + { + "name": CONSUMER_APP_NAME, + "namespace": NAMESPACE, + "values": { + "image": "consumerApp", + "kafka": {"bootstrapServers": "fake-broker:9092"}, + }, + }, + ) + + @pytest.fixture() + def stateful_consumer_app(self) -> ConsumerApp: + return ConsumerApp.model_validate( + { + "name": CONSUMER_APP_NAME, + "namespace": NAMESPACE, + "values": { + "image": "consumerApp", + "statefulSet": True, + "persistence": { + "enabled": True, + "size": "5Gi", + "storageClass": "foo", + }, + "kafka": { + "bootstrapServers": "fake-broker:9092", + }, + 
}, + }, + ) + + @pytest.fixture() + def dry_run_handler_mock(self, mocker: MockerFixture) -> MagicMock: + return mocker.patch( + "kpops.components.base_components.helm_app.DryRunHandler" + ).return_value + + @pytest.fixture(autouse=True) + def empty_helm_get_values(self, mocker: MockerFixture) -> MagicMock: + return mocker.patch.object(Helm, "get_values", return_value=None) + + def test_cleaner(self, consumer_app: ConsumerApp): + cleaner = consumer_app._cleaner + assert isinstance(cleaner, ConsumerAppCleaner) + assert not hasattr(cleaner, "_cleaner") + + def test_cleaner_inheritance(self, consumer_app: ConsumerApp): + consumer_app.values.kafka.group_id = "test-group-id" + consumer_app.values.autoscaling = StreamsAppAutoScaling( + enabled=True, + lag_threshold=100, + idle_replicas=1, + ) + assert consumer_app._cleaner.values == consumer_app.values + + def test_cleaner_helm_release_name(self, consumer_app: ConsumerApp): + assert ( + consumer_app._cleaner.helm_release_name + == "${pipeline.name}-test-consumer-app-with-l-7034d-clean" + ) + + def test_cleaner_helm_name_override(self, consumer_app: ConsumerApp): + assert ( + consumer_app._cleaner.to_helm_values()["nameOverride"] + == CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE + ) + assert ( + consumer_app._cleaner.to_helm_values()["fullnameOverride"] + == CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE + ) + + def test_set_topics(self): + consumer_app = ConsumerApp.model_validate( + { + "name": CONSUMER_APP_NAME, + "namespace": NAMESPACE, + "values": { + "image": "consumerApp", + "kafka": {"bootstrapServers": "fake-broker:9092"}, + }, + "from": { + "topics": { + "example-input": {"type": "input"}, + "b": {"type": "input"}, + "a": {"type": "input"}, + "topic-extra2": {"label": "role2"}, + "topic-extra3": {"label": "role2"}, + "topic-extra": {"label": "role1"}, + ".*": {"type": "pattern"}, + "example.*": { + "type": "pattern", + "label": "another-pattern", + }, + } + }, + }, + ) + assert consumer_app.values.kafka.input_topics == [ + 
KafkaTopic(name="example-input"), + KafkaTopic(name="b"), + KafkaTopic(name="a"), + ] + assert consumer_app.values.kafka.labeled_input_topics == { + "role1": [KafkaTopic(name="topic-extra")], + "role2": [KafkaTopic(name="topic-extra2"), KafkaTopic(name="topic-extra3")], + } + assert consumer_app.values.kafka.input_pattern == ".*" + assert consumer_app.values.kafka.labeled_input_patterns == { + "another-pattern": "example.*" + } + + helm_values = consumer_app.to_helm_values() + kafka_config = helm_values["kafka"] + assert kafka_config["inputTopics"] + assert "labeledInputTopics" in kafka_config + assert "inputPattern" in kafka_config + assert "labeledInputPatterns" in kafka_config + + def test_no_empty_input_topic(self): + consumer_app = ConsumerApp.model_validate( + { + "name": CONSUMER_APP_NAME, + "namespace": NAMESPACE, + "values": { + "image": "consumerApp", + "kafka": {"bootstrapServers": "fake-broker:9092"}, + }, + "from": { + "topics": { + ".*": {"type": "pattern"}, + } + }, + }, + ) + assert not consumer_app.values.kafka.labeled_input_topics + assert not consumer_app.values.kafka.input_topics + assert consumer_app.values.kafka.input_pattern == ".*" + assert not consumer_app.values.kafka.labeled_input_patterns + + helm_values = consumer_app.to_helm_values() + streams_config = helm_values["kafka"] + assert "inputTopics" not in streams_config + assert "extraInputTopics" not in streams_config + assert "inputPattern" in streams_config + assert "extraInputPatterns" not in streams_config + + def test_should_validate(self): + # An exception should be raised when both label and type are defined and type is input + with pytest.raises( + ValueError, match="Define label only if `type` is `pattern` or `None`" + ): + assert ConsumerApp.model_validate( + { + "name": CONSUMER_APP_NAME, + "namespace": NAMESPACE, + "values": { + "kafka": {"bootstrapServers": "fake-broker:9092"}, + }, + "from": { + "topics": { + "topic-input": { + "type": "input", + "label": "role", + } + } + 
}, + }, + ) + + def test_weave_inputs_from_prev_component(self): + consumer_app = ConsumerApp.model_validate( + { + "name": CONSUMER_APP_NAME, + "namespace": NAMESPACE, + "values": { + "image": "consumerApp", + "kafka": {"bootstrapServers": "fake-broker:9092"}, + }, + }, + ) + + consumer_app.weave_from_topics( + ToSection( + topics={ + TopicName("prev-output-topic"): TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + TopicName("b"): TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + TopicName("a"): TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + TopicName("prev-error-topic"): TopicConfig( + type=OutputTopicTypes.ERROR, partitions_count=10 + ), + } + ) + ) + + assert consumer_app.values.kafka.input_topics == [ + KafkaTopic(name="prev-output-topic"), + KafkaTopic(name="b"), + KafkaTopic(name="a"), + ] + + async def test_deploy_order_when_dry_run_is_false(self, mocker: MockerFixture): + consumer_app = ConsumerApp.model_validate( + { + "name": CONSUMER_APP_NAME, + "namespace": NAMESPACE, + "values": { + "image": "consumerApp", + "kafka": {"bootstrapServers": "fake-broker:9092"}, + }, + }, + ) + mock_helm_upgrade_install = mocker.patch.object( + consumer_app._helm, "upgrade_install" + ) + + mock = mocker.AsyncMock() + mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") + + dry_run = False + await consumer_app.deploy(dry_run=dry_run) + + # Ensure no outputs are attached to a ConsumerApp deployment + assert consumer_app.to is None + + assert mock.mock_calls == [ + mocker.call.helm_upgrade_install( + CONSUMER_APP_RELEASE_NAME, + "bakdata-streams-bootstrap/consumer-app", + dry_run, + NAMESPACE, + { + "nameOverride": CONSUMER_APP_HELM_NAME_OVERRIDE, + "fullnameOverride": CONSUMER_APP_HELM_NAME_OVERRIDE, + "image": "consumerApp", + "kafka": { + "bootstrapServers": "fake-broker:9092", + }, + }, + HelmUpgradeInstallFlags( + create_namespace=False, + force=False, + username=None, + password=None, + 
ca_file=None, + insecure_skip_tls_verify=False, + timeout="5m0s", + version="3.6.1", + wait=True, + wait_for_jobs=False, + ), + ), + ] + + async def test_destroy( + self, + consumer_app: ConsumerApp, + mocker: MockerFixture, + ): + mock_helm_uninstall = mocker.patch.object(consumer_app._helm, "uninstall") + + await consumer_app.destroy(dry_run=True) + + mock_helm_uninstall.assert_called_once_with( + NAMESPACE, CONSUMER_APP_RELEASE_NAME, True + ) + + async def test_reset_when_dry_run_is_false( + self, + consumer_app: ConsumerApp, + empty_helm_get_values: MockerFixture, + mocker: MockerFixture, + ): + mock = mocker.MagicMock() + mock_helm_upgrade_install = mocker.patch.object(Helm, "upgrade_install") + mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") + mock_helm_uninstall = mocker.patch.object(Helm, "uninstall") + mock.attach_mock(mock_helm_uninstall, "helm_uninstall") + + dry_run = False + await consumer_app.reset(dry_run=dry_run) + + assert mock.mock_calls == [ + mocker.call.helm_uninstall(NAMESPACE, CONSUMER_APP_RELEASE_NAME, dry_run), + ANY, # __bool__ + ANY, # __str__ + mocker.call.helm_uninstall( + NAMESPACE, + CONSUMER_APP_CLEAN_RELEASE_NAME, + dry_run, + ), + ANY, # __bool__ + ANY, # __str__ + mocker.call.helm_upgrade_install( + CONSUMER_APP_CLEAN_RELEASE_NAME, + "bakdata-streams-bootstrap/consumer-app-cleanup-job", + dry_run, + NAMESPACE, + { + "nameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE, + "fullnameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE, + "image": "consumerApp", + "kafka": { + "bootstrapServers": "fake-broker:9092", + }, + }, + HelmUpgradeInstallFlags(version="3.6.1", wait=True, wait_for_jobs=True), + ), + mocker.call.helm_uninstall( + NAMESPACE, + CONSUMER_APP_CLEAN_RELEASE_NAME, + dry_run, + ), + ANY, # __bool__ + ANY, # __str__ + ] + + async def test_should_clean_consumer_app_and_deploy_clean_up_job_and_delete_clean_up( + self, + consumer_app: ConsumerApp, + empty_helm_get_values: MockerFixture, + mocker: 
        MockerFixture,
    ):
        # NOTE(review): this test's `def` line lies before this chunk; from the
        # body it exercises `consumer_app.clean()` and pins the exact Helm call
        # order (uninstall app -> uninstall old cleanup job -> install cleanup
        # job -> uninstall cleanup job).
        mock = mocker.MagicMock()
        mock_helm_upgrade_install = mocker.patch.object(Helm, "upgrade_install")
        mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
        mock_helm_uninstall = mocker.patch.object(Helm, "uninstall")
        mock.attach_mock(mock_helm_uninstall, "helm_uninstall")

        dry_run = False
        await consumer_app.clean(dry_run=dry_run)

        # Call order matters: the shared parent mock records every child call,
        # so this asserts sequencing, not just invocation.
        assert mock.mock_calls == [
            mocker.call.helm_uninstall(NAMESPACE, CONSUMER_APP_RELEASE_NAME, dry_run),
            ANY,  # __bool__
            ANY,  # __str__
            mocker.call.helm_uninstall(
                NAMESPACE,
                CONSUMER_APP_CLEAN_RELEASE_NAME,
                dry_run,
            ),
            ANY,  # __bool__
            ANY,  # __str__
            mocker.call.helm_upgrade_install(
                CONSUMER_APP_CLEAN_RELEASE_NAME,
                "bakdata-streams-bootstrap/consumer-app-cleanup-job",
                dry_run,
                NAMESPACE,
                {
                    "nameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE,
                    "fullnameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE,
                    "image": "consumerApp",
                    "kafka": {
                        "bootstrapServers": "fake-broker:9092",
                    },
                },
                HelmUpgradeInstallFlags(version="3.6.1", wait=True, wait_for_jobs=True),
            ),
            mocker.call.helm_uninstall(
                NAMESPACE,
                CONSUMER_APP_CLEAN_RELEASE_NAME,
                dry_run,
            ),
            ANY,  # __bool__
            ANY,  # __str__
        ]

    async def test_should_deploy_clean_up_job_with_values_in_cluster_when_reset(
        self, mocker: MockerFixture
    ):
        """Reset should render the cleanup job from the values deployed in the
        cluster (``Helm.get_values``), not from the local component values:
        here the cluster ``imageTag`` 1.1.1 wins over the local 2.2.2, while
        name/fullname overrides are replaced with the cleanup-job overrides."""
        image_tag_in_cluster = "1.1.1"
        mocker.patch.object(
            Helm,
            "get_values",
            return_value={
                "image": "registry/consumer-app",
                "imageTag": image_tag_in_cluster,
                "nameOverride": CONSUMER_APP_NAME,
                "fullnameOverride": CONSUMER_APP_NAME,
                "replicaCount": 1,
                # `enabled: False` is dropped in the expected values below;
                # only `size` survives into the cleanup-job values.
                "persistence": {"enabled": False, "size": "1Gi"},
                "statefulSet": False,
                "kafka": {
                    "bootstrapServers": "fake-broker:9092",
                    "inputTopics": ["test-input-topic"],
                    "schemaRegistryUrl": "http://localhost:8081",
                },
            },
        )
        consumer_app = ConsumerApp.model_validate(
            {
                "name": CONSUMER_APP_NAME,
                "namespace": NAMESPACE,
                "values": {
                    "image": "registry/consumer-app",
                    "imageTag": "2.2.2",
                    "kafka": {"bootstrapServers": "fake-broker:9092"},
                },
                "from": {
                    "topics": {
                        "test-input-topic": {"type": "input"},
                    }
                },
            },
        )

        mocker.patch.object(consumer_app._helm, "uninstall")

        mock_helm_upgrade_install = mocker.patch.object(
            consumer_app._cleaner._helm, "upgrade_install"
        )
        mocker.patch.object(consumer_app._cleaner._helm, "uninstall")

        dry_run = False
        await consumer_app.reset(dry_run=dry_run)

        mock_helm_upgrade_install.assert_called_once_with(
            CONSUMER_APP_CLEAN_RELEASE_NAME,
            "bakdata-streams-bootstrap/consumer-app-cleanup-job",
            dry_run,
            NAMESPACE,
            {
                "image": "registry/consumer-app",
                "nameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE,
                "fullnameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE,
                "imageTag": image_tag_in_cluster,
                "persistence": {"size": "1Gi"},
                "replicaCount": 1,
                "kafka": {
                    "bootstrapServers": "fake-broker:9092",
                    "inputTopics": ["test-input-topic"],
                    "schemaRegistryUrl": "http://localhost:8081",
                },
            },
            HelmUpgradeInstallFlags(version="3.6.1", wait=True, wait_for_jobs=True),
        )

    async def test_should_deploy_clean_up_job_with_values_in_cluster_when_clean(
        self, mocker: MockerFixture
    ):
        """Same scenario as the reset test above, but driven through
        ``clean()`` — both paths must render the cleanup job from the
        in-cluster values."""
        image_tag_in_cluster = "1.1.1"
        mocker.patch.object(
            Helm,
            "get_values",
            return_value={
                "image": "registry/consumer-app",
                "imageTag": image_tag_in_cluster,
                "nameOverride": CONSUMER_APP_NAME,
                "fullnameOverride": CONSUMER_APP_NAME,
                "replicaCount": 1,
                "persistence": {"enabled": False, "size": "1Gi"},
                "statefulSet": False,
                "kafka": {
                    "bootstrapServers": "fake-broker:9092",
                    "inputTopics": ["test-input-topic"],
                    "schemaRegistryUrl": "http://localhost:8081",
                },
            },
        )
        consumer_app = ConsumerApp.model_validate(
            {
                "name": CONSUMER_APP_NAME,
                "namespace": NAMESPACE,
                "values": {
                    "image": "registry/consumer-app",
                    "imageTag": "2.2.2",
                    "kafka": {"bootstrapServers": "fake-broker:9092"},
                },
                "from": {
                    "topics": {
                        "test-input-topic": {"type": "input"},
                    }
                },
            },
        )

        mocker.patch.object(consumer_app._helm, "uninstall")

        mock_helm_upgrade_install = mocker.patch.object(
            consumer_app._cleaner._helm, "upgrade_install"
        )
        mocker.patch.object(consumer_app._cleaner._helm, "uninstall")

        dry_run = False
        await consumer_app.clean(dry_run=dry_run)

        mock_helm_upgrade_install.assert_called_once_with(
            CONSUMER_APP_CLEAN_RELEASE_NAME,
            "bakdata-streams-bootstrap/consumer-app-cleanup-job",
            dry_run,
            NAMESPACE,
            {
                "image": "registry/consumer-app",
                "nameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE,
                "fullnameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE,
                "imageTag": image_tag_in_cluster,
                "persistence": {"size": "1Gi"},
                "replicaCount": 1,
                "kafka": {
                    "bootstrapServers": "fake-broker:9092",
                    "inputTopics": ["test-input-topic"],
                    "schemaRegistryUrl": "http://localhost:8081",
                },
            },
            HelmUpgradeInstallFlags(version="3.6.1", wait=True, wait_for_jobs=True),
        )

    async def test_get_input_topics(self):
        """``from.topics`` entries must be routed into the Helm values:
        plain ``input`` topics (declaration order preserved), ``label``-ed
        topics grouped per label, and ``pattern`` entries excluded from
        ``input_topics``/``labeled_input_topics``."""
        consumer_app = ConsumerApp.model_validate(
            {
                "name": "my-app",
                "namespace": NAMESPACE,
                "values": {
                    "image": "registry/consumer-app",
                    "kafka": {"bootstrapServers": "fake-broker:9092"},
                },
                "from": {
                    "topics": {
                        "example-input": {"type": "input"},
                        "b": {"type": "input"},
                        "a": {"type": "input"},
                        "topic-extra2": {"label": "role2"},
                        "topic-extra3": {"label": "role2"},
                        "topic-extra": {"label": "role1"},
                        ".*": {"type": "pattern"},
                        "example.*": {
                            "type": "pattern",
                            "label": "another-pattern",
                        },
                    }
                },
            },
        )

        # Declaration order, not alphabetical — hence "b" before "a".
        assert consumer_app.values.kafka.input_topics == [
            KafkaTopic(name="example-input"),
            KafkaTopic(name="b"),
            KafkaTopic(name="a"),
        ]
        assert consumer_app.values.kafka.labeled_input_topics == {
            "role1": [KafkaTopic(name="topic-extra")],
            "role2": [KafkaTopic(name="topic-extra2"), KafkaTopic(name="topic-extra3")],
        }

        # Verify no outputs are registered
        assert consumer_app.to is None
        assert not getattr(consumer_app, "outputs", None) or not list(
            consumer_app.outputs
        )

        # `inputs` aggregates plain and labeled topics; pattern entries are
        # not included.
        assert list(consumer_app.inputs) == [
            KafkaTopic(name="example-input"),
            KafkaTopic(name="b"),
            KafkaTopic(name="a"),
            KafkaTopic(name="topic-extra2"),
            KafkaTopic(name="topic-extra3"),
            KafkaTopic(name="topic-extra"),
        ]

    def test_raise_validation_error_when_persistence_enabled_and_size_not_set(
        self, stateful_consumer_app: ConsumerApp
    ):
        """Enabling persistence without a size must fail model validation."""
        with pytest.raises(
            ValidationError,
            match=re.escape(
                "If app.persistence.enabled is set to true, the field app.persistence.size needs to be set."
            ),
        ):
            stateful_consumer_app.values.persistence = PersistenceConfig(enabled=True)

    def test_generate(self, stateful_consumer_app: ConsumerApp):
        """Snapshot of the generated manifest dict for the stateful fixture."""
        assert stateful_consumer_app.generate() == {
            "helm_name_override": CONSUMER_APP_HELM_NAME_OVERRIDE,
            "helm_release_name": CONSUMER_APP_RELEASE_NAME,
            "name": CONSUMER_APP_NAME,
            "enabled": True,
            "namespace": NAMESPACE,
            "prefix": PREFIX,
            "type": "consumer-app",
            "values": {
                "image": "consumerApp",
                "kafka": {
                    "bootstrapServers": "fake-broker:9092",
                },
                "persistence": {"enabled": True, "size": "5Gi", "storageClass": "foo"},
                "statefulSet": True,
            },
            "version": "3.6.1",
        }

    @pytest.fixture()
    def pvc1(self) -> PersistentVolumeClaim:
        """Minimal PVC stub named ``test-pvc1``."""
        return PersistentVolumeClaim(
            apiVersion="v1",
            kind="PersistentVolumeClaim",
            metadata=ObjectMeta(name="test-pvc1"),
            spec=PersistentVolumeClaimSpec(),
            status=PersistentVolumeClaimStatus(),
        )

    @pytest.fixture()
    def pvc2(self) -> PersistentVolumeClaim:
        """Minimal PVC stub named ``test-pvc2``."""
        return PersistentVolumeClaim(
            apiVersion="v1",
            kind="PersistentVolumeClaim",
            metadata=ObjectMeta(name="test-pvc2"),
            spec=PersistentVolumeClaimSpec(),
            status=PersistentVolumeClaimStatus(),
        )

    @pytest.fixture()
    def pvc3(self) -> PersistentVolumeClaim:
        """Minimal PVC stub named ``test-pvc3``."""
        return PersistentVolumeClaim(
            apiVersion="v1",
            kind="PersistentVolumeClaim",
            metadata=ObjectMeta(name="test-pvc3"),
            spec=PersistentVolumeClaimSpec(),
            status=PersistentVolumeClaimStatus(),
        )

    @pytest.fixture()
    def mock_list_pvcs(
        self,
        mocker: MockerFixture,
        pvc1: PersistentVolumeClaim,
        pvc2: PersistentVolumeClaim,
        pvc3: PersistentVolumeClaim,
    ) -> MagicMock:
        """Patch ``PVCHandler.list_pvcs`` to yield the three PVC stubs.

        The side effect is an async generator because the real method is
        consumed with ``async for``.
        """

        async def async_generator_side_effect() -> AsyncIterator[PersistentVolumeClaim]:
            yield pvc1
            yield pvc2
            yield pvc3

        return mocker.patch.object(
            PVCHandler, "list_pvcs", side_effect=async_generator_side_effect
        )

    @pytest.mark.usefixtures("kubeconfig")
    async def test_stateful_clean_with_dry_run_false(
        self,
        stateful_consumer_app: ConsumerApp,
        empty_helm_get_values: MockerFixture,
        mock_list_pvcs: MagicMock,
        mocker: MockerFixture,
    ):
        """Cleaning a stateful app must run the Helm cleanup sequence AND
        delete the app's PVCs afterwards (``delete_pvcs`` is the final call)."""
        mock = MagicMock()
        mock_helm_upgrade_install = mocker.patch.object(Helm, "upgrade_install")
        mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
        mock_helm_uninstall = mocker.patch.object(Helm, "uninstall")
        mock.attach_mock(mock_helm_uninstall, "helm_uninstall")
        mock_delete_pvcs = mocker.patch.object(PVCHandler, "delete_pvcs")
        mock.attach_mock(mock_delete_pvcs, "delete_pvcs")

        dry_run = False
        await stateful_consumer_app.clean(dry_run=dry_run)

        assert mock.mock_calls == [
            mocker.call.helm_uninstall(NAMESPACE, CONSUMER_APP_RELEASE_NAME, dry_run),
            ANY,  # __bool__
            ANY,  # __str__
            mocker.call.helm_uninstall(
                NAMESPACE,
                CONSUMER_APP_CLEAN_RELEASE_NAME,
                dry_run,
            ),
            ANY,  # __bool__
            ANY,  # __str__
            mocker.call.helm_upgrade_install(
                CONSUMER_APP_CLEAN_RELEASE_NAME,
                "bakdata-streams-bootstrap/consumer-app-cleanup-job",
                dry_run,
                NAMESPACE,
                {
                    "nameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE,
                    "fullnameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE,
                    "image": "consumerApp",
                    "kafka": {
                        "bootstrapServers": "fake-broker:9092",
                    },
                    "statefulSet": True,
                    "persistence": {
                        "enabled": True,
                        "size": "5Gi",
                        "storageClass": "foo",
                    },
                },
                HelmUpgradeInstallFlags(version="3.6.1", wait=True, wait_for_jobs=True),
            ),
            mocker.call.helm_uninstall(
                NAMESPACE,
                CONSUMER_APP_CLEAN_RELEASE_NAME,
                dry_run,
            ),
            ANY,  # __bool__
            ANY,  # __str__
            mocker.call.delete_pvcs(False),
        ]

    @pytest.mark.usefixtures("kubeconfig")
    async def test_stateful_clean_with_dry_run_true(
        self,
        stateful_consumer_app: ConsumerApp,
        empty_helm_get_values: MockerFixture,
        mocker: MockerFixture,
        mock_list_pvcs: MagicMock,
        caplog: pytest.LogCaptureFixture,
    ):
        """With dry_run=True the PVCs are only listed and logged, not deleted."""
        caplog.set_level(logging.DEBUG)
        # actual component
        mocker.patch.object(stateful_consumer_app, "destroy")

        cleaner = stateful_consumer_app._cleaner
        assert isinstance(cleaner, ConsumerAppCleaner)

        # Stub out the cleaner's Helm work so only the PVC handling runs.
        mocker.patch.object(cleaner, "destroy")
        mocker.patch.object(cleaner, "deploy")

        dry_run = True
        await stateful_consumer_app.clean(dry_run=dry_run)

        mock_list_pvcs.assert_called_once()
        assert (
            f"Deleting in namespace 'test-namespace' StatefulSet '{CONSUMER_APP_FULL_NAME}' PVCs ['test-pvc1', 'test-pvc2', 'test-pvc3']"
            in caplog.text
        )

    async def test_clean_should_fall_back_to_local_values_when_validation_of_cluster_values_fails(
        self,
        mocker: MockerFixture,
        caplog: pytest.LogCaptureFixture,
    ):
        """If the values fetched from the cluster do not validate against the
        current model (legacy ``streams`` schema here), clean must log a
        warning and render the cleanup job from the local/enriched values."""
        caplog.set_level(logging.WARNING)

        # invalid model
        mocker.patch.object(
            Helm,
            "get_values",
            return_value={
                "image": "registry/producer-app",
                "imageTag": "1.1.1",
                "nameOverride": CONSUMER_APP_NAME,
                "fullnameOverride": CONSUMER_APP_NAME,
                # Legacy key layout — `streams` instead of `kafka` — makes
                # validation against the current model fail.
                "streams": {
                    "brokers": "fake-broker:9092",
                    "inputTopics": ["test-input-topic"],
                    "schemaRegistryUrl": "http://localhost:8081",
                },
            },
        )

        consumer_app = ConsumerApp.model_validate(
            {
                "name": CONSUMER_APP_NAME,
                "namespace": NAMESPACE,
                "values": {
                    "image": "registry/consumer-app",
                    "imageTag": "2.2.2",
                    "kafka": {"bootstrapServers": "fake-broker:9092"},
                },
                "from": {
                    "topics": {
                        "test-input-topic": {"type": "input"},
                    }
                },
            },
        )

        mocker.patch.object(consumer_app._helm, "uninstall")

        mock_helm_upgrade_install = mocker.patch.object(
            consumer_app._cleaner._helm, "upgrade_install"
        )
        mocker.patch.object(consumer_app._cleaner._helm, "uninstall")

        dry_run = False
        await consumer_app.clean(dry_run=dry_run)

        assert (
            "The values in the cluster are invalid with the current model. Falling back to the enriched values of pipeline.yaml and defaults.yaml"
            in caplog.text
        )

        # Local values win: imageTag 2.2.2, no schemaRegistryUrl.
        mock_helm_upgrade_install.assert_called_once_with(
            CONSUMER_APP_CLEAN_RELEASE_NAME,
            "bakdata-streams-bootstrap/consumer-app-cleanup-job",
            dry_run,
            NAMESPACE,
            {
                "image": "registry/consumer-app",
                "nameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE,
                "fullnameOverride": CONSUMER_APP_CLEAN_HELM_NAME_OVERRIDE,
                "imageTag": "2.2.2",
                "kafka": {
                    "bootstrapServers": "fake-broker:9092",
                    "inputTopics": ["test-input-topic"],
                },
            },
            HelmUpgradeInstallFlags(version="3.6.1", wait=True, wait_for_jobs=True),
        )