Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions backend/pkg/config/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
// Kafka required for opening a connection to Kafka
type Kafka struct {
// General
Brokers []string `yaml:"brokers"`
ClientID string `yaml:"clientId"`
RackID string `yaml:"rackId"`
Brokers []string `yaml:"brokers"`
ClientID string `yaml:"clientId"`
RackID string `yaml:"rackId"`
KafkaNextGenBalancer bool `yaml:"kafkaNextGenBalancer"`

TLS TLS `yaml:"tls"`
SASL KafkaSASL `yaml:"sasl"`
Expand Down
10 changes: 10 additions & 0 deletions backend/pkg/factory/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ import (
loggerpkg "github.com/redpanda-data/console/backend/pkg/logger"
)

type contextKey string

const optInKafkaNextGenBalancerBeta contextKey = "opt_in_kafka_next_gen_balancer_beta"

// ClientFactory defines the interface for creating and retrieving Kafka clients.
type ClientFactory interface {
// GetKafkaClient retrieves a Kafka client based on the context.
Expand Down Expand Up @@ -144,6 +148,12 @@ func NewKgoConfig(cfg config.Kafka, logger *slog.Logger, metricsNamespace string
kgo.WithHooks(metricHooks),
}

// Add context with opt_in_kafka_next_gen_balancer_beta option if enabled
if cfg.KafkaNextGenBalancer {
ctx := context.WithValue(context.Background(), optInKafkaNextGenBalancerBeta, true)
opts = append(opts, kgo.WithContext(ctx))
}

// Add Rack Awareness if configured
if cfg.RackID != "" {
opts = append(opts, kgo.Rack(cfg.RackID))
Expand Down
5 changes: 5 additions & 0 deletions docs/config/console.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#
# Some examples:
# kafka.rackId => KAFKA_RACKID
# kafka.kafkaNextGenBalancer => KAFKA_KAFKANEXTGENBALANCER
# kafka.tls.caFilepath => KAFKA_TLS_CAFILEPATH
#
# --- Note
Expand All @@ -38,6 +39,10 @@ kafka:
# clientId: "console"
# Optional: Rack identifier to optimize message consumption in multi-zone clusters.
# rackId: "zone-a"
# Optional: Enable Kafka's next-generation balancer (beta feature from franz-go v1.19.0+).
# This enables improved partition assignment and rebalancing behavior.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to educate myself on the next-gen partition balancer, but I'm wondering how is this relevant to Console given that Console does not do any consumer group style consuming?

# Defaults to false. Set to true to opt-in to the beta balancer.
# kafkaNextGenBalancer: false
# sasl:
# enabled: true
# Supported mechanisms include:
Expand Down