diff --git a/cmd/topicmappr/commands/rebalance.go b/cmd/topicmappr/commands/rebalance.go index 4d98f4e4..c05d3ca0 100644 --- a/cmd/topicmappr/commands/rebalance.go +++ b/cmd/topicmappr/commands/rebalance.go @@ -4,7 +4,6 @@ import ( "fmt" "os" - "github.com/DataDog/kafka-kit/v4/kafkaadmin" "github.com/spf13/cobra" ) @@ -57,8 +56,7 @@ func rebalance(cmd *cobra.Command, _ []string) { defer zk.Close() // Init kafkaadmin client. - bs := cmd.Parent().Flag("kafka-addr").Value.String() - ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs}) + ka, err := newKafkaAdminClient(cmd) if err != nil { fmt.Println(err) os.Exit(1) diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index cd43e61f..ccdf3995 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -6,7 +6,6 @@ import ( "regexp" "strings" - "github.com/DataDog/kafka-kit/v4/kafkaadmin" "github.com/DataDog/kafka-kit/v4/kafkazk" "github.com/spf13/cobra" @@ -153,8 +152,7 @@ func rebuild(cmd *cobra.Command, _ []string) { } // Init kafkaadmin client. - bs := cmd.Parent().Flag("kafka-addr").Value.String() - ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs}) + ka, err := newKafkaAdminClient(cmd) if err != nil { fmt.Println(err) os.Exit(1) diff --git a/cmd/topicmappr/commands/root.go b/cmd/topicmappr/commands/root.go index 6bb6a5c0..47ab942e 100644 --- a/cmd/topicmappr/commands/root.go +++ b/cmd/topicmappr/commands/root.go @@ -6,6 +6,7 @@ import ( "github.com/jamiealquiza/envy" "github.com/spf13/cobra" + "github.com/DataDog/kafka-kit/v4/kafkaadmin" ) var rootCmd = &cobra.Command{ @@ -24,8 +25,33 @@ func Execute() { func init() { rootCmd.PersistentFlags().String("kafka-addr", "localhost:9092", "Kafka bootstrap address") + rootCmd.PersistentFlags().String("kafka-ssl-ca-location", "", "Kafka SSL CA certificate location") + rootCmd.PersistentFlags().String("kafka-security-protocol", "", "Kafka security protocol") + rootCmd.PersistentFlags().String("kafka-sasl-mechanism", "", "Kafka SASL mechanism") + rootCmd.PersistentFlags().String("kafka-sasl-username", "", "Kafka SASL username") + rootCmd.PersistentFlags().String("kafka-sasl-password", "", "Kafka SASL password") rootCmd.PersistentFlags().String("zk-addr", "localhost:2181", "ZooKeeper connect string") rootCmd.PersistentFlags().String("zk-prefix", "", "ZooKeeper prefix (if Kafka is configured with a chroot path prefix)") rootCmd.PersistentFlags().String("zk-metrics-prefix", "topicmappr", "ZooKeeper namespace prefix for Kafka metrics") rootCmd.PersistentFlags().Bool("ignore-warns", false, "Produce a map even if warnings are encountered") } + +func newKafkaAdminClient(cmd *cobra.Command) (kafkaadmin.KafkaAdmin, error) { + bs, _ := cmd.Flags().GetString("kafka-addr") + ca, _ := cmd.Flags().GetString("kafka-ssl-ca-location") + sec, _ := cmd.Flags().GetString("kafka-security-protocol") + mech, _ := cmd.Flags().GetString("kafka-sasl-mechanism") + user, _ := cmd.Flags().GetString("kafka-sasl-username") + pass, _ := cmd.Flags().GetString("kafka-sasl-password") + + cfg := kafkaadmin.Config{ + BootstrapServers: bs, + SSLCALocation: ca, + SecurityProtocol: sec, + SASLMechanism: mech, + SASLUsername: user, + SASLPassword: pass, + } + + return kafkaadmin.NewClient(cfg) +} diff --git a/cmd/topicmappr/commands/scale.go b/cmd/topicmappr/commands/scale.go index 7e97975b..75accfbc 100644 --- a/cmd/topicmappr/commands/scale.go +++ b/cmd/topicmappr/commands/scale.go @@ -4,7 +4,6 @@ import ( "fmt" "os" - "github.com/DataDog/kafka-kit/v4/kafkaadmin" "github.com/spf13/cobra" ) @@ -55,8 +54,7 @@ func scale(cmd *cobra.Command, _ []string) { defer zk.Close() // Init kafkaadmin client. - bs := cmd.Parent().Flag("kafka-addr").Value.String() - ka, err := kafkaadmin.NewClient(kafkaadmin.Config{BootstrapServers: bs}) + ka, err := newKafkaAdminClient(cmd) if err != nil { fmt.Println(err) os.Exit(1)