From 202c1ce5ed53bce9d0274e881ce0096bc1db151a Mon Sep 17 00:00:00 2001 From: "tal.asulin" Date: Mon, 29 Dec 2025 22:46:58 +0200 Subject: [PATCH] adding support for kafka kip-848 for franz-go --- backend/pkg/config/kafka.go | 7 ++++--- backend/pkg/factory/kafka/kafka.go | 10 ++++++++++ docs/config/console.yaml | 5 +++++ 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/backend/pkg/config/kafka.go b/backend/pkg/config/kafka.go index 11e95346a..47225938d 100644 --- a/backend/pkg/config/kafka.go +++ b/backend/pkg/config/kafka.go @@ -18,9 +18,10 @@ import ( // Kafka required for opening a connection to Kafka type Kafka struct { // General - Brokers []string `yaml:"brokers"` - ClientID string `yaml:"clientId"` - RackID string `yaml:"rackId"` + Brokers []string `yaml:"brokers"` + ClientID string `yaml:"clientId"` + RackID string `yaml:"rackId"` + KafkaNextGenBalancer bool `yaml:"kafkaNextGenBalancer"` TLS TLS `yaml:"tls"` SASL KafkaSASL `yaml:"sasl"` diff --git a/backend/pkg/factory/kafka/kafka.go b/backend/pkg/factory/kafka/kafka.go index fb34a279d..810f7e305 100644 --- a/backend/pkg/factory/kafka/kafka.go +++ b/backend/pkg/factory/kafka/kafka.go @@ -44,6 +44,10 @@ import ( loggerpkg "github.com/redpanda-data/console/backend/pkg/logger" ) +type contextKey string + +const optInKafkaNextGenBalancerBeta contextKey = "opt_in_kafka_next_gen_balancer_beta" + // ClientFactory defines the interface for creating and retrieving Kafka clients. type ClientFactory interface { // GetKafkaClient retrieves a Kafka client based on the context. @@ -144,6 +148,12 @@ func NewKgoConfig(cfg config.Kafka, logger *slog.Logger, metricsNamespace string kgo.WithHooks(metricHooks), } + // Add context with opt_in_kafka_next_gen_balancer_beta option if enabled + if cfg.KafkaNextGenBalancer { + ctx := context.WithValue(context.Background(), optInKafkaNextGenBalancerBeta, true) + opts = append(opts, kgo.WithContext(ctx)) + } + // Add Rack Awareness if configured if cfg.RackID != "" { opts = append(opts, kgo.Rack(cfg.RackID)) diff --git a/docs/config/console.yaml b/docs/config/console.yaml index dfceb0456..6e41e7698 100644 --- a/docs/config/console.yaml +++ b/docs/config/console.yaml @@ -19,6 +19,7 @@ # # Some examples: # kafka.rackId => KAFKA_RACKID +# kafka.kafkaNextGenBalancer => KAFKA_KAFKANEXTGENBALANCER # kafka.tls.caFilepath => KAFKA_TLS_CAFILEPATH # # --- Note @@ -38,6 +39,10 @@ kafka: # clientId: "console" # Optional: Rack identifier to optimize message consumption in multi-zone clusters. # rackId: "zone-a" + # Optional: Enable Kafka's next-generation balancer (beta feature from franz-go v1.19.0+). + # This enables improved partition assignment and rebalancing behavior. + # Defaults to false. Set to true to opt-in to the beta balancer. + # kafkaNextGenBalancer: false # sasl: # enabled: true # Supported mechanisms include: