Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cmd/topicmappr/commands/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"os"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"

"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -57,8 +56,7 @@ func rebalance(cmd *cobra.Command, _ []string) {
defer zk.Close()

// Init kafkaadmin client.
bs := cmd.Parent().Flag("kafka-addr").Value.String()
ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
ka, err := newKafkaAdminClient(cmd)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
4 changes: 1 addition & 3 deletions cmd/topicmappr/commands/rebuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"regexp"
"strings"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"
"github.com/DataDog/kafka-kit/v4/kafkazk"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -153,8 +152,7 @@ func rebuild(cmd *cobra.Command, _ []string) {
}

// Init kafkaadmin client.
bs := cmd.Parent().Flag("kafka-addr").Value.String()
ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
ka, err := newKafkaAdminClient(cmd)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
26 changes: 26 additions & 0 deletions cmd/topicmappr/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/jamiealquiza/envy"
"github.com/spf13/cobra"
"github.com/DataDog/kafka-kit/v4/kafkaadmin"
)

var rootCmd = &cobra.Command{
Expand All @@ -24,8 +25,33 @@ func Execute() {

func init() {
rootCmd.PersistentFlags().String("kafka-addr", "localhost:9092", "Kafka bootstrap address")
rootCmd.PersistentFlags().String("kafka-ssl-ca-location", "", "Kafka SSL CA certificate location")
rootCmd.PersistentFlags().String("kafka-security-protocol", "", "Kafka security protocol")
rootCmd.PersistentFlags().String("kafka-sasl-mechanism", "", "Kafka SASL mechanism")
rootCmd.PersistentFlags().String("kafka-sasl-username", "", "Kafka SASL username")
rootCmd.PersistentFlags().String("kafka-sasl-password", "", "Kafka SASL password")
rootCmd.PersistentFlags().String("zk-addr", "localhost:2181", "ZooKeeper connect string")
rootCmd.PersistentFlags().String("zk-prefix", "", "ZooKeeper prefix (if Kafka is configured with a chroot path prefix)")
rootCmd.PersistentFlags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics")
rootCmd.PersistentFlags().Bool("ignore-warns", false, "Produce a map even if warnings are encountered")
}

func newKafkaAdminClient(cmd *cobra.Command) (kafkaadmin.KafkaAdmin, error) {
bs, _ := cmd.Flags().GetString("kafka-addr")
ca, _ := cmd.Flags().GetString("kafka-ssl-ca-location")
sec, _ := cmd.Flags().GetString("kafka-security-protocol")
mech, _ := cmd.Flags().GetString("kafka-sasl-mechanism")
user, _ := cmd.Flags().GetString("kafka-sasl-username")
pass, _ := cmd.Flags().GetString("kafka-sasl-password")

cfg := kafkaadmin.Config{
BootstrapServers: bs,
SSLCALocation: ca,
SecurityProtocol: sec,
SASLMechanism: mech,
SASLUsername: user,
SASLPassword: pass,
}

return kafkaadmin.NewClient(cfg)
}
4 changes: 1 addition & 3 deletions cmd/topicmappr/commands/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"os"

"github.com/DataDog/kafka-kit/v4/kafkaadmin"

"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -55,8 +54,7 @@ func scale(cmd *cobra.Command, _ []string) {
defer zk.Close()

// Init kafkaadmin client.
bs := cmd.Parent().Flag("kafka-addr").Value.String()
ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs})
ka, err := newKafkaAdminClient(cmd)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down