diff --git a/cmd/topicmappr/commands/metadata.go b/cmd/topicmappr/commands/metadata.go index 499a2b3a..0f7f9c86 100644 --- a/cmd/topicmappr/commands/metadata.go +++ b/cmd/topicmappr/commands/metadata.go @@ -4,6 +4,8 @@ import ( "fmt" "os" "time" + "io/ioutil" + "encoding/json" "github.com/DataDog/kafka-kit/kafkazk" @@ -42,6 +44,35 @@ func getBrokerMeta(cmd *cobra.Command, zk kafkazk.Handler, m bool) kafkazk.Broke } os.Exit(1) } + // Get a broker map of the brokers in the current partition map. + // If meta data isn't being looked up, brokerMeta will be empty. + bmif, _ := cmd.Flags().GetString("brokers-storage-in-file") + if bmif != "" { + jsonFile, err := os.Open(bmif) + // if we os.Open returns an error then handle it + if err != nil { + fmt.Printf("Error on %s",err) + os.Exit(1) + } + // defer the closing of our jsonFile so that we can parse it later on + defer jsonFile.Close() + data, _ := ioutil.ReadAll(jsonFile) + bmm := kafkazk.BrokerMetricsMap{} + err = json.Unmarshal(data, &bmm) + if err != nil { + fmt.Errorf("Error unmarshalling broker metrics: %s", err.Error()) + os.Exit(1) + } + // Populate each broker with + // metric data. + for bid := range brokerMeta { + m, exists := bmm[bid] + if exists { + brokerMeta[bid].StorageFree = m.StorageFree + brokerMeta[bid].MetricsIncomplete = false + } + } + } return brokerMeta } @@ -70,6 +101,24 @@ func getPartitionMeta(cmd *cobra.Command, zk kafkazk.Handler) kafkazk.PartitionM fmt.Println(err) os.Exit(1) } - + // Get a the partitionMetaMap from input file + psif, _ := cmd.Flags().GetString("partitions-size-in-file") + if psif != "" { + jsonFile, err := os.Open(psif) + // if we os.Open returns an error then handle it + if err != nil { + fmt.Printf("Error on %s", err) + os.Exit(1) + } + // defer the closing of our jsonFile so that we can parse it later on + defer jsonFile.Close() + data, _ := ioutil.ReadAll(jsonFile) + err = json.Unmarshal(data, &partitionMeta) + if err != nil { + fmt.Errorf("Error unmarshalling broker metrics: %s", err.Error()) + os.Exit(1) + } + } + //fmt.Println(partitionMeta) return partitionMeta } diff --git a/cmd/topicmappr/commands/rebuild.go b/cmd/topicmappr/commands/rebuild.go index e2cada1f..9987a146 100644 --- a/cmd/topicmappr/commands/rebuild.go +++ b/cmd/topicmappr/commands/rebuild.go @@ -29,6 +29,8 @@ func init() { rebuildCmd.Flags().Bool("use-meta", true, "Use broker metadata in placement constraints") rebuildCmd.Flags().String("out-path", "", "Path to write output map files to") rebuildCmd.Flags().String("out-file", "", "If defined, write a combined map of all topics to a file") + rebuildCmd.Flags().String("partitions-size-in-file", "", "Read Topics partitions sizes from a file") + rebuildCmd.Flags().String("brokers-storage-in-file", "", "Read Brokers free storage from a file") rebuildCmd.Flags().Bool("force-rebuild", false, "Forces a complete map rebuild") rebuildCmd.Flags().Int("replication", 0, "Normalize the topic replication factor across all replica sets (0 results in a no-op)") rebuildCmd.Flags().Bool("sub-affinity", false, "Replacement broker substitution affinity") @@ -54,7 +56,8 @@ func rebuild(cmd *cobra.Command, _ []string) { fr, _ := cmd.Flags().GetBool("force-rebuild") sa, _ := cmd.Flags().GetBool("sub-affinity") m, _ := cmd.Flags().GetBool("use-meta") - + bsif, _ := cmd.Flags().GetString("brokers-storage-in-file") + psif, _ := cmd.Flags().GetString("partitions-size-in-file") switch { case ms == "" && t == "": fmt.Println("\n[ERROR] must specify either --topics or --map-string") @@ -103,7 +106,9 @@ func rebuild(cmd *cobra.Command, _ []string) { // Fetch broker metadata. var withMetrics bool if cmd.Flag("placement").Value.String() == "storage" { - checkMetaAge(cmd, zk) + if bsif == "" || psif == "" { + checkMetaAge(cmd, zk) + } withMetrics = true } diff --git a/cmd/topicmappr/main.go b/cmd/topicmappr/main.go index c28d56e6..f8da961e 100644 --- a/cmd/topicmappr/main.go +++ b/cmd/topicmappr/main.go @@ -1,6 +1,6 @@ package main -import "github.com/DataDog/kafka-kit/cmd/topicmappr/commands" +import "kafka-kit/cmd/topicmappr/commands" func main() { commands.Execute()