Skip to content

Commit 2481012

Browse files
author
Robert Bernhof
committed
feat(apply): add --keep-throttle flag to preserve replication throttles
Add a new `--keep-throttle` boolean flag to the apply command that allows users to preserve replication throttle settings instead of having them automatically removed after applying topic configurations. This is particularly useful for topics where throttle settings (leader.replication.throttled.replicas, follower.replication.throttled.replicas) are intentionally configured in the topic YAML and should be maintained across applies. Changes: - Add --keep-throttle flag to apply command - Skip automatic throttle removal when flag is set - Update help documentation Example usage: topicctl apply --keep-throttle --skip-confirm \ --cluster-config cluster.yaml topics/my-topic.yaml
1 parent 6a0e620 commit 2481012

4 files changed

Lines changed: 106 additions & 2 deletions

File tree

README.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ docker-compose down
124124
#### apply
125125

126126
```
127-
topicctl apply [path(s) to topic config(s)]
127+
topicctl apply [flags] [path(s) to topic config(s)]
128128
```
129129

130130
The `apply` subcommand ensures that the actual state of a topic in the cluster
@@ -135,6 +135,28 @@ then the tool will initiate the necessary changes to bring it into compliance.
135135
See the [Config formats](#config-formats) section below for more information on the
136136
expected file formats.
137137

138+
##### apply flags
139+
140+
| Flag | Type | Default | Description |
141+
|------|------|---------|-------------|
142+
| `--skip-confirm` | bool | false | Skip confirmation prompts |
143+
| `--keep-throttle` | bool | false | Keep replication throttle settings instead of removing them after apply |
144+
| `--no-spinner` | bool | false | Disable progress spinner |
145+
| `--conn-timeout` | duration | 10s | Connection timeout |
146+
| `--rebalance` | bool | false | Rebalance topic replicas after apply |
147+
148+
##### Apply Examples
149+
150+
Apply while preserving replication throttle settings with skip confirm:
151+
```bash
152+
topicctl apply --skip-confirm --keep-throttle \
153+
--cluster-config examples/local-cluster/cluster.yaml examples/local-cluster/topics/my-topic.yaml
154+
```
155+
156+
This is useful for topics where throttle settings (e.g., `leader.replication.throttled.replicas`,
157+
`follower.replication.throttled.replicas`) are intentionally configured and should be maintained
158+
across applies.
159+
138160
#### bootstrap
139161

140162
```

cmd/topicctl/subcmd/apply.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type applyCmdConfig struct {
3939
ignoreFewerPartitionsError bool
4040
sleepLoopDuration time.Duration
4141
failFast bool
42+
keepThrottle bool
4243

4344
shared sharedOptions
4445

@@ -120,6 +121,12 @@ func init() {
120121
true,
121122
"Fail upon the first error encountered during apply process",
122123
)
124+
applyCmd.Flags().BoolVar(
125+
&applyConfig.keepThrottle,
126+
"keep-throttle",
127+
false,
128+
"Keep replication throttle settings instead of removing them",
129+
)
123130

124131
addSharedConfigOnlyFlags(applyCmd, &applyConfig.shared)
125132
RootCmd.AddCommand(applyCmd)
@@ -263,6 +270,7 @@ func applyTopic(
263270
IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError,
264271
SleepLoopDuration: applyConfig.sleepLoopDuration,
265272
TopicConfig: topicConfig,
273+
KeepThrottle: applyConfig.keepThrottle,
266274
}
267275

268276
if err := cliRunner.ApplyTopic(ctx, applierConfig); err != nil {

pkg/apply/apply.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type TopicApplierConfig struct {
3838
IgnoreFewerPartitionsError bool
3939
SleepLoopDuration time.Duration
4040
TopicConfig config.TopicConfig
41+
KeepThrottle bool
4142
}
4243

4344
// TopicApplier executes an "apply" run on a topic by comparing the actual
@@ -281,7 +282,7 @@ func (t *TopicApplier) checkExistingState(
281282
return nil
282283
}
283284

284-
if topicInfo.IsThrottled() {
285+
if topicInfo.IsThrottled() && !t.config.KeepThrottle {
285286
log.Infof(
286287
"It looks there are still throttles on the topic (config: %+v)",
287288
topicInfo.Config,

pkg/apply/apply_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,79 @@ func TestApplyOverrides(t *testing.T) {
901901
assert.Equal(t, applier.maxBatchSize, 8)
902902
}
903903

904+
func TestApplyKeepThrottle(t *testing.T) {
905+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
906+
defer cancel()
907+
topicName := util.RandomString("apply-keep-throttle-", 6)
908+
topicConfig := config.TopicConfig{
909+
Meta: config.ResourceMeta{
910+
Name: topicName,
911+
Cluster: "test-cluster",
912+
Region: "test-region",
913+
Environment: "test-environment",
914+
},
915+
Spec: config.TopicSpec{
916+
Partitions: 3,
917+
ReplicationFactor: 2,
918+
RetentionMinutes: 500,
919+
PlacementConfig: config.TopicPlacementConfig{
920+
Strategy: config.PlacementStrategyStatic,
921+
Picker: config.PickerMethodLowestIndex,
922+
StaticAssignments: [][]int{
923+
{1, 2},
924+
{2, 3},
925+
{1, 3},
926+
},
927+
},
928+
MigrationConfig: &config.TopicMigrationConfig{
929+
ThrottleMB: 2,
930+
PartitionBatchSize: 3,
931+
},
932+
},
933+
}
934+
// Create topic
935+
applier := testApplier(ctx, t, topicConfig)
936+
defer applier.adminClient.Close()
937+
err := applier.Apply(ctx)
938+
require.NoError(t, err)
939+
940+
topicInfo, err := applier.adminClient.GetTopic(ctx, topicName, true)
941+
require.NoError(t, err)
942+
// Topic should not be throttled initially
943+
assert.False(t, topicInfo.IsThrottled())
944+
945+
supported := applier.adminClient.GetSupportedFeatures()
946+
if !(supported.Locks && supported.DynamicBrokerConfigs) {
947+
// This test only works on zk-based clients for now
948+
return
949+
}
950+
951+
// Manually add throttles to the topic
952+
_, err = applier.adminClient.UpdateTopicConfig(
953+
ctx,
954+
topicName,
955+
[]kafka.ConfigEntry{
956+
{
957+
ConfigName: admin.LeaderReplicasThrottledKey,
958+
ConfigValue: "0:1,1:2,2:3",
959+
},
960+
{
961+
ConfigName: admin.FollowerReplicasThrottledKey,
962+
ConfigValue: "0:2,1:3,2:1",
963+
},
964+
},
965+
true,
966+
)
967+
require.NoError(t, err)
968+
969+
topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true)
970+
require.NoError(t, err)
971+
// Topic should now be throttled
972+
assert.True(t, topicInfo.IsThrottled())
973+
974+
// Test 1: keepThrottle = false (default behavior - should remove throttles)
975+
}
976+
904977
func testTopicName(name string) string {
905978
return util.RandomString(fmt.Sprintf("topic-%s-", name), 6)
906979
}

0 commit comments

Comments
 (0)