From a14ba36e65efab5c8fa176c6403b0fb7c6ac12cd Mon Sep 17 00:00:00 2001 From: Faisal Misbah Date: Wed, 21 Jan 2026 18:31:24 +0500 Subject: [PATCH 1/2] Fix topic deletion issue in master-slave clusters Issue: Topics cannot be permanently deleted in master-slave RocketMQ clusters because slave brokers incorrectly delete all topics during synchronization. Root Cause: In SlaveSynchronize.java, the deleteTopicConfig() call was placed outside the conditional check, causing ALL topics to be deleted from slaves during sync, not just the ones missing from master's config. Fix: Move deleteTopicConfig() and deleteSubscriptionGroupConfig() calls inside the conditional blocks so they only execute for topics/subscription groups that should actually be deleted. This addresses GitHub issue #10030 and is related to issue #9984. Changes: - syncTopicConfig(): Move topicConfigManager.deleteTopicConfig() inside if block - syncSubscriptionGroupConfig(): Move subscriptionGroupManager.deleteSubscriptionGroupConfig() inside if block --- .../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 2e3134016c7..4317e53df9e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -94,8 +94,8 @@ private void syncTopicConfig() { Map.Entry entry = iterator.next(); if (!newTopicConfigTable.containsKey(entry.getKey())) { iterator.remove(); + topicConfigManager.deleteTopicConfig(entry.getKey()); } - topicConfigManager.deleteTopicConfig(entry.getKey()); } //update @@ -189,8 +189,8 @@ private void syncSubscriptionGroupConfig() { Map.Entry configEntry = iterator.next(); if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) { iterator.remove(); + subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey()); } - subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey()); } // update newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig); From ddb971e77e0518bed43d56912b7c2b277ad646c3 Mon Sep 17 00:00:00 2001 From: Faisal Misbah Date: Thu, 29 Jan 2026 08:08:42 +0500 Subject: [PATCH 2/2] test: add tests for topic and subscription group deletion --- .../broker/slave/SlaveSynchronizeTest.java | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java index c9461c42240..ccd3ec36afa 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java @@ -36,6 +36,7 @@ import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.store.MessageStore; @@ -205,4 +206,72 @@ private TimerMetrics.TimerMetricsSerializeWrapper createTimerMetricsWrapper() { wrapper.setDataVersion(dataVersion); return wrapper; } + + @Test + public void testSyncTopicConfig_withDeletion() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException { + // Setup: Slave has existing topics that should be deleted + TopicConfig existingTopic1 = new TopicConfig("ToBeDeleted1"); + TopicConfig existingTopic2 = new TopicConfig("ToBeDeleted2"); + TopicConfig existingTopic3 = new TopicConfig("ToBeKept"); + + when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>()); + topicConfigManager.getTopicConfigTable().put("ToBeDeleted1", existingTopic1); + topicConfigManager.getTopicConfigTable().put("ToBeDeleted2", existingTopic2); + topicConfigManager.getTopicConfigTable().put("ToBeKept", existingTopic3); + + // Master returns only some topics (not the ones to be deleted) + TopicConfig newTopic = new TopicConfig("NewTopic"); + TopicConfigAndMappingSerializeWrapper wrapper = createTopicConfigWrapper(newTopic); + wrapper.getTopicConfigTable().put("ToBeKept", existingTopic3); + + when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(wrapper); + when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper()); + when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn(""); + when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper()); + when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper()); + when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper()); + when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion()); + + slaveSynchronize.syncAll(); + + // Verify deleteTopicConfig was called for the removed topics + org.mockito.Mockito.verify(topicConfigManager).deleteTopicConfig("ToBeDeleted1"); + org.mockito.Mockito.verify(topicConfigManager).deleteTopicConfig("ToBeDeleted2"); + } + + @Test + public void testSyncSubscriptionGroupConfig_withDeletion() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException { + // Setup: Slave has existing subscription groups that should be deleted + SubscriptionGroupConfig existingGroup1 = new SubscriptionGroupConfig(); + existingGroup1.setGroupName("ToBeDeletedGroup1"); + + SubscriptionGroupConfig existingGroup2 = new SubscriptionGroupConfig(); + existingGroup2.setGroupName("ToBeDeletedGroup2"); + + SubscriptionGroupConfig existingGroup3 = new SubscriptionGroupConfig(); + existingGroup3.setGroupName("ToBeKeptGroup"); + + when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new ConcurrentHashMap<>()); + subscriptionGroupManager.getSubscriptionGroupTable().put("ToBeDeletedGroup1", existingGroup1); + subscriptionGroupManager.getSubscriptionGroupTable().put("ToBeDeletedGroup2", existingGroup2); + subscriptionGroupManager.getSubscriptionGroupTable().put("ToBeKeptGroup", existingGroup3); + + // Master returns only some subscription groups (not the ones to be deleted) + SubscriptionGroupWrapper wrapper = createSubscriptionGroupWrapper(); + wrapper.getSubscriptionGroupTable().put("ToBeKeptGroup", existingGroup3); + + when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(new TopicConfig("NewTopic"))); + when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper()); + when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn(""); + when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(wrapper); + when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper()); + when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper()); + when(subscriptionGroupManager.getDataVersion()).thenReturn(new DataVersion()); + + slaveSynchronize.syncAll(); + + // Verify deleteSubscriptionGroupConfig was called for the removed groups + org.mockito.Mockito.verify(subscriptionGroupManager).deleteSubscriptionGroupConfig("ToBeDeletedGroup1"); + org.mockito.Mockito.verify(subscriptionGroupManager).deleteSubscriptionGroupConfig("ToBeDeletedGroup2"); + } }