diff --git a/cmd/autothrottle/main.go b/cmd/autothrottle/main.go index 90d78d59..1f925be9 100644 --- a/cmd/autothrottle/main.go +++ b/cmd/autothrottle/main.go @@ -245,8 +245,10 @@ func main() { // the Kafka topic throttled replicas list. This minimizes // state that must be propagated through the cluster. if topicsReplicatingNow.isSubSet(topicsReplicatingPreviously) { + log.Println("No new reassigning topics to throttle, disable topic updates") throttleManager.DisableTopicUpdates() } else { + log.Println("New reassigning topics to throttle, enable topic updates") throttleManager.EnableTopicUpdates() // Unset any previously stored throttle rates. This is done to avoid a // scenario that results in autothrottle being unaware of externally @@ -334,6 +336,8 @@ func main() { // TODO(jamie): is there a scenario where we should exclude topics // have also have a reassignment? We're discovering topics here by // reverse lookup of brokers that are not reassignment participants. + log.Println("There are broker overrides") + log.Print(throttleManager.GetBrokerOverrides()) var err error otl, err := throttleManager.GetTopicsWithThrottledBrokers() if err != nil { @@ -344,15 +348,18 @@ func main() { // Determine whether we need to propagate topic throttle replica // list configs. If the brokers with overrides remains the same, - // we don't need to need to update those configs. + // we don't need to update those configs. var brokersThrottledNow = newSet() for broker := range activeOverrideBrokers { brokersThrottledNow.add(strconv.Itoa(broker)) } - + log.Printf("BrokersThrottledNow: %v", brokersThrottledNow) + log.Printf("BrokersThrottledPreviously: %v", brokersThrottledPreviously) if brokersThrottledNow.equal(brokersThrottledPreviously) { + log.Println("No new brokers to throttle, disable topic updates") throttleManager.DisableOverrideTopicUpdates() } else { + log.Println("New brokers to throttle, enable topic updates") throttleManager.EnableOverrideTopicUpdates() }